太原Java培训
达内太原java培训中心

0351-5608878

热门课程

RX-Java中,life操作内部实现的优雅之处

  • 时间:2016-11-24
  • 发布:木之阳兮 卿云烂兮
  • 来源:木之阳兮 卿云烂兮

RX-Java中,life操作内部实现的优雅之处

RX-* 系列的库是一款开源的并发流程控制的框架,有多种语言的实现[1]。用户可以通过它使用流式的编程风格,写出高可读性的并发流程控制代码。以下是针对RX-Java中,observable的各种变换(mapflatmap)的内部实现的分析。

如果使用过RX-Java,我们知道,map可以使一种类型的可订阅者被另一种类型的订阅者订阅:

Observable 可被 Subscriber 订阅

Observable –> map :可被 Subscriber 订阅

不论是map、还是flatmap,在底层都是通过life实现。那么通过life转换后的的observable如何将原来的类型参数 发射到新的subscriber(observable –> observable):

1: 调用operator call方法( R为目标类型, T为原始类型 )

(1)call中传入新的Subscriber ( 将来新的subscriber订阅observable 时会自动传入 ) ,

(2)实例化旧的的Subscriber ( 和原本旧的observer 绑定 ),通过旧的Subscriber 中的onNextonCompleted,将T类型的数据处理并转为R类型,发送通知给subscriber

2:

实例化一个新的observable ,绑定新的OnSubscribe

通过 newOnSubscribe .call()调用oldOnSubscribe .call()

oldOnSubscribe 通知 subscriber ,subscriber onNext onError onComplate中调用subscriber

通过代码来理解:

定义一个Integer observable

Observable observable = Observable.create(newObservable.OnSubscribe() {

@Override

publicvoidcall(Subscriber subscriber){

System.out.println("Observable:"+ Thread.currentThread().getId());

try{

for(inti =0; i <5; i++)

subscriber.onNext(i);

subscriber.onCompleted();

}catch(Exception e){

subscriber.onError(e);

}

}

});

通过lifeIntegerobservable转为Stringobservable

注意看其中subsubscriber 是如何通知Subscriber

observable = observable.lift(newObservable.Operator() {

@Override

publicSubscriber call(Subscriber subscriber) {

returnnewSubscriber() {

@Override

publicvoidonCompleted(){

System.out.println("Integer Subscriber Completed --|");

subscriber.onCompleted();

}

@Override

publicvoidonError(Throwable throwable){

System.out.println("Integer Subscriber: Error-->");

}

@Override

publicvoidonNext(Integer integer){

System.out.println(

String.format("Integer Subscriber --> Next:%s, Thread:%s -->",

integer, Thread.currentThread().getId()));

subscriber.onNext(integer + "");

}

};

}

}

);

注册Subscriber 并订阅observable

observable.observeOn(Schedulers.newThread())

.subscribe(newSubscriber() {

@Override

publicvoidonCompleted(){

System.out.println("String Subscriber Complete");

}

@Override

publicvoidonError(Throwable throwable){

System.out.println(throwable);

}

@Override

publicvoidonNext(String str){

System.out.println("String Subscriber Next:"+ str +",Thread:"+ Thread.currentThread().getId());

System.out.println("TR2:"+ Thread.currentThread().getId());

}

});

我们可以看到:

RX-Javalife的实现中,将observable 转为observable ,并没有去考虑将observable 中的call方法中发出的事件克隆过来,而是直接将observable 的创建与observable OnSubscribe相关联,直接通过observable 产生事件通知observable

subscriber observable 的订阅事件(call)注册过程中,也很巧妙,通过observable 产生事件,通知observable --> 然后observable 通知subscriber 执行事件 --> subscriber 中再通知subscriber

执行订阅事件

这一整套过程,就优雅地使用了设计模式中的 适配器模式 代理 模式

好了,今天就给大家讲这么多吧,喜欢我的内容可以关注或者分享(微信公众平台:tytedu)选择太原达内培训,不再孤军奋战,轻轻松松做IT高薪白领。太原达内培训带领有明确目标的学子迈向成功之路!

上一篇:Java 多线程编程(三)
下一篇:从理解volatile的内存语义实现到Java的锁实现的思考

Apache Lucene 6.4.2 发布,Java 搜索引擎

太原Java培训教你提高自己的编程速度

太原java培训编程如人生

Java编程基本概念

选择城市和中心
贵州省

广西省

海南省