SpringBoot+Disruptor

1. Disruptor介绍

  1. Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注

  2. Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟

  3. 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了

  4. Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升

  5. 其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案

  6. Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor

2. Disruptor核心概念

先从连接Disruptor的核心概念开始,来了解它是如何运作的,下面介绍的概念模型,既是领域对象,也是映射到代码实现的核心对象
核心架构图

2.1 Ring Buffer

环形的缓冲区,曾经RingBufferDisruptor中最主要的对象,但从3.0 版本开始,其职责被简化为仅仅负责对通过Disruptor进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中, RingBuffer可以由用户的自定义实现来完全替代

2.2 Sequence Disruptor

通过顺序递增的序号来管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的处理进度,虽然一个 AtomicLong也可以用于标识进度,但定义sequence来负责该问题还有另一个目的,就是防不同的sequence之间的CPU缓存伪共享(False Sharing)问题。

2.3 Sequencer

Sequencer是Disruptor的真正核心。此接口有两个实现类SingleProducerSequencerMultiProducerSequencer,他们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

2.4 Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

2.5 Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

2.6 Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

2.7 EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

2.8 EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

2.9 Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

3. 案例Demo

  1. 添加pom.xml依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    <parent>
    <artifactId>spring-boot-dependencies</artifactId>
    <groupId>org.springframework.boot</groupId>
    <version>2.3.5.RELEASE</version>
    </parent>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 主要的 -->
    <dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.1</version>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
    </dependency>
    </dependencies>
  1. 消息体Model
    1
    2
    3
    4
    @Data
    public class MessageModel {
    private String message;
    }
  2. 构造EventFactory
    1
    2
    3
    4
    5
    6
    public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
    return new MessageModel();
    }
    }
  3. 构造EventHandler - 消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {

    @Override
    public void onEvent(MessageModel messageModel, long l, boolean b) {
    try {
    //这里停顿1000ms是为了确定消费消息是异步的
    Thread.sleep(1000);
    log.info("消费者处理消息开始");
    if (messageModel != null) {
    log.info("消费者消费的消息是:{}", messageModel);
    }
    } catch (Exception e) {
    log.info("消费者处理消息失败");
    }
    log.info("消费者处理消息结束");
    }
    }
  4. 构造BeanManager
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * 获取实例化对象
    * @author xiaoyuge
    */
    @Component
    public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    BeanManager.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
    return applicationContext;
    }

    public static Object getBean(String name) {
    return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
    return applicationContext.getBean(clazz);
    }
    }
  5. 构造MQManager
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Configuration
    public class MQManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    //指定事件工厂
    HelloEventFactory factory = new HelloEventFactory();
    //指定ringBuffer字节大小,必须为2的N次方,能将求模运算转为位运算提高效率,否则将影响效率
    int bufferSize = 1024 * 256;

    Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
    //设置事件业务处理器---消费者
    disruptor.handleEventsWith(new HelloEventHandler());
    //启动disruptor线程
    disruptor.start();
    //获取ringBuffer环, 用于接收生产者生产的事件
    RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
    return ringBuffer;
    }
    }
  6. 构造MqService和实现类- 生产者
    1
    2
    3
    4
    5
    6
    7
    8
    public interface DisruptorMqService {

    /**
    * 消息
    * @param message 消息内容
    */
    void sayHelloMq(String message);
    }
    实现类:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Slf4j
    @Component
    @Service
    public class DsiruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    /**
    * 消息
    * @param message 消息内容
    */
    @Override
    public void sayHelloMq(String message) {
    log.info("record the message:{}", message);
    //获取下一个Event槽的下标
    long sequence = messageModelRingBuffer.next();
    try {
    MessageModel event = messageModelRingBuffer.get(sequence);
    event.setMessage(message);
    log.info("网消息队列中添加消息:{}",event);
    }catch (Exception e) {
    log.error("添加失败:{}",e);
    }finally {
    //发布Event,激活观察者去消费,将sequence传递给消费者
    //注意最后的publish方法必须放在finally中以确保必须调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
    messageModelRingBuffer.publish(sequence);
    }
    }
    }
  7. 构造测试类以及方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Slf4j
    @SpringBootTest(classes = App.class)
    @RunWith(SpringRunner.class)
    public class AppTest {

    @Autowired
    private DisruptorMqService disruptorMqService;

    @Test
    public void sayHelloTest() throws Exception {
    disruptorMqService.sayHelloMq("消息到了, hello world !");
    log.info("消息队列以发送完毕");
    //这里停顿2000ms是为了确定处理消息是异步的
    Thread.sleep(2000);
    }
    }
    运行结果:
    1
    2
    3
    4
    5
    6
    2023-01-10 20:44:15.461  INFO 7643 --- [           main] org.example.DsiruptorMqServiceImpl       : record the message:消息到了, hello world !
    2023-01-10 20:44:15.463 INFO 7643 --- [ main] org.example.DsiruptorMqServiceImpl : 网消息队列中添加消息:MessageModel(message=消息到了, hello world !)
    2023-01-10 20:44:15.463 INFO 7643 --- [ main] org.example.AppTest : 消息队列以发送完毕
    2023-01-10 20:44:16.467 INFO 7643 --- [pool-1-thread-1] org.example.handler.HelloEventHandler : 消费者处理消息开始
    2023-01-10 20:44:16.468 INFO 7643 --- [pool-1-thread-1] org.example.handler.HelloEventHandler : 消费者消费的消息是:MessageModel(message=消息到了, hello world !)
    2023-01-10 20:44:16.468 INFO 7643 --- [pool-1-thread-1] org.example.handler.HelloEventHandler : 消费者处理消息结束

4. 总结

其实生产者-消费者模式是很常见的,通过一些消息队列也可以轻松做到上述的效果,不同的地方在于:Disruptor是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因