本文仅适用于Java用户,Kotlin用户请跳过本文
RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使你可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I/O。
发布/订阅模式是观察者模式的一种特殊的实现,从解耦和重用角度来看更优于典型的观察者模式。 在发布/订阅模式中,发布者和订阅者之间多了一个发布通道;一方面从发布者接收事件,另一方面向订阅者发布事件;订阅者需要从事件通道订阅事件以此避免发布者和订阅者之间产生直接的依赖关系。
观察者和被观察者是松耦合的关系,发布者和订阅者是不耦合关系,观察者模式,用于单个应用内部,订阅者模式用于跨应用模式,常见于消息中间件。
先看使用RxJava的代码:
可见这是典型的发布/订阅模式
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("淡月疏星绕建章");
emitter.onNext("仙风吹下御炉香");
emitter.onNext("侍臣鹄立通明殿");
emitter.onNext("一朵红云捧玉皇");
emitter.onComplete();
}
});
/**
* 在实际使用中,初始化Observable的代码块是写在ViewModel里的,
* 而初始化各个Observer的代码块是写在Activity和各个Fragment里的
* 初始化observer1和observer2的代码与observer0的雷同,省略
*/
Observer<String> observer0 = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
Log.e("observer0 onNext",""+s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.e("observer0 onComplete","");
}
};
observable.subscribe(observer0);
observable.subscribe(observer1);
observable.subscribe(observer2);
其中observable对象扮演的是channel的角色,真正的被观察者是ObservableEmitter的实例emitter,而observer0、observer1和observer2则是三个地位平等的观察者。