STOMP 数据实时通信

1. 前言

在现在Web应用中,实时通信越来越重要。传统的HTTP请求-响应模型已无法满足某些实时性要求较高的应用场景,比如在线聊天、消息推送、实时数据更新等。为了解决这个问题,可以利用WebSocket技术,而STOMP协议则可以进一步简化WebSocket的使用。

2. STOMP协议

STOMP(Streaming Text Oriented Messaging Protocol)是一种简单的文本消息协议,类似于HTTP,它定义了如何通过网络协议进行消息的发送和接收。 在WebSocket的基础上使用STOMP协议可以极大的简化消息的处理,使得消息系统更加结构化、可扩展且易于管理。

3. STOMP协议的优势

  • 简单易用: STOMP使用文本格式,非常容易解析和调试

  • 消息订阅/发布: 通过SubscribeUnSubscribe 命令可以轻松实现消息的订阅和取消订阅

  • 兼容性好: STOMP支持与现有消息代理系统(如activeMQ 、RabbitMQ)配合使用

  • 跨语言支持: STOMP不依赖于特定语言,允许不同应用的客户端通过STOMP通信

4. 工程案例

使用SpringBoot和STOMP实现一个简单的实时通信应用,包括消息的发送、异步处理和发布/订阅机制。前端页面使用JqueryLayui,通过HTTP方式直接引用

  1. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    <?xml version="1.0" encoding="UTF-8"?>
    <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.4.RELEASE</version>
    </parent>
    <groupId>com.yugb</groupId>
    <artifactId>stomp</artifactId>
    <version>1.0</version>
    <name>stomp</name>
    <packaging>jar</packaging>
    <url>http://www.example.com</url>
    <properties>
    <thymeleaf.version>3.0.9.RELEASE</thymeleaf.version>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <tomcat.encoding>UTF-8</tomcat.encoding>
    <tomcat.version>8.5.45</tomcat.version>
    <thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version>
    <jdk.version>1.8</jdk.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <dependency>
    <groupId>net.sourceforge.nekohtml</groupId>
    <artifactId>nekohtml</artifactId>
    <version>1.9.22</version>
    </dependency>
    <dependency>
    <groupId>org.thymeleaf</groupId>
    <artifactId>thymeleaf-spring5</artifactId>
    </dependency>
    <dependency>
    <groupId>nz.net.ultraq.thymeleaf</groupId>
    <artifactId>thymeleaf-layout-dialect</artifactId>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    <optional>true</optional>
    </dependency>
    </dependencies>
    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>
    </project>
  2. 配置application.yaml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    server:
    port: 8888
    tomcat:
    uri-encoding: utf-8
    servlet:
    context-path: /

    #自定义属性
    app:
    websocketEndpoint: /websocket-endpoint
    simpleBrokerDestinations:
    - /topic
    - /queue
    applicationDestinationPrefix: /app

    spring:
    mvc:
    view:
    prefix: /
    suffix: .html
    application:
    name: xiaoyuge
    #设置thymeleaf
    thymeleaf:
    prefix: classpath:/templates/
    mode: LEGACYHTML5
    encoding: UTF-8
    servlet:
    content-type: text/html; charset=utf-8

    jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  3. 定义配置类

    将WebSocket相关配置放到属性类AppProperties,使用软绑定的方式注入属性值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    package com.ygb.business.config;

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;

    /**
    * @author xiaoyuge
    */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "app")
    public class AppProperties {

    private String websocketEndpoint;

    private String[] simpleBrokerDestinations;

    private String applicationDestinationPrefix;
    }
  4. WebSocket 配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.ygb.business.config;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

    /**
    * @author xiaoyuge
    */
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private AppProperties appProperties;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableSimpleBroker(appProperties.getSimpleBrokerDestinations());
    registry.setApplicationDestinationPrefixes(appProperties.getApplicationDestinationPrefix());
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint(appProperties.getWebsocketEndpoint()).withSockJS();
    }
    }
  5. 消息Message实体类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    package com.ygb.business.entity;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    /**
    * @author xiaoyuge
    */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Message {

    private String content;
    }
  6. 消息处理控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    package com.ygb.business.controller;

    import com.ygb.business.entity.Message;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Controller;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureTask;
    import org.springframework.web.bind.annotation.GetMapping;

    /**
    * @author xiaoyuge
    */
    @Controller
    public class MessageController {

    /**
    * 主页
    */
    @GetMapping(value = {"/", "", "/index"})
    public String index() {
    return "index" ;
    }

    /**
    * 简单的消息发送,接收JSON对象并返回
    */
    @MessageMapping("/send")
    @SendTo("/topic/messages")
    public Message send(Message message) {
    //原样返回消息
    return message;
    }

    /**
    * @return 异步处理消息
    */
    @MessageMapping("asyncMessage")
    @SendTo("/topic/asyncMessages")
    public ListenableFuture<Message> asyncMessage(Message message) {
    ListenableFutureTask<Message> future = new ListenableFutureTask<Message>(() -> {
    //这里处理消息,省略。。。。。
    Thread.sleep(2000);
    return message;
    });
    new Thread(future).start();
    return future;
    }

    /**
    * @return 发布/订阅模式
    */
    @MessageMapping("/boradcast")
    @SendTo("/topic/broadcast")
    public String broadcast(String content){
    return "官博消息:"+content;
    }

    /**
    * @return 处理带异常的消息
    */
    @MessageMapping("/errorHanding")
    @SendTo("/queue/errorHanding")
    public Message handleError(Message message) throws Exception{
    if ("error".equalsIgnoreCase(message.getContent())){
    throw new Exception("消息包含错误内容");
    }
    return message;
    }
    }
  7. 启动类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    package com.ygb;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    /**
    * @author xiaoyuge
    */
    @SpringBootApplication
    public class Application {
    public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
    }
    }
  8. 前端页面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    <!DOCTYPE html>
    <html class="x-admin-sm" lang="en" xmlns:th="http://www.thymeleaf.org">
    <!--头部信息-->
    <div th:replace="fragments/taglib :: commonHead('STOMP 实时通信示例')"></div>
    <body>
    <div class="layui-fluid">
    <form>
    <div class="layui-form-item layui-form-text">
    <label for="messageInput" class="layui-form-label">消息内容</label>
    <div class="layui-input-block">
    <textarea placeholder="请输入消息内容" id="messageInput" name="desc" class="layui-textarea"></textarea>
    </div>
    </div>
    <div class="layui-form-item">
    <label class="layui-form-label"></label>
    <!-- 发送简单消息按钮 -->
    <button class="layui-btn" id="sendButton" onclick="return false;">发送消息</button>
    <!-- 发送异步消息按钮 -->
    <button class="layui-btn" id="sendAsyncButton" onclick="return false;">发送异步消息按钮</button>
    <!-- 发送异步消息按钮 -->
    <button class="layui-btn" id="broadcastButton" onclick="return false;">发送发布订阅消息按钮</button>
    </div>
    <!-- 显示接收到的消息 -->
    <div class="layui-form-item layui-form-text">
    <label for="messageInput" class="layui-form-label">接收到消息: </label>
    <div class="layui-input-block">
    <div id="messageContainer" name="desc"></div>
    </div>
    </div>
    </form>
    </div>
    </body>

    <script th:inline="javascript">
    var stompClient = null;

    // 建立 WebSocket 连接
    function connect() {
    var socket = new SockJS('/websocket-endpoint');
    stompClient = Stomp.over(socket);

    stompClient.connect({}, function (frame) {
    console.log('已连接: ' + frame);

    // 订阅消息主题
    stompClient.subscribe('/topic/messages', function (message) {
    showMessage(JSON.parse(message.body).content);
    });

    // 订阅异步消息主题
    stompClient.subscribe('/topic/asyncMessages', function (message) {
    showMessage("异步消息: " + JSON.parse(message.body).content);
    });

    // 订阅广播消息
    stompClient.subscribe('/topic/broadcasts', function (message) {
    showMessage("广播消息: " + message.body);
    });
    });
    }

    // 显示消息
    function showMessage(message) {
    $("#messageContainer").append("<p>" + message + "</p>");
    }

    // 发送消息
    $("#sendButton").click(function () {
    var message = {content: $("#messageInput").val()};
    stompClient.send("/app/send", {}, JSON.stringify(message));
    });

    // 发送异步消息
    $("#sendAsyncButton").click(function () {
    var message = {content: $("#messageInput").val()};
    stompClient.send("/app/asyncMessage", {}, JSON.stringify(message));
    });

    // 发送广播消息
    $("#broadcastButton").click(function () {
    stompClient.send("/app/broadcast", {}, $("#messageInput").val());
    });

    // 连接 WebSocket
    connect();
    </script>
    </html>
  9. 启动并访问

5. 总结

本文介绍了如何在Spring Boot中结合STOMP协议实现实时数据通信。通过配置WebSocket支持、创建消息控制器、构建前端界面,并结合@ConfigurationProperties进行配置管理,实现了一个功能完备的实时消息系统。STOMP协议提供的简化消息传输和高效通信的优势,使得它成为构建实时Web应用的理想选择。