searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

disruptor队列介绍和使用

2023-08-02 02:35:19
13
0

disruptor介绍

参考文档

https://tech.meituan.com/2016/11/18/disruptor.html

https://www.cnblogs.com/pku-liuqiang/p/8544700.html

https://juejin.cn/post/6844903685609226254

 

介绍

disruptor是什么

Disruptor 是由英国的外汇交易公司LMAX创造的一个高效队列,它的设计初衷是为了解决内存队列中的延时问题(在性能测试中发现与1/0操作有相同的数量级)。基于Disruptor的系统可以在单线程下支持每秒处理600万订单。在2010年的QCon讲座后,它引起了业界的注意。2011年,企业应用软件专家Martin Fowler专门撰写了一篇长篇文章进行介绍。在同一年,它还获得了Oracle官方的Duke大奖。从数据结构的角度来看,Disruptor是一个支持生产者消费者模式的环形队列,它能在无锁的情况下并行消费,也可以根据消费者间的依赖关系进行有序消费。

Disruptor的设计方案

为了解决队列速度慢的问题,Disruptor引入了以下设计:

  • 环形数组结构:使用数组而非链表,避免垃圾回收,并且与处理器缓存机制更加兼容。

  • 元素位置定位:采用长度为2的幂次的数组,通过位运算快速定位元素位置。下标按递增顺序排列,不会导致索引溢出。使用长整型(long)索引,即使每秒处理100万个请求,也需要30万年才能用完。

  • 无锁设计:每个生产者或消费者线程先申请可操作的数组位置,一旦获得位置,直接在该位置读取或写入数据。无需考虑数组的环形结构。整个过程通过原子变量CAS实现,确保线程操作的安全性。

 

事件消费模式

  • 多个消费者重复消费同一条消息数据
  • 多个消费者非重复消费同一条消息数据

 

多个消费者重复消费同一条消息数据

 

多个消费者非重复消费同一条消息数据

 

 

基本使用

下面以“单生产者-多消费者-多消费者不重复消费数据”这个demo来展示如何使用disruptor

1、创建一个类,这个类表示队列里面存储数据的类型

2、创建生产者类,用于生产数据到队列

3、创建消费者类

如果是多消费者不重复消费同一条数据则实现WorkHandler接口,并且onEvent为消费数据的逻辑

4、创建Disruptor队列

下面解释一下Disruptor的构造参数:

 

EventFactory:定义队列里面数据是怎么创建的工厂类,因为队列的对象是会全部创建出来的,如何创建取决于EventFactory的定义

 

ringBufferSize:队列的容量大小,这个值设定了就是固定的,启动Disruptor的时候就会初始化这个数量的元素

 

ThreadFactory:创建生产消费者线程的工厂类

 

ProducerType:生产者数量,SINGLE为单生产者,MULTI为多生产者

 

WaitStrategy:当队列没数据时,消费者线程的处理策略,主要有下面8种

BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU

BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)

LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用

TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑

LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作

SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠

YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU

PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

 

ExceptionHandler:设置消费过程的异常处理器,主要有3类异常要处理:队列启动异常、消息者消费数据异常、队列关闭异常

5、创建多个消费者,并且与队列绑定

6、启动队列进程,开始工作,生产者生产数据

7、关闭队列和生产者消费者线程

 

细节说明

  • 队列始终有且仅有会存放环形数组容量个数的对象,即便这些元素被消费线程消费了也不会被释放
  • 如果队列里面的元素到达了环形数组容量,那么生产者线程就会被阻塞
  • 如何消费数据出现异常,在异常处理器里面,异常不能往外抛,否则会导致waiting
  • 消费者线程是一开始就确定好有多少个的,而且要全部都运行起来,目前我们是使用线程池动态创建线程,总数控制在corePoolSize内
  • disruptor新用法不建议使用线程池了,cloudcanal也没有使用线程池,如果是只有一个disruptor队列的话,生产者和消费者线程都是一开始就启动的,中途也不会销毁,确实可以不需要线程池,但我们dts一个表的迁移就对应一个消息队列,这个表跑完了就要销毁队列和线程,这样看有线程池效率确实高一些
0条评论
0 / 1000
宋****祥
4文章数
0粉丝数
宋****祥
4 文章 | 0 粉丝
原创

disruptor队列介绍和使用

2023-08-02 02:35:19
13
0

disruptor介绍

参考文档

https://tech.meituan.com/2016/11/18/disruptor.html

https://www.cnblogs.com/pku-liuqiang/p/8544700.html

https://juejin.cn/post/6844903685609226254

 

介绍

disruptor是什么

Disruptor 是由英国的外汇交易公司LMAX创造的一个高效队列,它的设计初衷是为了解决内存队列中的延时问题(在性能测试中发现与1/0操作有相同的数量级)。基于Disruptor的系统可以在单线程下支持每秒处理600万订单。在2010年的QCon讲座后,它引起了业界的注意。2011年,企业应用软件专家Martin Fowler专门撰写了一篇长篇文章进行介绍。在同一年,它还获得了Oracle官方的Duke大奖。从数据结构的角度来看,Disruptor是一个支持生产者消费者模式的环形队列,它能在无锁的情况下并行消费,也可以根据消费者间的依赖关系进行有序消费。

Disruptor的设计方案

为了解决队列速度慢的问题,Disruptor引入了以下设计:

  • 环形数组结构:使用数组而非链表,避免垃圾回收,并且与处理器缓存机制更加兼容。

  • 元素位置定位:采用长度为2的幂次的数组,通过位运算快速定位元素位置。下标按递增顺序排列,不会导致索引溢出。使用长整型(long)索引,即使每秒处理100万个请求,也需要30万年才能用完。

  • 无锁设计:每个生产者或消费者线程先申请可操作的数组位置,一旦获得位置,直接在该位置读取或写入数据。无需考虑数组的环形结构。整个过程通过原子变量CAS实现,确保线程操作的安全性。

 

事件消费模式

  • 多个消费者重复消费同一条消息数据
  • 多个消费者非重复消费同一条消息数据

 

多个消费者重复消费同一条消息数据

 

多个消费者非重复消费同一条消息数据

 

 

基本使用

下面以“单生产者-多消费者-多消费者不重复消费数据”这个demo来展示如何使用disruptor

1、创建一个类,这个类表示队列里面存储数据的类型

2、创建生产者类,用于生产数据到队列

3、创建消费者类

如果是多消费者不重复消费同一条数据则实现WorkHandler接口,并且onEvent为消费数据的逻辑

4、创建Disruptor队列

下面解释一下Disruptor的构造参数:

 

EventFactory:定义队列里面数据是怎么创建的工厂类,因为队列的对象是会全部创建出来的,如何创建取决于EventFactory的定义

 

ringBufferSize:队列的容量大小,这个值设定了就是固定的,启动Disruptor的时候就会初始化这个数量的元素

 

ThreadFactory:创建生产消费者线程的工厂类

 

ProducerType:生产者数量,SINGLE为单生产者,MULTI为多生产者

 

WaitStrategy:当队列没数据时,消费者线程的处理策略,主要有下面8种

BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU

BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)

LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用

TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑

LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作

SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠

YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU

PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

 

ExceptionHandler:设置消费过程的异常处理器,主要有3类异常要处理:队列启动异常、消息者消费数据异常、队列关闭异常

5、创建多个消费者,并且与队列绑定

6、启动队列进程,开始工作,生产者生产数据

7、关闭队列和生产者消费者线程

 

细节说明

  • 队列始终有且仅有会存放环形数组容量个数的对象,即便这些元素被消费线程消费了也不会被释放
  • 如果队列里面的元素到达了环形数组容量,那么生产者线程就会被阻塞
  • 如何消费数据出现异常,在异常处理器里面,异常不能往外抛,否则会导致waiting
  • 消费者线程是一开始就确定好有多少个的,而且要全部都运行起来,目前我们是使用线程池动态创建线程,总数控制在corePoolSize内
  • disruptor新用法不建议使用线程池了,cloudcanal也没有使用线程池,如果是只有一个disruptor队列的话,生产者和消费者线程都是一开始就启动的,中途也不会销毁,确实可以不需要线程池,但我们dts一个表的迁移就对应一个消息队列,这个表跑完了就要销毁队列和线程,这样看有线程池效率确实高一些
文章来自个人专栏
数据库相关
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0