RxJava简单源码的示例分析
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。demo代码如下public class Ob
千家信息网最后更新 2025年12月02日RxJava简单源码的示例分析
今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
demo代码如下
public class ObservableTest { public static void main(String[] args) { Observable先看第一行代码
Observableobservable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); }});//Observable.java//第1560行public static Observable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); //RxJavaPlugins里有很多方法可以设置, //有点类似于Spring的ApplicationListener,在对应的生命周期中会被调用 return RxJavaPlugins.onAssembly(new ObservableCreate (source));}//RxJavaPlugins.java//第1031行public static Observable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; //如果设置了对应的方法,就执行,否则原样返回 if (f != null) { return apply(f, source); } return source;}
可以看到RxJavaPlugins中的方法如果不配置的方法,参数就会原样返回,所以Observable.create最终得到的就是ObservableCreate这个类。
再来看第二行代码
observable.subscribe(new Consumer() { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); }});//Observable.java//第10869行public final Disposable subscribe(Consumer super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//Observable.java//第10958行public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); //这里的onNext就是我们自己写的Consumer类 LambdaObserver ls = new LambdaObserver (onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls;}//Observable.java//第10974行public final void subscribe(Observer super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //还记得我们的observable变量是什么类型么?ObservableCreate! subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}//ObservableCreate.java//第35行protected void subscribeActual(Observer super T> observer) { //这里的observer是LambdaObserver CreateEmitter parent = new CreateEmitter (observer); observer.onSubscribe(parent); //省略部分代码}//LambdaObserver.java//第47行public void onSubscribe(Disposable s) { //设置AtomicReference的值(LambdaObserver继承了AtomicReference) //如果之前已经设置过了(AtomicReference的值不为空),则直接返回false if (DisposableHelper.setOnce(this, s)) { try { //在new LambdaObserver()的时候我们设置了onSubscribe = Functions.emptyConsumer() //所以这里什么都不做 onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); onError(ex); } }}//ObservableCreate.java//第35行protected void subscribeActual(Observer super T> observer) { //省略部分代码 try { //还记得source是啥么,就是你在创建Observable的时候new的ObservableOnSubscribe //于是终于执行到了我们编写的代码中 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}//ObservableOnSubscribe.java//第6行public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { //开始执行这个方法 //observer是new CreateEmitter (new LambdaObserver()); @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); } });}//ObservableCreate$CreateEmitter//第61行public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //这里的observer就是LambdaObserver //t就是《"处理的数字是:" + Math.random() * 100》这段字符串 observer.onNext(t); }}//LambdaObserver.java//第60行public void onNext(T t) { if (!isDisposed()) { try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } }}//ObservableOnSubscribe.java//第13行public static void main(String[] args) { //省略部分代码 observable.subscribe(new Consumer () { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); } });}//ObservableOnSubscribe.java//第8行public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { //省略部分代码 observer.onComplete(); } }); //省略部分代码}//ObservableCreate.java//第95行public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { //取消订阅 dispose(); } }}//LambdaObserver.java//第86行public void onComplete() { if (!isDisposed()) { lazySet(DisposableHelper.DISPOSED); try { //new LambdaObserver的时候设置了为空,所以不执行操作 onComplete.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } }}
至此,调用流程分析完成,可以看到虽然在main方法里我们只写了几行代码,但是内部调用的流程还是很繁杂的
看完上述内容,你们对RxJava简单源码的示例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
代码
处理
方法
就是
部分
数字
分析
元素
内容
时候
源码
示例
原样
流程
繁杂
一行
参数
变量
周期
字符
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库中变长指针法
海康 linux 服务器检测
int在数据库中是什么数据类型
奥迪服务器连不上网
远程管理ark服务器
数据库性别约束
浙江浙京网络技术
用电脑做服务器安全吗
软件测试数据库连接
linux 远程数据库
网络安全法解读图文
网络代理服务器连接失败怎么解决
mac 登陆远程服务器
重庆 网络安全测评 公司
孔子云服务器
大学网络安全管理办法
在哪查网贷数据库
宝山区网络技术咨询优化
华中科大网络安全专业就业前景
计算机网络安全的安全措施
服务器出现了安全问题js
王者荣耀服务器维护s14
深圳软件开发夏前锋
登陆linux服务器
网络安全护网行动案例
怎么快速替换数据库
为什么数据库都是mysql
鄞州一站式软件开发工具
电商带货软件开发报价
奶瓶下载软件开发