Disruptor是什么?
官网介绍:https://github.com/LMAX-Exchange/disruptor/wiki(https://github.com/LMAX-Exchange/disruptor/wiki)
Disruptor是一个无锁的并发框架
Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。
特点:
- 大大简化了并发程序开发的难度,性能上比Java本身提供的一些并发包好。
- 是一个高性能异步处理框架,实现了观察者模式。
- 是无锁的,CPU友好。它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制的启动频率。
- 业务逻辑是纯内存操作,使用事件驱动方式。
Disruptor核心
Disruptor核心是一个RingBuffer
- RingBuffer是一个数组,没有首尾指针。
- RingBuffer是一个首尾相接的环,用于不同线程间传递数据的Buffer。
- RingBuffer有一个序号,这个序号指向数组中下一个可用的元素。
- 随着数据不断的填充这个数组,这个序号会一直增长,直到绕过这个环。
- 前序号指向的元素,可以通过mod计算:序号%长度=索引。
- 建议将长度设置为2的N次方,有利于二进制计算:序号&(长度-1)=索引。
如果RingBuffer满了,那么是继续覆盖,还是等待消费,这个是由生产者和消费者决定的。
假设RingBuffer满了,生产者有两个选择
- 等待RingBuffer有空位在填充
- 直接覆盖
同时消费者也可以做出选择
- 等待RingBuffer满了再消费
- RingBuffer填充一个就消费一个
Sequence
- 通过顺序递增的序号来编号,管理正在进行交换的数据(事件)
- 对数据处理的过程总是沿着需要逐个递增处理,实现线程安全
- 一个Sequence用于跟踪标识某个特定的事件处理者的处理进度
- 生产者和消费者都各自拥有各自的Sequence
- 如果多个生产者和多个消费者,他们每一个都会拥有各自的Sequence
- Sequence可以看成是一个
volatile long
类型的数值
Sequencer
- Sequencer是Disruptor高性能的核心
- 他是一个interface,主要实现生产者和消费者的快速、正确的传递数据的并发算法
SequenceBarrier
- 用于保持RingBuffer的生产者和消费者之间的平衡关系
- 决定消费者是否还有可处理事件的逻辑
EventProcessor
主要事件循环,处理Disruptor中的Event,拥有消费者的EventProcessor
Disruptor Quick Start
官网参考:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started(https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started)
开发模型
- 定义Event,代表Disruptor所处理的数据单元
/** * @author fangxi */ @Data @NoArgsConstructor @AllArgsConstructor public class OrderEvent { private Long id; private String productName; private BigDecimal price; }
- 定义Event工厂,实现
EventFactocy<?>
接口,用来填充RingBuffer容器,如果使用1.8的话,则不需要这个工厂对象,直接在构造方法里面OrderEvent::new
/**
* @author fangxi
* <p>
* 创建orderEvent对象
*/
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
- 定义Event处理器(消费者),实现
EventHandler<?>
,用来从RingBuffer中取出数据并处理。如果使用1.8处理,可以不需要这个类,用lambda代替
/**
* @author fangxi
* 消费者
*/
public class OrderEventHandler implements EventHandler<OrderEvent> {
/**
* 有消息发送,这个就会被监听
*/
@Override
public void onEvent(OrderEvent orderEvent,long sequence, boolean endOfBatch) throws Exception {
System.out.println(orderEvent);
}
}
- 组合1-3步
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.math.BigDecimal;
/**
* @author fangxi
*/
public class DisruptorTest {
public static void main(String[] args) {
int bufferSize = 1024;
// 实例化Disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// 添加消费者监听
// disruptor.handleEventsWith(new OrderEventHandler());
// 消费者,lambda表示
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("消费者:" + event));
// 启动
// 实际存储数据的容器
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 投递100条数据
for (int i = 0; i < 100; i++) {
// 生产者,就用lambda表示
ringBuffer.publishEvent((event, sequence) -> {
event.setId(sequence);
event.setPrice(new BigDecimal("100").add(new BigDecimal(sequence)));
event.setProductName(sequence + "号商品");
});
}
disruptor.shutdown();
}
}
Disruptor高阶操作
涉及到的Event对象
@Data
public class Trade {
private String id;
private String name;
private String nickname;
private BigDecimal price;
}
串形操作和并行操作
让消费者按照我们制定的顺序操作
先设置三个消费者
// 三个消费者,将分配设置name,id和打印
EventHandler<Trade> nameHandler = (trade, sequence, endOfBatch) -> {
System.out.println("Set Name");
trade.setName("name" + sequence);
// 模拟耗时操作
TimeUnit.SECONDS.sleep(1);
};
EventHandler<Trade> idHandler = (trade, sequence, endOfBatch) -> {
System.out.println("Set Id");
trade.setId(UUID.randomUUID().toString());
TimeUnit.SECONDS.sleep(1);
};
EventHandler<Trade> printHandler = (trade, sequence, endOfBatch) -> {
System.out.println(trade);
};
串形操作
// 串形操作,按照顺序操作
disruptor.handleEventsWith(idHandler).handleEventsWith(nameHandler).handleEventsWith(printHandler);
串形操作打印的结果和handleEventsWith的链式调用顺序有关
并行操作
// 并行操作,三个handler并行执行
disruptor.handleEventsWith(idHandler, nameHandler, printHandler);
生产者
Random random = new Random();
// 也可以直接通过Disruptor对象提交任务
disruptor.publishEvent((trade, sequence) -> {
double price = random.nextDouble() * 9999;
trade.setPrice(new BigDecimal(String.valueOf(price)));
});
菱形操作
让串行操作和并行操作同时存在
上面的例子中,可以设置idHandler和设置nameHandler可以同时执行,让printHandler在这两个之后执行
// 菱形操作,让idHandler和nameHandler同时执行,printHandler在这两个操作之后在执行
disruptor.handleEventsWith(idHandler, nameHandler).handleEventsWith(printHandler);
// 也可以这样写
disruptor.handleEventsWith(idHandler, nameHandler).then(printHandler);
多边形操作
实现多边形操作,需要再新增两个handler
// 新增两个handler
EventHandler<Trade> nicknameHandler = (trade, sequence, endOfBatch) -> {
System.out.println("nicknameHandler");
trade.setNickname("[" + trade.getName() + "]");
};
EventHandler<Trade> idNotLineHandler = (trade, sequence, endOfBatch) -> {
System.out.println("idNotLineHandler");
trade.setId(trade.getId().replaceAll("-", ""));
};
要实现一下的操作
// idHander和nameHandler并行
disruptor.handleEventsWith(idHandler, nameHandler);
// idHander之后执行idNotLineHandler
disruptor.after(idHandler).handleEventsWith(idNotLineHandler);
// nameHandler之后执行nicknameHandler
disruptor.after(nameHandler).handleEventsWith(nicknameHandler);
// 在idNotLineHandler, nicknameHandler之后执行printHandler
disruptor.after(idNotLineHandler, nicknameHandler).handleEventsWith(printHandler);