Java响应式编程介绍
1. 响应式编程概念
1.1 什么是响应式编程
在介绍讨论响应式编程之前,先来看一个我们经常使用的一款生产力工具—excel。假设某电商网站正在搞促销活动,任何单品都可以参加“满199减40”的活动。
上图中蓝色为引用关系,即当单价或数量发生变化时,其它单元格也会动态改变。上面就是一个典型的响应式例子。
总的来说响应式编程具有3个特点:
- 变化传递
- 数据流形式
- 声明式范式
1.2 变化传递
概念:变化传递意思是当某个对象或变量发生改变时,所有引用或使用该变量的其它对象组件也会实时变化。
下面举个🌰
假设购物车管理和付款金额计算是两个不同的模块,或者至少是两个不同的类——Cart
和Pay
。我们希望购物车发生变化时,金额也能动态变化。
方案一
public class Cart {
private Pay pay; // 1.Pay组件
public boolean addProduct(Product product, int quantity) {
double price = product.getPrice() * quantity; // 2. 金额变化
pay.update(price); // 3.调用pay更新金额
}
}
缺点
- Cart对象需要知道Pay对象的存在,不符合迪米特原则
方案二
public class Pay {
public Pay(Cart cart) {
this.listenOn(cart); // 1. 监听购物车对象
}
// 2. 当购物车发生变化时的回调方法
public void onCartChange(CartEvent event) {
update(event) // 3. 更新变化
}
}
上面的代码我们可以想到回调或者观察者模式,本质上响应式编程的实现手段就是事件发布/回调。
那么,这些变化事件什么时候发出,以及发出形式是什么呢?
1.3 数据流形式
这些数据/事件在响应式编程里会以数据流的形式发出。例如每次往购物车里添加或移除一种商品,或调整商品的购买数量都会发布一个事件。这些操作事件连起来就是一个数据流。这是响应式的另一个核心特点:基于数据流(data stream)。
public Pay(Cart cart) {
...
this.listenOn(cart.eventStream()); // 1. 监听流
...
}
至于为什么要用流的形式?流编程的好处是什么?
1.4 声明式范式
public void listenOn(DataStream<CartEvent> cartEventStream) {
double sum = 0;
double total = cartEventStream
// 分别计算商品金额
.map(cartEvent -> cartEvent.getProduct().getPrice() * cartEvent.getQuantity())
// 计算满减后的商品金额
.map(v -> (v > 199) ? (v - 40) : v)
// 将金额的变化累加到sum
.map(v -> {sum += v; return sum;})
// 根据sum判断是否免邮,得到最终总付款金额
.map(sum -> (sum > 500) ? sum : (sum + 50));
...
上述就是一种声明式(declarative)的编程范式。通过四个串起来的map
调用,我们先声明好了对于数据流“将会”进行什么样的处理,当有数据流过来时,就会按照声明好的处理流程逐个进行处理。
声明式编程范式的威力在于以不变应万变。无论到来的元素是什么,计算逻辑是不变的,从而让数据和数据处理逻辑隔离开。比如处理逻辑是榨果汁,那么如果上游是苹果,出来就是苹果汁。上游是西瓜,出来就是西瓜汁。
1.5 小总结
总结起来,响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。
响应式编程就相当于工业处理管道;入口处是变化事件,管道里面都是流动的“数据流”;管道的图纸是用“声明式”的语言表示的。
2. 响应式流的特点
上面了解到响应式编程的一个关键是数据流形式。那我们常用的Java Stream算不算响应式流呢?总的来说,响应式流的特点是和响应式编程的应用场景有较大的关联性:
- 异步非阻塞
- 反压/流量控制
2.1 异步非阻塞
在如今互联网时代的大背景下,Web应用通常要面对高并发、多服务调用的挑战,性能从来都是必须要考量的核心因素。一般来说,同步阻塞就是性能杀手之一
复习一下:
阻塞和非阻塞反映的是调用者的状态
- 阻塞:调用者调用服务提供的方法后,需要一直等待结果而无法进行后续操作
- 非阻塞:调用者调用服务提供的方法后能立刻返回
同步和异步反映的是服务者的能力
- 同步:服务提供者需要方法处理完才能返回,或者需要调用方主动查询
- 异步:服务提供者能够立马返回,并在处理完成后通过某种方式通知到调用者
2.1.1 阻塞情景及解决方法
情景:订单服务需要调用用户服务的某些方法
传统方式
interface UserService{
Output getName(Input value);
}
class OrderService{
private UserService userService;
void process() {
Input input = ...;
Output name = userService.getName(input); //阻塞,等待完成
....
}
}
缺点:性能低
回调
interface UserService{
void getName(Input value, Function callback); //没有返回值,可立刻返回
}
class UserServiceImpl implement UserService {
public void getName(Input value, Function callback) {
//这里既可以同步,也可以异步返回
Output name = new Output();
callback(name); //调用回调函数,把结果返回
}
}
class OrderService{
private UserService userService;
void process() {
Input input = ...;
Output result;
userService.getName(input, output -> {
....
result = output //回调函数中获取结果
});
.... //没有阻塞,可继续往下执行
}
}
缺点:回调地狱
多线程Future
interface UserService{
Future<Output> getName(Input value);
Future<Output> getAddress(Input value);
}
class OrderService{
private UserService userService;
void process() {
Input input = ...;
Future<Output> name = userService.getName(input); //直接返回
Future<Output> address = userService.getAddress(input) //直接返回
name.get(); //阻塞获取
.... //其它name的加工操作
address.get(); //阻塞获取
.... //其它address的加工操作
}
}
缺点:多个Future的异步获取又变成同步了,无法立刻处理先返回的的结果
Java8 CompletableFuture接口
interface UserService{
CompletableFuture<Output> getName(Input value);
CompletableFuture<Output> getAddress(Input value);
}
class OrderService{
private UserService userService;
void process() {
Input input = ...;
userService.getName(input)
.thenApply(out1 -> {...}) //其它加工操作
.thenAcceput(out3 -> {...}) //获得最终结果
userService.getAddress(input)
.thenApply(out1 -> {...}) //其它加工操作
.thenAcceput(out3 -> {...}) //获得最终结果
}
}
缺点:completionFuture一旦异步任务较多时可编排性 以及 可读性太差
举个🌰
需求:我们首先得到 id 的列表,然后对每一个id进一步获取到id对应的name和address这样一对属性的组合
completionFuture形式
//获取所有的ids
//对每一个id都要生成一个组合completionFuture任务 (异步获取name,异步获取addr)
//将上述所有id生成的组合任务再组合起来成一个任务
//等待所有异步任务完成
CompletableFuture<List<String>> ids = getids();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = getName(i);
CompletableFuture<String> addrTask = getAddress(i);
return nameTask.thenCombineAsync(addrTask, (name, addr) -> "Name " + name + " has addr " + addr);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream(
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
响应式流形式(天然的异步非阻塞)—— 类似Stream的编程方式
// Flux修饰代表这是一个响应式流
Flux<String> ids = getIds();
Flux<String> combinations =
ids.flatMap(id -> { //id作为一个响应式流
Mono<String> nameTask = getName(id); //对每个id异步获取name
Mono<String> addrTask = getAddress(id); //对每个id异步获取addr
return nameTask.zipWith(addrTask, //组合结果并返回
(name, addr) -> "Name " + name + " has addr " + addr);
});
2.2 反压/流量控制
2.2.1 推-拉模型
上面说到响应式编程的实现手段就是事件发布,发送数据的叫生产者,接受数据的叫订阅者,那么关于数据的传递获取方式就有两种模型,push模型和pull模型
纯拉模型
每次消费者需要元素时,主动向生产者发起请求,生产者被动发送数据
优点:消费者按需消费,不会造成内存溢出
缺点:多个拉请求是同步执行的,网络开销大,性能低
纯推模型
生产者一旦有元素到达则立刻发送给消费者
优点:连接开销少,生产者不断有可用的数据,性能高
缺点:当生产者速度高于消费者时,消费者会被“压垮”
推-拉结合模型
消费者可以异步发送请求获取若干个元素
优点:消费者按需消费,同时在消费过程中也能够发起资源拉取的申请
2.2.2 反压/流量控制
上述这种能够向上游反馈流量请求的机制就叫做反压机制,它能够让下游根据自身能力去处理数据而不会被压垮。
反压策略
- 消费者控制,需要时再生产
- 生产者缓存
- 生产者丢弃
2.3 小总结
以上就是响应式流的两个核心特点:异步非阻塞,以及基于“反压”机制的流量控制。
这样我们有了基于响应式流的“升级版”的响应式编程:
3. Spring响应式处理模型
3.1 对比 WebFlux 和 WebMVC 的处理模型
WebMVC处理模型(同一个线程)
所有请求都排队并由一个 Thread 按顺序进行处理。黑条表示此处存在迸出I/O 的阻塞读/写操作 。 此外,如你所见,实际处理时间(白条 ) 远少于花费在阻塞操作上 的时间 。
WebFlux处理模型(同一个线程)
将图中的异步、非阻塞请求处理与阻塞示例进行比较,可以发现,系统没有在收集请求消息体时发生等待。Thread 被高效的使用以接受新连接。也就是,当某个请求因读写等待时,线程不会阻塞而是会切换到处理其它请求。当某个请求读写完成时,线程会切换回来处理该请求。
所以,如果要问服务端最大的性能瓶颈是什么,那答案一定是IO,因为处理一个请求的过程中最耗时的部分就是等待IO,而等待就会造成阻塞,所以如果要提升性能,就不能写出阻塞的代码来。而响应式编程就是做这样的事。
4. 总结
响应式编程的本质,就是一个异步编程的轮子,用观察者模式的API以及流式编程把异步编程的过程变得更加清晰和简单,就像MVC封装了Servlet,底层其实还是对已有技术的封装
随着各个web系统的并发量增加,毫无疑问响应式编程式未来的趋势之一,在 Spring 家族的 Spring Webflux、Spring Cloud Gateway 等诸多框架中都得到了广泛应用。
其它内容
- Java响应式编程的核心接口(Publisher、Subscriber、Subscription 和 Processor<T,R>)
- 响应式编程宣言
- 响应式Web(Spring5中的flux和Mono)
- 响应式Spring Security
- 响应式Spring Data
- .........