Choerodon:分布式webSocket

什么是socket

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议

image

我们知道网络中的进程是通过socket来通信的,那什么是socket呢?socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,都可以用“打开open –> 读写write/read –> 关闭close”模式来操作。Socket(套接字)用来描述IP地址和端口,是通信链的句柄,应用程序可以通过Socket向网络发送请求或者应答网络请求。Socket是支持TCP/IP协议的网络通信的基础操作单元,是对网络通信过程中端点的抽象表示,包含了进行网络通信所必须的五种信息:

连接所使用的协议;
本地主机的IP地址;
本地远程的协议端口;
远地主机的IP地址;
远地进程的协议端口。

一些socket函数就是对其进行的操作(读/写IO、打开、关闭)

websocket概念

WebSocket是一种在单个TCP连接上进行全双工通信的协议,在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输

实现思路

我们知道,前端与服务端建立websocket后,后端需要向对应的websocket推送数据,而在分布式环境中,服务端实例可能存在多个,因为微服务负载均衡导致不同实例会有各自的websocket连接,因而实例A无法推送数据到实例B中的websocket连接。这里利用redis的消息订阅做websocket的转发,每个建立的ws与key绑定,但推送数据时会根据key到redis转发到相应实例进行ws推送

image

关键代码

基于org.springframeworkspring-websocket开发

1
2
3
4
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
</dependency>

用redis做转发

项目启动后BrokerHelper.java会执行定时任务,会到redis中缓存该实例的信息,并做心跳的判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@PostConstruct
public void start() {
this.registerKey = REGISTER_PREFIX + application;
registerByBrokerName();
//用定时任务与redis的维持心跳,更新最后时间
scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
String thisInstanceRegisterKey = REGISTER_PREFIX +brokerName();
//在redis中缓存set集合,key为服务名,value为实例名
redisTemplate.opsForSet().add(registerKey, brokerName();
//在redis中缓存value,key为实例名,value为最后心跳时间
redisTemplate.opsForValue().set(thisInstanceRegisterKey Long.toString(System.currentTimeMillis()));
//判断redis中该服务其他缓存的实例,如果超过心跳时间,说明实例已经停止,则移除redis中该实例的记录
Optional.ofNullable(redisTemplate.opsForSet().member(registerKey)).orElse(Collections.emptySet()).forEach(t-> {
if (t.equals(brokerName())) {
return;
}
String instanceRegisterKey = REGISTER_PREFIX + t;
long lastUpdateTime = Long.parseLon(Optional.ofNullable(redisTemplate.opsForValue().ge(instanceRegisterKey)).orElse("0"));
if (System.currentTimeMillis() - lastUpdateTime > 2* heartBeatIntervalMs) {
removeDeathBroker(t);
}
});
} catch (Exception e) {
LOGGER.error("error.redisRegister.heartBeat", e);
}
}, heartBeatIntervalMs, heartBeatIntervalMs,TimeUnit.MILLISECONDS);
}

缓存WebSocketSession及与key的关系

DefaultRelationshipDefining.java缓存webSocketSession与key的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Map<String, Set<WebSocketSession>> keySessionsMap = new ConcurrentHashMap<>();
private Map<WebSocketSession, Set<String>> sessionKeysMap = new ConcurrentHashMap<>();

@Override
public void contact(String key, WebSocketSession session) {
if (StringUtils.isEmpty(key)) {
return;
}
if (session != null) {
//本地缓存与key关联的session列表
Set<WebSocketSession> sessions =keySessionsMap.computeIfAbsent(key, k -> new HashSet<>());
sessions.add(session);
//本地缓存与session关联的key列表
Set<String> subKeys = sessionKeysMap.computeIfAbsen(session, k -> new HashSet<>());
subKeys.add(key);
LOGGER.debug("webSocket subscribe sessionId is {}, subKeysis {}", session.getId(), subKeys);
}
//在redis中缓存set集合,key为实例名,value为key
redisTemplate.opsForSet().add(brokerHelper.brokerName(), key);
}

消息推送与redis订阅

DefaultSmartMessageSender.javasendByKey()推送消息给webSocketSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public void sendByKey(String key, WebSocketSendPayload<?> payload) {
if (!StringUtils.isEmpty(key) && payload != null) {
//根据key推送消息给当前实例所关联的webSocketSession
relationshipDefining.getWebSocketSessionsByKey(key).forEac(session -> this.sendWebSocket(session, payload));
//根据key到redis中获取关心该key的实例列表,然后发送redis订阅,给相应的topic
relationshipDefining.getRedisChannelsByKey(key, true).forEac(redis -> this.sendRedis(redis, payload));
}
}

/**
* 到redis中获取关心该key的实例列表
**/
@Override
public Set<String> getRedisChannelsByKey(String key, booleanexceptSelf) {
Set<String> set = new HashSet<>();
//到redis中获取所有实例名称
Set<String> survivalChannels = redisTemplate.opsForSet().members(registerKey);
if (exceptSelf) {
survivalChannels.remove(brokerHelper.brokerName());
}
survivalChannels.forEach(t -> {
//到redis中根据实例名称获取该实例所关心的key,若有包含当前key则返回该实例
if (redisTemplate.opsForSet().members(t).contains(key)) {
set.add(t);
}
});
return set;
}
/**
* 发送redis订阅,给相应的topic
**/
@Override
public void sendRedis(String channel, WebSocketSendPayload<?> payload){
if (payload == null) {
LOGGER.warn("error.messageOperator.sendRedis.payloadIsNull");
return;
}
try {
//发送数据到某个topic
redisTemplate.convertAndSend(channel,objectMapper.writeValueAsString(payload));
} catch (JsonProcessingException e) {
LOGGER.war("error.messageOperator.sendRedisDefaultChannel.JsonProcessingxception, payload: {}", payload, e);
}
}

订阅redis‘Topic

ChoerodonWebSocketConfigure.java(实现WebSocketConfigurer接口)中监听redis订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
MessageListenerAdapter defaultListenerAdapte(ReceiveRedisMessageListener receiveRedisMessageListener) {
return new MessageListenerAdapter(receiveRedisMessageListener,"receiveMessage");
}
/**
* 项目启动后,用实例名称做topic,在redis做消息订阅
**/
@Bean
RedisMessageListenerContainer defaultContainer(RedisConnectionFactoryconnectionFactory,
MessageListenerAdaptermessageListenerAdapter,
BrokerHelperbrokerHelper) {
PatternTopic topic = new PatternTopic(brokerHelper.brokerName());
RedisMessageListenerContainer container = newRedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(messageListenerAdapter, topic);
LOGGER.info("Begin listen redis channel: {}", topic);
return container;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 监听本实例的channel,接收消息
*/
@Service
public class ReceiveRedisMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveRedisMessageListener.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private MessageSender messageSender;

public ReceiveRedisMessageListener(MessageSender messageSender) {
this.messageSender = messageSender;
}

public void receiveMessage(Object message) {
//监听到redis订阅,推送消息到当前实例上连接的webSocketSession
LOGGER.debug("receive message from redis channels, message {}", message);
if (message instanceof String) {
try {
JsonNode node = OBJECT_MAPPER.readTree((String) message);
String key = node.get("key").asText();
if (!StringUtils.isEmpty(key)) {
messageSender.sendWebSocketByKey(key, (String) message);
}
} catch (IOException e) {
LOGGER.warn("error.receiveRedisMessageListener.receiveMessage.send", e);
}
} else {
LOGGER.warn("receive message from redis channels that type is not String, message: {}", message);
}
}
}

参考文献

  • choerodon-starter-websocket