Spring Cloud Stream详解

1. 简介

官方定义: Spring Cloud Stream是一个用于构建于共享消息系统连接的高度可扩展的事件驱动微服务。

目前主流的消息框架有:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

假设公司公司业务项目用了RabbitMQ,而大数据项目用了Kafka,这时就会出现两个消息框架,消息模型上也存在差异,并且代码上用法也不一样。

Spring Cloud Stream就是不再关注具体MQ细节,可以在不改变代码的基础上,来完成Rabbit和Kafka两个不同的消息中间件的切换(这里切换是指原本用的RabbitMQ,但是需要换乘Kafka,所以换框架)

总结:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

也就是基于Stream可以和如下消息中间件整合使用

1.2 设计思想

常规的MQ设计如下:

  • Message: 生产者/消费者之间靠消息媒介传递信息内容

  • MessageChannel: 消息必须走特定的通道

  • 队列: 假如发消息会先发送到消息队列中

  • 消息队列的消息消费: 订阅的人可以进行消费

Cloud Stream 设计如下:

通过定义绑定器Binder作为中间层,实现应用程序和消息中间件细节之间的隔离。

在没有绑定器这个概念的情况下,应用要直接于消息中间件进行信息交互的时候,有于各消息中间件构建的初衷不同,它们的实现细节会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序于消息中间件细节之间的隔离。

Stream对消息中间件的进一步封装,可以坐待代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),是的微服务开发的高度结偶,服务可以关注更多自己的业务流程

Binder可以生成Binding, Binding用来绑定消息容器的生产者和消费者,它有两种类型INPUT-消费者OUTPUT-生产者

stream为了屏蔽差异,抽象出一个Binder层,而Spring官方提供了Kafka和Rabbit MQ这两个框架的实现

  • Stream 连接RabbitMQ
    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  • Stream 连接Kafka
    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kafka中就是`Topic

1.3 标准流程

  • Binder: 连接中间件,屏蔽差异

  • Channel:通道,是队列Queue的一种抽象,在消息通讯洗头膏中就是实现存储和转发的媒介,通过Channel对队列进行配置

  • Source(发送者)和 Sink(接受者): 从Stream发布消息就是输出,接收消息就是输入

1.4 注解

注解完全是基于官方给的模型耳钉的,通过Stream使用消息中间件也是非常简单的,直接使用一下注解即可

注解 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用于消息中间件之间的封装,目前实现了Kafka和RabbitMq的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型
@Input 输入通道,通过该输入通道接收到的消息进入应用程序
@Output 输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指通道Channel和exchange绑定在一起

2 示例工程

生产者就是消息发送者,消费者就是消息接受者,这里使用的是RabbitMQ

2.1 生产者

  1. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <springboot.version>2.6.8</springboot.version>
    <springcloud.version>2021.0.3</springcloud.version>
    </properties>

    <dependencyManagement>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>${springboot.version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>${springcloud.version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
    </dependencies>
    </dependencyManagement>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    </dependencies>
  2. 添加配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    server:
    port: 8801

    spring:
    application:
    name: cloud-stream-provider
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  3. 添加接口

    1
    2
    3
    public interface IMessageProvider {
    public String send();
    }
  4. 实现接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;

    import javax.annotation.Resource;
    import java.util.UUID;

    // 可以理解为是一个消息的发送管道的定义
    @EnableBinding(Source.class)
    public class MessageProviderImpl implements IMessageProvider {

    // 消息的发送管道
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
    String serial = UUID.randomUUID().toString();
    // 创建并发送消息
    this.output.send(MessageBuilder.withPayload(serial).build());
    System.out.println("***serial: " + serial);
    return serial;
    }
    }
  5. 控制器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RestController
    public class SendMessageController {

    @Autowired
    private IMessageProvider iMessageProvider;

    @GetMapping("send")
    public String send() {
    return iMessageProvider.send();
    }
    }
  6. 测试

  • 首先访问RabbitMQ: http://localhost:15672

  • 启动项目,并调用消息发送接口: http://localhost:8801/send; 下面波峰代表发送消息成功

    启动后会创建交换机,名称就是application.yml当中的destination属性设置的

    停止服务后并没有删除交换机

2.2 消费者

  1. 引入依赖,和生产者一致
  1. 添加配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    server:
    port: 8802

    spring:
    application:
    name: cloud-stream-consumer
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    input: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  2. 监听消息,消费者只负责接受消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
    System.out.println("消费者1号,------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort);
    }
    }
  3. 测试,启动消费者,它会向交换机中添加一个队列



    当服务停止后消息队列会被自动删除

2.3 存在的问题

  1. 按照8802,clone出一份运行8803,主要用来演示多个消费者的场景

  2. 一次启动8801生产者、8802消费者、8803消费者

当三个服务都启动后通过RabbitMQ界面会发现,一个交换机绑定了两个队列

运行后会发现存在两个问题:重复消费问题、消息持久化问题;

重复消费问题

发送消息后,两个消费者都接收到了消息: http://localhost:8801/send

比如下以下场景中,订单系统做集群部署,都会从RabbitMQ中获取订单小修,那么一个订单就会被多个服务获取到,就会造成数据错误,为了避免这种情况,我们可以使用Stream的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

微服务应用放置在同一个group中,就能保证消息只会被其中一个应用消费一次,同一个足内会发生竞争关系,只有一个可以消费;
修改application.yml配置文件,增加group配置,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: group1 # 设置消费者分组名称

当两个消费者都设置好后,会发现一个问题:实际上份到一个组对于RabbitMQ来说就是两个消费者监听一个队列。一个队列也就意味着,当队列收到一条消息,哪个消费者先消费就是谁的,消费完队列就没有了,也就是只有一个消费者能消费到消息。

假如不设置group属性时,默认是启动一个消费者就会创建一个消息队列,启动多个就会创建多个队列。 Stream默认使用的是RabbitMQ的 Topic交换机。
当发送者向这个交换机发送消息的时候,两个队列都会接收到

最终测试:8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

消息持久化问题

生产者发送消息的时候,消费者恰好宕机了,但是过了一会消费者恢复了,消息却没有收到,也就是意味着消息队列是临时消息队列;

当安个项目都启动的时候,按照步骤做以下几件事:

  1. 停止8802和8803,并去掉8802的分组group,8803不去分组信息,停止项目的时候会发现消息队列并没有删除,说明一旦设置分组信息,消息队列就不再是临时队列

  2. 8801发送4条消息

  3. 启动8802,然后消息并没有打印,没有接收到消息(8802去掉了分组信息)

  4. 在启动8803,有分组属性配置,后台打印出MQ上的消息

原因就是:当两个项目都停止的时候,队列并未删除,而8803还绑定了这个队列,所以他就算宕机了,又重启了,依然可以收到消息。而8802没有设置分组信息,他再启动后系统会给他创建一个临时队列,自然而然收不到之前的消息了。