RxJava的标准Java平台实现:Flow响应式编程
从Java 9开始,Java平台做了大幅度的框架性优化和调整,比如过去Java中的观察者模式标准实现java.util.Observable和java.util.Observer已被标注为废弃,建议开发者改用Java 9引入的java.util.concurrent.Flow来实现观察者模式,以及RxJava那一套响应式编程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
 * Sender: a {@link Flow.Publisher} of {@code String} items that delegates to a
 * {@link SubmissionPublisher} running on its own fixed-size thread pool.
 *
 * <p>Call {@link #close()} when no more items will be sent. Closing signals
 * {@code onComplete} to the current subscribers and shuts the thread pool down;
 * without it the pool's non-daemon worker threads keep the JVM alive.
 */
public class Sender implements Flow.Publisher<String>, AutoCloseable {
    /** Number of threads in the pool that delivers items to subscribers. */
    private static final int THREAD_POOL_SIZE = 5;

    /** Pool owned by this sender; kept in a field so {@link #close()} can shut it down. */
    private final ExecutorService executor;

    /** Publisher that buffers submitted items and dispatches them on {@link #executor}. */
    private final SubmissionPublisher<String> publisher;

    /**
     * Creates a sender with a pool of {@value #THREAD_POOL_SIZE} threads and the
     * default per-subscriber buffer size ({@link Flow#defaultBufferSize()}).
     */
    public Sender() {
        executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
    }

    /**
     * Registers {@code subscriber} with the underlying publisher.
     *
     * @param subscriber the subscriber to add
     */
    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        getPublisher().subscribe(subscriber);
    }

    /**
     * Returns the underlying publisher.
     *
     * @return the {@link SubmissionPublisher} this sender delegates to
     */
    public SubmissionPublisher<String> getPublisher() {
        return publisher;
    }

    /**
     * Submits {@code s} to every current subscriber via
     * {@link SubmissionPublisher#submit(Object)}.
     *
     * @param s the item to publish
     * @throws IllegalStateException if the underlying publisher has been closed
     */
    public void send(String s) {
        getPublisher().submit(s);
    }

    /**
     * Closes the underlying publisher and shuts down the thread pool.
     * Calling this method more than once has no further effect.
     *
     * <p>NOTE(review): a subscriber that requests more items from outside its own
     * callbacks after this call may not get a delivery scheduled, because the pool
     * no longer accepts new tasks; confirm this is acceptable for callers.
     */
    @Override
    public void close() {
        publisher.close();
        // shutdown() still runs delivery tasks already handed to the pool; it only
        // stops new tasks and lets the worker threads terminate afterwards.
        executor.shutdown();
    }
}
import java.util.concurrent.Flow;
/**
 * Receiver: a {@link Flow.Subscriber} of strings that prints each signal it gets
 * to standard output, prefixed with the request size it was created with.
 */
public class Receiver implements Flow.Subscriber<String> {
    /** Number of items requested on subscription and again after every item. */
    private final int batchSize;

    /** Subscription handed over in {@link #onSubscribe}; used to request more items. */
    private Flow.Subscription subscription;

    /**
     * @param req how many items to request each time demand is signalled
     */
    public Receiver(int req) {
        batchSize = req;
    }

    /** Stores the subscription and issues the first request. */
    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        askForMore();
    }

    /** Requests further items first, then prints the received item. */
    @Override
    public void onNext(String item) {
        askForMore();
        announce("onNext " + item);
    }

    /** Prints the stack trace of the failure. */
    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    /** Prints a completion line. */
    @Override
    public void onComplete() {
        announce("onComplete");
    }

    /** Signals demand for {@code batchSize} more items on the stored subscription. */
    private void askForMore() {
        subscription.request(batchSize);
    }

    /** Prints {@code event} prefixed with this receiver's request size. */
    private void announce(String event) {
        System.out.println(batchSize + " " + event);
    }
}
测试:
// Subscribe two receivers to one sender: the first requests 1 item at a time, the second 2.
Sender sender = new Sender();
Receiver receiver1 = new Receiver(1);
Receiver receiver2 = new Receiver(2);
sender.subscribe(receiver1);
sender.subscribe(receiver2);

// Publish the strings "0" through "9" to both receivers.
for (int n = 0; n < 10; n++) {
    sender.send(Integer.toString(n));
}
输出(两个接收者各自收到的顺序固定为0到9,但两者之间的交错顺序取决于线程调度,每次运行可能不同,以下为其中一次运行的结果):
1 onNext 0
2 onNext 0
2 onNext 1
1 onNext 1
2 onNext 2
2 onNext 3
2 onNext 4
2 onNext 5
1 onNext 2
2 onNext 6
1 onNext 3
2 onNext 7
1 onNext 4
2 onNext 8
1 onNext 5
2 onNext 9
1 onNext 6
1 onNext 7
1 onNext 8
1 onNext 9