searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Java响应式编程介绍-以Spring WebFlux为例

2023-08-25 06:19:09
35
0

Java响应式编程介绍

1. 响应式编程概念

1.1 什么是响应式编程

在介绍讨论响应式编程之前,先来看一个我们经常使用的一款生产力工具—excel。假设某电商网站正在搞促销活动,任何单品都可以参加“满199减40”的活动

上图中蓝色为引用关系,即当单价或数量发生变化时,其它单元格也会动态改变。上面就是一个典型的响应式例子。

总的来说响应式编程具有3个特点:

  • 变化传递
  • 数据流形式
  • 声明式范式

1.2 变化传递

概念变化传递意思是当某个对象或变量发生改变时,所有引用或使用该变量的其它对象组件也会实时变化。

下面举个🌰

假设购物车管理和付款金额计算是两个不同的模块,或者至少是两个不同的类——CartPay。我们希望购物车发生变化时,金额也能动态变化。

方案一

public class Cart {
    private Pay pay; // 1.Pay组件
    public boolean addProduct(Product product, int quantity) {
        double price = product.getPrice() * quantity; // 2. 金额变化
        pay.update(price); // 3.调用pay更新金额
    }
}

缺点

  1. Cart对象需要知道Pay对象的存在,不符合迪米特原则

方案二

public class Pay {
    public Pay(Cart cart) {
        this.listenOn(cart);    // 1. 监听购物车对象
    }
    // 2. 当购物车发生变化时的回调方法
    public void onCartChange(CartEvent event) {
        update(event) // 3. 更新变化
    }
}

上面的代码我们可以想到回调或者观察者模式,本质上响应式编程的实现手段就是事件发布/回调

那么,这些变化事件什么时候发出,以及发出形式是什么呢?

1.3 数据流形式

这些数据/事件在响应式编程里会以数据流的形式发出。例如每次往购物车里添加或移除一种商品,或调整商品的购买数量都会发布一个事件。这些操作事件连起来就是一个数据流。这是响应式的另一个核心特点:基于数据流(data stream)

public Pay(Cart cart) {
        ...
        this.listenOn(cart.eventStream());  // 1. 监听流
        ...
    }

至于为什么要用流的形式?流编程的好处是什么?

1.4 声明式范式

public void listenOn(DataStream<CartEvent> cartEventStream) {  
    double sum = 0;
    double total = cartEventStream
        // 分别计算商品金额
        .map(cartEvent -> cartEvent.getProduct().getPrice() * cartEvent.getQuantity())
        // 计算满减后的商品金额
        .map(v -> (v > 199) ? (v - 40) : v)
        // 将金额的变化累加到sum
        .map(v -> {sum += v; return sum;})
        // 根据sum判断是否免邮,得到最终总付款金额
        .map(sum -> (sum > 500) ? sum : (sum + 50));
        ...

上述就是一种声明式(declarative)的编程范式。通过四个串起来的map调用,我们先声明好了对于数据流“将会”进行什么样的处理,当有数据流过来时,就会按照声明好的处理流程逐个进行处理。

声明式编程范式的威力在于以不变应万变。无论到来的元素是什么,计算逻辑是不变的,从而让数据和数据处理逻辑隔离开。比如处理逻辑是榨果汁,那么如果上游是苹果,出来就是苹果汁。上游是西瓜,出来就是西瓜汁。

1.5 小总结

总结起来,响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。

响应式编程就相当于工业处理管道;入口处是变化事件,管道里面都是流动的“数据流”;管道的图纸是用“声明式”的语言表示的。

2. 响应式流的特点

上面了解到响应式编程的一个关键是数据流形式。那我们常用的Java Stream算不算响应式流呢?总的来说,响应式流的特点是和响应式编程的应用场景有较大的关联性:

  • 异步非阻塞
  • 反压/流量控制

2.1 异步非阻塞

在如今互联网时代的大背景下,Web应用通常要面对高并发、多服务调用的挑战,性能从来都是必须要考量的核心因素。一般来说,同步阻塞就是性能杀手之一

复习一下:

阻塞和非阻塞反映的是调用者状态

  • 阻塞:调用者调用服务提供的方法后,需要一直等待结果而无法进行后续操作
  • 非阻塞:调用者调用服务提供的方法后能立刻返回

同步和异步反映的是服务者能力

  • 同步:服务提供者需要方法处理完才能返回,或者需要调用方主动查询
  • 异步:服务提供者能够立马返回,并在处理完成后通过某种方式通知到调用者

2.1.1 阻塞情景及解决方法

情景:订单服务需要调用用户服务的某些方法

传统方式

interface UserService{
	Output getName(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		Output name = userService.getName(input); //阻塞,等待完成
		....
	}
}

缺点:性能低

回调

interface UserService{
	void getName(Input value, Function callback); //没有返回值,可立刻返回
}

class UserServiceImpl implement UserService {
    public void getName(Input value, Function callback) {
        //这里既可以同步,也可以异步返回
        Output name = new Output();
        callback(name); //调用回调函数,把结果返回
    }
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
        Output result;
		userService.getName(input, output -> {
            ....
            result = output 	//回调函数中获取结果
        });  
		.... //没有阻塞,可继续往下执行
	}
}

缺点:回调地狱

多线程Future

interface UserService{
	Future<Output> getName(Input value);
    Future<Output> getAddress(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		Future<Output> name = userService.getName(input); //直接返回
        Future<Output> address = userService.getAddress(input) //直接返回
        name.get(); //阻塞获取
        .... //其它name的加工操作
        address.get(); //阻塞获取
		.... //其它address的加工操作
	}
}

缺点:多个Future的异步获取又变成同步了,无法立刻处理先返回的的结果

Java8 CompletableFuture接口

interface UserService{
	CompletableFuture<Output> getName(Input value);
    CompletableFuture<Output> getAddress(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		userService.getName(input)
            .thenApply(out1 -> {...}) //其它加工操作
            .thenAcceput(out3 -> {...}) //获得最终结果
            
        userService.getAddress(input)
            .thenApply(out1 -> {...}) //其它加工操作
            .thenAcceput(out3 -> {...}) //获得最终结果
	}
}

缺点:completionFuture一旦异步任务较多时可编排性 以及 可读性太差

举个🌰

需求:我们首先得到 id 的列表,然后对每一个id进一步获取到id对应的name和address这样一对属性的组合

completionFuture形式

//获取所有的ids
//对每一个id都要生成一个组合completionFuture任务 (异步获取name,异步获取addr)
//将上述所有id生成的组合任务再组合起来成一个任务
//等待所有异步任务完成

CompletableFuture<List<String>> ids = getids(); 
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> { 
                CompletableFuture<String> nameTask = getName(i); 
                CompletableFuture<String> addrTask = getAddress(i); 

                return nameTask.thenCombineAsync(addrTask, (name, addr) -> "Name " + name + " has addr " + addr); 
            }); 
    
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream(
            .map(CompletableFuture::join) 
            .collect(Collectors.toList()));
});

响应式流形式(天然的异步非阻塞)—— 类似Stream的编程方式

// Flux修饰代表这是一个响应式流
Flux<String> ids = getIds(); 

Flux<String> combinations =
    ids.flatMap(id -> {  //id作为一个响应式流
        Mono<String> nameTask = getName(id);  //对每个id异步获取name
        Mono<String> addrTask = getAddress(id);  //对每个id异步获取addr
        return nameTask.zipWith(addrTask,  //组合结果并返回
                                (name, addr) -> "Name " + name + " has addr " + addr);
                });

2.2 反压/流量控制

2.2.1 推-拉模型

上面说到响应式编程的实现手段就是事件发布,发送数据的叫生产者,接受数据的叫订阅者,那么关于数据的传递获取方式就有两种模型,push模型pull模型

纯拉模型

每次消费者需要元素时,主动向生产者发起请求,生产者被动发送数据

优点:消费者按需消费,不会造成内存溢出

缺点:多个拉请求是同步执行的,网络开销大,性能低

纯推模型

生产者一旦有元素到达则立刻发送给消费者

优点:连接开销少,生产者不断有可用的数据,性能高

缺点:当生产者速度高于消费者时,消费者会被“压垮”

推-拉结合模型

消费者可以异步发送请求获取若干个元素

优点:消费者按需消费,同时在消费过程中也能够发起资源拉取的申请

2.2.2 反压/流量控制

上述这种能够向上游反馈流量请求的机制就叫做反压机制,它能够让下游根据自身能力去处理数据而不会被压垮。

反压策略

  • 消费者控制,需要时再生产
  • 生产者缓存
  • 生产者丢弃

2.3 小总结

以上就是响应式流的两个核心特点:异步非阻塞,以及基于“反压”机制的流量控制。

这样我们有了基于响应式流的“升级版”的响应式编程:

3. Spring响应式处理模型

3.1 对比 WebFlux 和 WebMVC 的处理模型

WebMVC处理模型(同一个线程)

所有请求都排队并由一个 Thread 按顺序进行处理。黑条表示此处存在迸出I/O 的阻塞读/写操作 。 此外,如你所见,实际处理时间(白条 ) 远少于花费在阻塞操作上 的时间 。

WebFlux处理模型(同一个线程)

将图中的异步、非阻塞请求处理与阻塞示例进行比较,可以发现,系统没有在收集请求消息体时发生等待。Thread 被高效的使用以接受新连接。也就是,当某个请求因读写等待时,线程不会阻塞而是会切换到处理其它请求。当某个请求读写完成时,线程会切换回来处理该请求。

所以,如果要问服务端最大的性能瓶颈是什么,那答案一定是IO,因为处理一个请求的过程中最耗时的部分就是等待IO,而等待就会造成阻塞,所以如果要提升性能,就不能写出阻塞的代码来。而响应式编程就是做这样的事。

4. 总结

响应式编程的本质,就是一个异步编程的轮子,用观察者模式的API以及流式编程把异步编程的过程变得更加清晰和简单,就像MVC封装了Servlet,底层其实还是对已有技术的封装

随着各个web系统的并发量增加,毫无疑问响应式编程式未来的趋势之一,在 Spring 家族的 Spring Webflux、Spring Cloud  Gateway 等诸多框架中都得到了广泛应用。

其它内容

  • Java响应式编程的核心接口(Publisher、Subscriber、Subscription 和 Processor<T,R>)
  • 响应式编程宣言
  • 响应式Web(Spring5中的flux和Mono)
  • 响应式Spring Security
  • 响应式Spring Data
  • .........
0条评论
0 / 1000
c****m
2文章数
0粉丝数
c****m
2 文章 | 0 粉丝
c****m
2文章数
0粉丝数
c****m
2 文章 | 0 粉丝
原创

Java响应式编程介绍-以Spring WebFlux为例

2023-08-25 06:19:09
35
0

Java响应式编程介绍

1. 响应式编程概念

1.1 什么是响应式编程

在介绍讨论响应式编程之前,先来看一个我们经常使用的一款生产力工具—excel。假设某电商网站正在搞促销活动,任何单品都可以参加“满199减40”的活动

上图中蓝色为引用关系,即当单价或数量发生变化时,其它单元格也会动态改变。上面就是一个典型的响应式例子。

总的来说响应式编程具有3个特点:

  • 变化传递
  • 数据流形式
  • 声明式范式

1.2 变化传递

概念变化传递意思是当某个对象或变量发生改变时,所有引用或使用该变量的其它对象组件也会实时变化。

下面举个🌰

假设购物车管理和付款金额计算是两个不同的模块,或者至少是两个不同的类——CartPay。我们希望购物车发生变化时,金额也能动态变化。

方案一

public class Cart {
    private Pay pay; // 1.Pay组件
    public boolean addProduct(Product product, int quantity) {
        double price = product.getPrice() * quantity; // 2. 金额变化
        pay.update(price); // 3.调用pay更新金额
    }
}

缺点

  1. Cart对象需要知道Pay对象的存在,不符合迪米特原则

方案二

public class Pay {
    public Pay(Cart cart) {
        this.listenOn(cart);    // 1. 监听购物车对象
    }
    // 2. 当购物车发生变化时的回调方法
    public void onCartChange(CartEvent event) {
        update(event) // 3. 更新变化
    }
}

上面的代码我们可以想到回调或者观察者模式,本质上响应式编程的实现手段就是事件发布/回调

那么,这些变化事件什么时候发出,以及发出形式是什么呢?

1.3 数据流形式

这些数据/事件在响应式编程里会以数据流的形式发出。例如每次往购物车里添加或移除一种商品,或调整商品的购买数量都会发布一个事件。这些操作事件连起来就是一个数据流。这是响应式的另一个核心特点:基于数据流(data stream)

public Pay(Cart cart) {
        ...
        this.listenOn(cart.eventStream());  // 1. 监听流
        ...
    }

至于为什么要用流的形式?流编程的好处是什么?

1.4 声明式范式

public void listenOn(DataStream<CartEvent> cartEventStream) {  
    double sum = 0;
    double total = cartEventStream
        // 分别计算商品金额
        .map(cartEvent -> cartEvent.getProduct().getPrice() * cartEvent.getQuantity())
        // 计算满减后的商品金额
        .map(v -> (v > 199) ? (v - 40) : v)
        // 将金额的变化累加到sum
        .map(v -> {sum += v; return sum;})
        // 根据sum判断是否免邮,得到最终总付款金额
        .map(sum -> (sum > 500) ? sum : (sum + 50));
        ...

上述就是一种声明式(declarative)的编程范式。通过四个串起来的map调用,我们先声明好了对于数据流“将会”进行什么样的处理,当有数据流过来时,就会按照声明好的处理流程逐个进行处理。

声明式编程范式的威力在于以不变应万变。无论到来的元素是什么,计算逻辑是不变的,从而让数据和数据处理逻辑隔离开。比如处理逻辑是榨果汁,那么如果上游是苹果,出来就是苹果汁。上游是西瓜,出来就是西瓜汁。

1.5 小总结

总结起来,响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。

响应式编程就相当于工业处理管道;入口处是变化事件,管道里面都是流动的“数据流”;管道的图纸是用“声明式”的语言表示的。

2. 响应式流的特点

上面了解到响应式编程的一个关键是数据流形式。那我们常用的Java Stream算不算响应式流呢?总的来说,响应式流的特点是和响应式编程的应用场景有较大的关联性:

  • 异步非阻塞
  • 反压/流量控制

2.1 异步非阻塞

在如今互联网时代的大背景下,Web应用通常要面对高并发、多服务调用的挑战,性能从来都是必须要考量的核心因素。一般来说,同步阻塞就是性能杀手之一

复习一下:

阻塞和非阻塞反映的是调用者状态

  • 阻塞:调用者调用服务提供的方法后,需要一直等待结果而无法进行后续操作
  • 非阻塞:调用者调用服务提供的方法后能立刻返回

同步和异步反映的是服务者能力

  • 同步:服务提供者需要方法处理完才能返回,或者需要调用方主动查询
  • 异步:服务提供者能够立马返回,并在处理完成后通过某种方式通知到调用者

2.1.1 阻塞情景及解决方法

情景:订单服务需要调用用户服务的某些方法

传统方式

interface UserService{
	Output getName(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		Output name = userService.getName(input); //阻塞,等待完成
		....
	}
}

缺点:性能低

回调

interface UserService{
	void getName(Input value, Function callback); //没有返回值,可立刻返回
}

class UserServiceImpl implement UserService {
    public void getName(Input value, Function callback) {
        //这里既可以同步,也可以异步返回
        Output name = new Output();
        callback(name); //调用回调函数,把结果返回
    }
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
        Output result;
		userService.getName(input, output -> {
            ....
            result = output 	//回调函数中获取结果
        });  
		.... //没有阻塞,可继续往下执行
	}
}

缺点:回调地狱

多线程Future

interface UserService{
	Future<Output> getName(Input value);
    Future<Output> getAddress(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		Future<Output> name = userService.getName(input); //直接返回
        Future<Output> address = userService.getAddress(input) //直接返回
        name.get(); //阻塞获取
        .... //其它name的加工操作
        address.get(); //阻塞获取
		.... //其它address的加工操作
	}
}

缺点:多个Future的异步获取又变成同步了,无法立刻处理先返回的的结果

Java8 CompletableFuture接口

interface UserService{
	CompletableFuture<Output> getName(Input value);
    CompletableFuture<Output> getAddress(Input value);
}

class OrderService{
	private UserService userService;
	
	void process() {
		Input input = ...;
		userService.getName(input)
            .thenApply(out1 -> {...}) //其它加工操作
            .thenAcceput(out3 -> {...}) //获得最终结果
            
        userService.getAddress(input)
            .thenApply(out1 -> {...}) //其它加工操作
            .thenAcceput(out3 -> {...}) //获得最终结果
	}
}

缺点:completionFuture一旦异步任务较多时可编排性 以及 可读性太差

举个🌰

需求:我们首先得到 id 的列表,然后对每一个id进一步获取到id对应的name和address这样一对属性的组合

completionFuture形式

//获取所有的ids
//对每一个id都要生成一个组合completionFuture任务 (异步获取name,异步获取addr)
//将上述所有id生成的组合任务再组合起来成一个任务
//等待所有异步任务完成

CompletableFuture<List<String>> ids = getids(); 
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> { 
                CompletableFuture<String> nameTask = getName(i); 
                CompletableFuture<String> addrTask = getAddress(i); 

                return nameTask.thenCombineAsync(addrTask, (name, addr) -> "Name " + name + " has addr " + addr); 
            }); 
    
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream(
            .map(CompletableFuture::join) 
            .collect(Collectors.toList()));
});

响应式流形式(天然的异步非阻塞)—— 类似Stream的编程方式

// Flux修饰代表这是一个响应式流
Flux<String> ids = getIds(); 

Flux<String> combinations =
    ids.flatMap(id -> {  //id作为一个响应式流
        Mono<String> nameTask = getName(id);  //对每个id异步获取name
        Mono<String> addrTask = getAddress(id);  //对每个id异步获取addr
        return nameTask.zipWith(addrTask,  //组合结果并返回
                                (name, addr) -> "Name " + name + " has addr " + addr);
                });

2.2 反压/流量控制

2.2.1 推-拉模型

上面说到响应式编程的实现手段就是事件发布,发送数据的叫生产者,接受数据的叫订阅者,那么关于数据的传递获取方式就有两种模型,push模型pull模型

纯拉模型

每次消费者需要元素时,主动向生产者发起请求,生产者被动发送数据

优点:消费者按需消费,不会造成内存溢出

缺点:多个拉请求是同步执行的,网络开销大,性能低

纯推模型

生产者一旦有元素到达则立刻发送给消费者

优点:连接开销少,生产者不断有可用的数据,性能高

缺点:当生产者速度高于消费者时,消费者会被“压垮”

推-拉结合模型

消费者可以异步发送请求获取若干个元素

优点:消费者按需消费,同时在消费过程中也能够发起资源拉取的申请

2.2.2 反压/流量控制

上述这种能够向上游反馈流量请求的机制就叫做反压机制,它能够让下游根据自身能力去处理数据而不会被压垮。

反压策略

  • 消费者控制,需要时再生产
  • 生产者缓存
  • 生产者丢弃

2.3 小总结

以上就是响应式流的两个核心特点:异步非阻塞,以及基于“反压”机制的流量控制。

这样我们有了基于响应式流的“升级版”的响应式编程:

3. Spring响应式处理模型

3.1 对比 WebFlux 和 WebMVC 的处理模型

WebMVC处理模型(同一个线程)

所有请求都排队并由一个 Thread 按顺序进行处理。黑条表示此处存在迸出I/O 的阻塞读/写操作 。 此外,如你所见,实际处理时间(白条 ) 远少于花费在阻塞操作上 的时间 。

WebFlux处理模型(同一个线程)

将图中的异步、非阻塞请求处理与阻塞示例进行比较,可以发现,系统没有在收集请求消息体时发生等待。Thread 被高效的使用以接受新连接。也就是,当某个请求因读写等待时,线程不会阻塞而是会切换到处理其它请求。当某个请求读写完成时,线程会切换回来处理该请求。

所以,如果要问服务端最大的性能瓶颈是什么,那答案一定是IO,因为处理一个请求的过程中最耗时的部分就是等待IO,而等待就会造成阻塞,所以如果要提升性能,就不能写出阻塞的代码来。而响应式编程就是做这样的事。

4. 总结

响应式编程的本质,就是一个异步编程的轮子,用观察者模式的API以及流式编程把异步编程的过程变得更加清晰和简单,就像MVC封装了Servlet,底层其实还是对已有技术的封装

随着各个web系统的并发量增加,毫无疑问响应式编程式未来的趋势之一,在 Spring 家族的 Spring Webflux、Spring Cloud  Gateway 等诸多框架中都得到了广泛应用。

其它内容

  • Java响应式编程的核心接口(Publisher、Subscriber、Subscription 和 Processor<T,R>)
  • 响应式编程宣言
  • 响应式Web(Spring5中的flux和Mono)
  • 响应式Spring Security
  • 响应式Spring Data
  • .........
文章来自个人专栏
软件开发与测试
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0