Spring Boot + Socket.IO in Practice: Single-Client Push, Broadcast, 1-to-1 and 1-to-Many

Reposted by x33g5p2x on 2022-08-17 under Spring

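In this article:

A back end plus a simple front-end HTML page

Scenarios covered:

1.  Broadcast: every connected client receives the message

2.  Partial broadcast: only a subset of clients receives the message

3.  Single-client push: the message goes to one specific user's page

As usual, first a look at the project structure for this example:

As you can see there is not much to it, so integrating Socket.IO (via netty-socketio) with Spring Boot is easy to follow along with.

Cao Zhi famously composed a poem in seven steps; we will integrate the socket server in seven steps too.

Without further ado, let's begin: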

 ① Add the core dependencies to the pom

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

 ② Add the configuration entries to the yml

    server:
      port: 8089
    socketio:
      host: localhost
      port: 8503
      maxFramePayloadLength: 1048576
      maxHttpContentLength: 1048576
      bossCount: 1
      workCount: 100
      allowCustomRequests: true
      upgradeTimeout: 10000
      pingTimeout: 60000
      pingInterval: 25000

③ Create the socket configuration class MySocketConfig.java

    import com.corundumstudio.socketio.SocketConfig;
    import com.corundumstudio.socketio.SocketIOServer;
    import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @Author: JCccc
     * @Description: reads the socketio.* settings and exposes the SocketIOServer bean
     * @Date: 2022/06/13 21:50
     */
    @Configuration
    public class MySocketConfig {

        @Value("${socketio.host}")
        private String host;
        @Value("${socketio.port}")
        private Integer port;
        @Value("${socketio.bossCount}")
        private int bossCount;
        @Value("${socketio.workCount}")
        private int workCount;
        @Value("${socketio.allowCustomRequests}")
        private boolean allowCustomRequests;
        @Value("${socketio.upgradeTimeout}")
        private int upgradeTimeout;
        @Value("${socketio.pingTimeout}")
        private int pingTimeout;
        @Value("${socketio.pingInterval}")
        private int pingInterval;

        @Bean
        public SocketIOServer socketIOServer() {
            SocketConfig socketConfig = new SocketConfig();
            socketConfig.setTcpNoDelay(true);
            socketConfig.setSoLinger(0);
            com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
            buildSocketConfig(socketConfig, config);
            return new SocketIOServer(config);
        }

        /**
         * Scans for netty-socketio annotations (@OnConnect, @OnEvent, etc.)
         */
        @Bean
        public SpringAnnotationScanner springAnnotationScanner() {
            return new SpringAnnotationScanner(socketIOServer());
        }

        // Copies the values read from the yml onto the netty-socketio Configuration
        private void buildSocketConfig(SocketConfig socketConfig, com.corundumstudio.socketio.Configuration config) {
            config.setSocketConfig(socketConfig);
            config.setHostname(host);
            config.setPort(port);
            config.setBossThreads(bossCount);
            config.setWorkerThreads(workCount);
            config.setAllowCustomRequests(allowCustomRequests);
            config.setUpgradeTimeout(upgradeTimeout);
            config.setPingTimeout(pingTimeout);
            config.setPingInterval(pingInterval);
        }
    }

④ Create the message entity MyMessage.java

    /**
     * @Author: JCccc
     * @Date: 2022-07-23 9:05
     * @Description: request body accepted by the push endpoint
     */
    public class MyMessage {

        private String type;
        private String content;
        private String from;
        private String to;
        private String channel;

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public String getFrom() {
            return from;
        }

        public void setFrom(String from) {
            this.from = from;
        }

        public String getTo() {
            return to;
        }

        public void setTo(String to) {
            this.to = to;
        }

        public String getChannel() {
            return channel;
        }

        public void setChannel(String channel) {
            this.channel = channel;
        }
    }

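Quick notes on the code: type selects the delivery mode (ALL for broadcast, ALONE for a single user), content is the message text, from and to carry the sender's and recipient's userFlag, and channel is the event name the message is pushed to.
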

⑤ Create the socket handler, which tracks client connections and disconnections

MySocketHandler.java

    import com.corundumstudio.socketio.SocketIOClient;
    import com.corundumstudio.socketio.SocketIOServer;
    import com.corundumstudio.socketio.annotation.OnConnect;
    import com.corundumstudio.socketio.annotation.OnDisconnect;
    import com.socket.mysocket.util.SocketUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    /**
     * @Author: JCccc
     * @Description: starts/stops the socket server and records who is online
     * @Date: 2022/6/23 21:21
     */
    @Component
    public class MySocketHandler {

        private final Logger log = LoggerFactory.getLogger(this.getClass());

        @Autowired
        private SocketIOServer socketIoServer;

        // Start the socket server once this bean has been constructed
        @PostConstruct
        private void start() {
            try {
                socketIoServer.start();
            } catch (Exception e) {
                log.error("Failed to start the socket server", e);
            }
        }

        // Stop the socket server when the application shuts down
        @PreDestroy
        private void destroy() {
            try {
                socketIoServer.stop();
            } catch (Exception e) {
                log.error("Failed to stop the socket server", e);
            }
        }

        @OnConnect
        public void connect(SocketIOClient client) {
            String userFlag = client.getHandshakeData().getSingleUrlParam("userFlag");
            if (userFlag == null) {
                // ConcurrentHashMap rejects null keys, so refuse clients that send no userFlag
                log.warn("Client connected without a userFlag, disconnecting it");
                client.disconnect();
                return;
            }
            SocketUtil.connectMap.put(userFlag, client);
            log.info("Client userFlag: {} connected", userFlag);
        }

        @OnDisconnect
        public void onDisconnect(SocketIOClient client) {
            String userFlag = client.getHandshakeData().getSingleUrlParam("userFlag");
            if (userFlag == null) {
                return;
            }
            log.info("Client userFlag: {} disconnected", userFlag);
            // Remove the entry only if it still points at this client
            SocketUtil.connectMap.remove(userFlag, client);
        }
    }

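Quick notes on the code: the handler starts the socket server once the bean has been constructed and stops it on shutdown. On connect it reads the userFlag parameter from the handshake URL and stores the client in SocketUtil.connectMap under that key; on disconnect it removes the entry again.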
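
One detail worth a closer look is the two-argument `remove(userFlag, client)` in the disconnect handler. The following is a minimal sketch, using plain strings as hypothetical stand-ins for `SocketIOClient` objects, of why it is likely safer than `remove(userFlag)` when the same user reconnects before the old connection's disconnect event fires. The class and method names are invented for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ReconnectSketch {
    // Strings stand in for SocketIOClient instances (an assumption for illustration).
    static final ConcurrentMap<String, String> connectMap = new ConcurrentHashMap<>();

    /** Simulates: user connects, reconnects from a new tab, then the old connection drops. */
    static String survivingClient() {
        connectMap.clear();
        connectMap.put("user_JC", "oldClient"); // first connection
        connectMap.put("user_JC", "newClient"); // reconnect replaces the entry
        // Late disconnect of the old connection: removes only if the value still matches.
        connectMap.remove("user_JC", "oldClient");
        return connectMap.get("user_JC");
    }

    public static void main(String[] args) {
        System.out.println(survivingClient()); // prints "newClient"
    }
}
```

With the one-argument `remove("user_JC")` the late disconnect would have wiped the fresh entry, and the user would silently stop receiving single-client pushes.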

 ⑥ Small socket helper functions

SocketUtil.java

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.corundumstudio.socketio.SocketIOClient;
    import com.corundumstudio.socketio.annotation.OnEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /**
     * @Author: JCccc
     * @Description: keeps the online clients and sends events to them
     * @Date: 2022/6/23 21:28
     */
    @Component
    public class SocketUtil {

        private final Logger log = LoggerFactory.getLogger(this.getClass());

        // For now, keep the userFlag -> client mapping in an in-memory cache
        public static ConcurrentMap<String, SocketIOClient> connectMap = new ConcurrentHashMap<>();

        // Receives messages that clients emit on the system channel
        @OnEvent(value = "CHANNEL_SYSTEM")
        public void systemDataListener(String receiveMsg) {
            if (!StringUtils.hasLength(receiveMsg)) {
                return;
            }
            JSONObject msgObject = JSON.parseObject(receiveMsg);
            String userFlag = msgObject.getString("from");
            String content = msgObject.getString("content");
            log.info("Received a message from user {} on the system channel: {}", userFlag, content);
        }

        public void sendToAll(Map<String, Object> msg, String sendChannel) {
            if (connectMap.isEmpty()) {
                return;
            }
            // Send the event to every connected client; only clients that
            // listen on this channel will actually handle it
            for (Map.Entry<String, SocketIOClient> entry : connectMap.entrySet()) {
                entry.getValue().sendEvent(sendChannel, msg);
            }
        }

        public void sendToOne(String userFlag, Map<String, Object> msg, String sendChannel) {
            // Look up the target client
            SocketIOClient socketClient = getSocketClient(userFlag);
            if (socketClient != null) {
                // Send the event to that client only
                socketClient.sendEvent(sendChannel, msg);
            }
        }

        /**
         * Looks up the client registered under the given userFlag.
         *
         * @param userFlag unique user identifier sent in the handshake
         * @return the matching client, or null if the user is not connected
         */
        public SocketIOClient getSocketClient(String userFlag) {
            if (!StringUtils.hasLength(userFlag)) {
                return null;
            }
            return connectMap.get(userFlag);
        }
    }

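Quick notes on the code: connectMap maps each userFlag to its connected client. systemDataListener handles messages that clients emit on CHANNEL_SYSTEM, sendToAll pushes an event to every connected client, and sendToOne pushes it only to the client registered under the given userFlag.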

⑦ Write one endpoint to simulate the scenario where a front-end page calls the back end to push a message

TestController.java

    import com.socket.mysocket.dto.MyMessage;
    import com.socket.mysocket.util.SocketUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * @Author: JCccc
     * @Description: test endpoint that triggers a push
     * @Date: 2022/06/13 21:50
     */
    @RestController
    public class TestController {

        public final static String SEND_TYPE_ALL = "ALL";
        public final static String SEND_TYPE_ALONE = "ALONE";

        @Autowired
        SocketUtil socketUtil;

        @PostMapping("/testSendMsg")
        public String testSendMsg(@RequestBody MyMessage myMessage) {
            Map<String, Object> map = new HashMap<>();
            map.put("msg", myMessage.getContent());
            // Broadcast
            if (SEND_TYPE_ALL.equals(myMessage.getType())) {
                socketUtil.sendToAll(map, myMessage.getChannel());
                return "success";
            }
            // One specific user
            if (SEND_TYPE_ALONE.equals(myMessage.getType())) {
                socketUtil.sendToOne(myMessage.getTo(), map, myMessage.getChannel());
                return "success";
            }
            return "fail";
        }
    }

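Quick notes on the code: the endpoint wraps the content in a map under the key msg, then dispatches on type. ALL goes to sendToAll, ALONE goes to sendToOne with the to field as the target userFlag, and any other type returns "fail".

That's all 7 steps. Everything is in place.

A simple front-end page

Next, let's put together some front-end HTML pages and see the result: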

The first page:
TestClientStudentJC.html

    <!DOCTYPE html>
    <html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
        <title>Connect to SOCKET</title>
        <script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.min.js"></script>
        <script src="https://cdn.bootcss.com/socket.io/2.2.0/socket.io.js"></script>
        <style>
            body {
                padding: 20px;
            }
            #console {
                height: 450px;
                overflow: auto;
            }
            .msg-color {
                color: green;
            }
        </style>
    </head>
    <body>
    <div id="console" class="well"></div>
    <div id="conversationDiv">
        <label>Push a message to the system</label>
        <input type="text" id="content"/>
        <button id="btnSendToSystem" onclick="sendSys();">Send</button>
    </div>
    <script type="text/javascript">
        var socket;
        connect();

        function connect() {
            var userFlag = 'user_JC';
            // The query string travels with the handshake; the server reads userFlag from it
            var opts = {
                query: 'userFlag=' + userFlag
            };
            socket = io.connect('http://localhost:8503', opts);
            socket.on('connect', function () {
                console.log("Connected");
                output('Current user: ' + userFlag);
                output('<span class="msg-color">Connected.</span>');
            });
            socket.on('disconnect', function () {
                output('<span class="msg-color">Disconnected.</span>');
            });
            socket.on('CHANNEL_STUDENT', function (data) {
                let msg = JSON.stringify(data);
                output('Received a student-channel message: ' + msg);
                console.log(data);
            });
            socket.on('CHANNEL_SYSTEM', function (data) {
                let msg = JSON.stringify(data);
                output('Received a global system message: ' + msg);
                console.log(data);
            });
        }

        function sendSys() {
            console.log('Sending a message to the server');
            var content = document.getElementById('content').value;
            socket.emit('CHANNEL_SYSTEM', JSON.stringify({
                'content': content,
                'from': 'user_JC'
            }));
        }

        function output(message) {
            var element = $("<div>" + message + "</div>");
            $('#console').prepend(element);
        }
    </script>
    </body>
    </html>

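Quick notes on the code: the page connects to the socket server on port 8503 and passes userFlag in the handshake query. It listens on CHANNEL_STUDENT and CHANNEL_SYSTEM, and sendSys() emits the text box content to CHANNEL_SYSTEM.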
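
The `query` option is what ties a browser tab to a `userFlag`: the socket.io client appends it to the handshake URL, and the server reads it back with `getSingleUrlParam("userFlag")`. Below is a rough, standard-library-only sketch of that lookup. The `HandshakeQuerySketch` class, its `singleParam` helper, and the sample URL are illustrative assumptions, not netty-socketio code.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class HandshakeQuerySketch {
    // Assumed example of a handshake URL produced by a socket.io 2.x client.
    static final String SAMPLE_URL =
            "http://localhost:8503/socket.io/?userFlag=user_JC&EIO=3&transport=polling";

    /** Returns the first value of the named query parameter, or null if it is absent. */
    static String singleParam(String url, String name) {
        String query = URI.create(url).getRawQuery();
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            if (URLDecoder.decode(key, StandardCharsets.UTF_8).equals(name)) {
                String value = eq < 0 ? "" : pair.substring(eq + 1);
                return URLDecoder.decode(value, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(singleParam(SAMPLE_URL, "userFlag")); // prints "user_JC"
    }
}
```

Because the lookup yields null when the parameter is missing, a page that forgets the `query` option reaches the server without an identity, which is worth guarding against in the connect handler.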

The second page is basically the same as the first; only the unique user identifier changes:

TestClientStudentPU.html

    <!DOCTYPE html>
    <html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
        <title>Connect to SOCKET</title>
        <script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.min.js"></script>
        <script src="https://cdn.bootcss.com/socket.io/2.2.0/socket.io.js"></script>
        <style>
            body {
                padding: 20px;
            }
            #console {
                height: 450px;
                overflow: auto;
            }
            .msg-color {
                color: green;
            }
        </style>
    </head>
    <body>
    <div id="console" class="well"></div>
    <div id="conversationDiv">
        <label>Push a message to the system</label>
        <input type="text" id="content"/>
        <button id="btnSendToSystem" onclick="sendSys();">Send</button>
    </div>
    <script type="text/javascript">
        var socket;
        connect();

        function connect() {
            var userFlag = 'user_PU';
            // The query string travels with the handshake; the server reads userFlag from it
            var opts = {
                query: 'userFlag=' + userFlag
            };
            socket = io.connect('http://localhost:8503', opts);
            socket.on('connect', function () {
                console.log("Connected");
                output('Current user: ' + userFlag);
                output('<span class="msg-color">Connected.</span>');
            });
            socket.on('disconnect', function () {
                output('<span class="msg-color">Disconnected.</span>');
            });
            socket.on('CHANNEL_STUDENT', function (data) {
                let msg = JSON.stringify(data);
                output('Received a student-channel message: ' + msg);
                console.log(data);
            });
            socket.on('CHANNEL_SYSTEM', function (data) {
                let msg = JSON.stringify(data);
                output('Received a global system message: ' + msg);
                console.log(data);
            });
        }

        function sendSys() {
            console.log('Sending a message to the server');
            var content = document.getElementById('content').value;
            socket.emit('CHANNEL_SYSTEM', JSON.stringify({
                'content': content,
                'from': 'user_PU'
            }));
        }

        function output(message) {
            var element = $("<div>" + message + "</div>");
            $('#console').prepend(element);
        }
    </script>
    </body>
    </html>

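OK, start the project and let's try it out.

Open the client page directly to simulate student JC connecting to the socket server:
http://127.0.0.1:8089/TestClientStudentJC.html

The server detects the connection:

This is where it is detected:

First, try pushing a message from the client to the system:

The server receives the message successfully:

This works because the server listens on the corresponding channel:

The front end uses JS to emit to this system channel:

P.S. For pushing messages from the front end to the server, simply calling an HTTP endpoint would do just as well.

OK, on to core scenario 1:

 Broadcast: everyone receives the message

 The system pushes a message to every connected client by posting this body to /testSendMsg: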

    {
      "type": "ALL",
      "content": "Hello everyone, this is a broadcast message that all clients receive",
      "channel": "CHANNEL_SYSTEM"
    }
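
The request above can be sent with any HTTP client. As a hedged sketch, here is one way to build it with Java 11's `java.net.http` API, assuming the application listens on port 8089 as configured earlier. The `PushRequestSketch` class and its helper methods are names invented for this example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PushRequestSketch {
    static final String ENDPOINT = "http://127.0.0.1:8089/testSendMsg";

    // Minimal escaping for backslashes and double quotes only; a real project
    // would serialize the body with a JSON library instead.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the JSON body for a broadcast (type ALL) on the given channel. */
    static String broadcastBody(String content, String channel) {
        return "{\"type\":\"ALL\",\"content\":\"" + escape(content)
                + "\",\"channel\":\"" + escape(channel) + "\"}";
    }

    /** Builds the POST request without sending it. */
    static HttpRequest buildBroadcastRequest(String content, String channel) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(broadcastBody(content, channel)))
                .build();
    }

    /** Sends the request; this needs the Spring Boot application to be running. */
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body(); // the controller answers "success" or "fail"
    }

    public static void main(String[] args) {
        HttpRequest request = buildBroadcastRequest("Hello everyone", "CHANNEL_SYSTEM");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Calling `send(buildBroadcastRequest(...))` while both student pages are open should make the message appear on each of them, since both listen on CHANNEL_SYSTEM.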

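Here is the result:

 Next is scenario 2: partial broadcast, where only some clients receive the message

In practice this just means the HTML clients listen on different topics (channels):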
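
Note that with this approach the server still sends the event to every connected client, and each page simply ignores events it has no listener for. If the filtering should happen on the server instead, one option is to track group membership next to the connection map. The sketch below is a standard-library stand-in: `GroupPushSketch`, its `Client` interface, and the group names are hypothetical, and a real project would call `SocketIOClient.sendEvent` (or use netty-socketio's rooms) where this code calls `Client.send`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GroupPushSketch {
    /** Hypothetical stand-in for SocketIOClient. */
    interface Client {
        void send(String channel, String msg);
    }

    private final Map<String, Client> clients = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> groups = new ConcurrentHashMap<>();

    /** Registers a client under its userFlag and adds it to one group. */
    void connect(String userFlag, String group, Client client) {
        clients.put(userFlag, client);
        groups.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(userFlag);
    }

    /** Sends only to members of the group; returns how many clients were reached. */
    int sendToGroup(String group, String channel, String msg) {
        int sent = 0;
        for (String userFlag : groups.getOrDefault(group, Set.of())) {
            Client client = clients.get(userFlag);
            if (client != null) {
                client.send(channel, msg);
                sent++;
            }
        }
        return sent;
    }

    /** Two students and one teacher connect; only the teacher group is addressed. */
    static List<String> demo() {
        GroupPushSketch registry = new GroupPushSketch();
        List<String> received = new ArrayList<>();
        registry.connect("user_JC", "student", (ch, msg) -> received.add("user_JC:" + ch + ":" + msg));
        registry.connect("user_PU", "student", (ch, msg) -> received.add("user_PU:" + ch + ":" + msg));
        registry.connect("teacher_A", "teacher", (ch, msg) -> received.add("teacher_A:" + ch + ":" + msg));
        registry.sendToGroup("teacher", "CHANNEL_TEACHER", "hello teachers");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [teacher_A:CHANNEL_TEACHER:hello teachers]
    }
}
```

The trade-off is extra bookkeeping on connect and disconnect in exchange for not sending events to clients that will discard them.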

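Let's bring in more people and raise the head count to 3:

Simulate 2 students and 1 teacher, all connected to the socket server

The teacher page, of course, listens on the teacher channel:

Then simulate pushing a message to the teacher channel: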

    {
      "type": "ALL",
      "content": "A message for the teachers!!!",
      "channel": "CHANNEL_TEACHER"
    }

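The last scenario is the single-client push, where one specific person receives the message

Simulate student PU pushing a message to student JC. Going by the controller code, the request uses type ALONE, sets to to JC's userFlag (user_JC), and targets the channel CHANNEL_STUDENT:

You can see that JC, listening on the student channel, receives PU's message as expected:

That's it for this article.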
