RxJava2 只看这一篇文章就够了

墨蓝 2021-06-10 20:38 663阅读 0赞

0. 简介

RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。

RxJava 有以下三个基本的元素:

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

下面来说说以上三者是如何协作的:

首先在 gradle 文件中添加依赖:

  1. implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
  2. implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

1、创建被观察者:

  1. Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  4. Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
  5. e.onNext(1);
  6. e.onNext(2);
  7. e.onNext(3);
  8. e.onComplete();
  9. }
  10. });

2、创建观察者:

  1. Observer observer = new Observer<Integer>() {
  2. @Override
  3. public void onSubscribe(Disposable d) {
  4. Log.d(TAG, "======================onSubscribe");
  5. }
  6. @Override
  7. public void onNext(Integer integer) {
  8. Log.d(TAG, "======================onNext " + integer);
  9. }
  10. @Override
  11. public void onError(Throwable e) {
  12. Log.d(TAG, "======================onError");
  13. }
  14. @Override
  15. public void onComplete() {
  16. Log.d(TAG, "======================onComplete");
  17. }
  18. };

3、订阅:

  1. observable.subscribe(observer);

这里其实也可以使用链式调用:

  1. Observable.create(new ObservableOnSubscribe < Integer > () {
  2. @Override
  3. public void subscribe(ObservableEmitter < Integer > e) throws Exception {
  4. Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
  5. e.onNext(1);
  6. e.onNext(2);
  7. e.onNext(3);
  8. e.onComplete();
  9. }
  10. })
  11. .subscribe(new Observer < Integer > () {
  12. @Override
  13. public void onSubscribe(Disposable d) {
  14. Log.d(TAG, "======================onSubscribe");
  15. }
  16. @Override
  17. public void onNext(Integer integer) {
  18. Log.d(TAG, "======================onNext " + integer);
  19. }
  20. @Override
  21. public void onError(Throwable e) {
  22. Log.d(TAG, "======================onError");
  23. }
  24. @Override
  25. public void onComplete() {
  26. Log.d(TAG, "======================onComplete");
  27. }
  28. });

被观察者发送的事件有以下几种,总结如下表:






















事件种类 作用
onNext() 发送该事件时,观察者会回调 onNext() 方法
onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送

其实可以把 RxJava 比喻成一个做果汁,家里有很多种水果(要发送的原始数据),你想榨点水果汁喝一下,这时候你就要想究竟要喝什么水果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。

总结如下图:

format_png

下面就来讲解 RxJava 各种常见的操作符。

1. 创建操作符

以下就是讲解创建被观察者的各种操作符。

1.1 create()

方法预览:

  1. public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

有什么用:

创建一个被观察者

怎么用:

  1. Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<String> e) throws Exception {
  4. e.onNext("Hello Observer");
  5. e.onComplete();
  6. }
  7. });

上面的代码非常简单,创建 ObservableOnSubscribe 并重写其 subscribe 方法,就可以通过 ObservableEmitter 发射器向观察者发送事件。

以下创建一个观察者,来验证这个被观察者是否成功创建。

  1. Observer<String> observer = new Observer<String>() {
  2. @Override
  3. public void onSubscribe(Disposable d) {
  4. }
  5. @Override
  6. public void onNext(String s) {
  7. Log.d("chan","=============onNext " + s);
  8. }
  9. @Override
  10. public void onError(Throwable e) {
  11. }
  12. @Override
  13. public void onComplete() {
  14. Log.d("chan","=============onComplete ");
  15. }
  16. };
  17. observable.subscribe(observer);

打印结果:

  1. 05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
  2. =============onComplete

1.2 just()

方法预览:

  1. public static <T> Observable<T> just(T item)
  2. ......
  3. public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

有什么用?

创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

怎么用?

  1. Observable.just(1, 2, 3)
  2. .subscribe(new Observer < Integer > () {
  3. @Override
  4. public void onSubscribe(Disposable d) {
  5. Log.d(TAG, "=================onSubscribe");
  6. }
  7. @Override
  8. public void onNext(Integer integer) {
  9. Log.d(TAG, "=================onNext " + integer);
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. Log.d(TAG, "=================onError ");
  14. }
  15. @Override
  16. public void onComplete() {
  17. Log.d(TAG, "=================onComplete ");
  18. }
  19. });

上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果:

  1. 05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
  2. =================onNext 1
  3. =================onNext 2
  4. =================onNext 3
  5. =================onComplete

1.3 From 操作符

1.3.1 fromArray()

方法预览:

  1. public static <T> Observable<T> fromArray(T... items)

有什么用?

这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

怎么用?

  1. Integer array[] = {1, 2, 3, 4};
  2. Observable.fromArray(array)
  3. .subscribe(new Observer < Integer > () {
  4. @Override
  5. public void onSubscribe(Disposable d) {
  6. Log.d(TAG, "=================onSubscribe");
  7. }
  8. @Override
  9. public void onNext(Integer integer) {
  10. Log.d(TAG, "=================onNext " + integer);
  11. }
  12. @Override
  13. public void onError(Throwable e) {
  14. Log.d(TAG, "=================onError ");
  15. }
  16. @Override
  17. public void onComplete() {
  18. Log.d(TAG, "=================onComplete ");
  19. }
  20. });

代码和 just() 基本上一样,直接看打印结果:

  1. 05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
  2. =================onNext 1
  3. =================onNext 2
  4. =================onNext 3
  5. =================onNext 4
  6. =================onComplete

1.3.2 fromCallable()

方法预览:

  1. public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

有什么用?

这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

怎么用?

  1. Observable.fromCallable(new Callable < Integer > () {
  2. @Override
  3. public Integer call() throws Exception {
  4. return 1;
  5. }
  6. })
  7. .subscribe(new Consumer < Integer > () {
  8. @Override
  9. public void accept(Integer integer) throws Exception {
  10. Log.d(TAG, "================accept " + integer);
  11. }
  12. });

打印结果:

  1. 05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1

1.3.3 fromFuture()

方法预览:

  1. public static <T> Observable<T> fromFuture(Future<? extends T> future)

有什么用?

参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

怎么用?

  1. FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
  2. @Override
  3. public String call() throws Exception {
  4. Log.d(TAG, "CallableDemo is Running");
  5. return "返回结果";
  6. }
  7. });
  8. Observable.fromFuture(futureTask)
  9. .doOnSubscribe(new Consumer < Disposable > () {
  10. @Override
  11. public void accept(Disposable disposable) throws Exception {
  12. futureTask.run();
  13. }
  14. })
  15. .subscribe(new Consumer < String > () {
  16. @Override
  17. public void accept(String s) throws Exception {
  18. Log.d(TAG, "================accept " + s);
  19. }
  20. });

doOnSubscribe() 的作用就是只有订阅时才会发送事件,具体会在下面讲解。

打印结果:

  1. 05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
  2. ================accept 返回结果

1.3.4 fromIterable()

方法预览:

  1. public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

有什么用?

直接发送一个 List 集合数据给观察者

怎么用?

  1. List<Integer> list = new ArrayList<>();
  2. list.add(0);
  3. list.add(1);
  4. list.add(2);
  5. list.add(3);
  6. Observable.fromIterable(list)
  7. .subscribe(new Observer < Integer > () {
  8. @Override
  9. public void onSubscribe(Disposable d) {
  10. Log.d(TAG, "=================onSubscribe");
  11. }
  12. @Override
  13. public void onNext(Integer integer) {
  14. Log.d(TAG, "=================onNext " + integer);
  15. }
  16. @Override
  17. public void onError(Throwable e) {
  18. Log.d(TAG, "=================onError ");
  19. }
  20. @Override
  21. public void onComplete() {
  22. Log.d(TAG, "=================onComplete ");
  23. }
  24. });

打印结果如下:

  1. 05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
  2. =================onNext 0
  3. =================onNext 1
  4. =================onNext 2
  5. =================onNext 3
  6. =================onComplete

1.4 defer()

方法预览:

  1. public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

有什么用?

这个方法的作用就是直到被观察者被订阅后才会创建被观察者。

怎么用?

  1. // i 要定义为成员变量
  2. Integer i = 100;
  3. Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
  4. @Override
  5. public ObservableSource<? extends Integer> call() throws Exception {
  6. return Observable.just(i);
  7. }
  8. });
  9. i = 200;
  10. Observer observer = new Observer<Integer>() {
  11. @Override
  12. public void onSubscribe(Disposable d) {
  13. }
  14. @Override
  15. public void onNext(Integer integer) {
  16. Log.d(TAG, "================onNext " + integer);
  17. }
  18. @Override
  19. public void onError(Throwable e) {
  20. }
  21. @Override
  22. public void onComplete() {
  23. }
  24. };
  25. observable.subscribe(observer);
  26. i = 300;
  27. observable.subscribe(observer);

打印结果如下:

  1. 05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
  2. ================onNext 300

因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。

1.5 timer()

方法预览:

  1. public static Observable<Long> timer(long delay, TimeUnit unit)
  2. ......

有什么用?

当到指定时间后就会发送一个 0L 的值给观察者。

怎么用?

  1. Observable.timer(2, TimeUnit.SECONDS)
  2. .subscribe(new Observer < Long > () {
  3. @Override
  4. public void onSubscribe(Disposable d) {
  5. }
  6. @Override
  7. public void onNext(Long aLong) {
  8. Log.d(TAG, "===============onNext " + aLong);
  9. }
  10. @Override
  11. public void onError(Throwable e) {
  12. }
  13. @Override
  14. public void onComplete() {
  15. }
  16. });

打印结果:

  1. 05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0

1.6 interval()

方法预览:

  1. public static Observable<Long> interval(long period, TimeUnit unit)
  2. public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
  3. ......

有什么用?

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

怎么用?

  1. Observable.interval(4, TimeUnit.SECONDS)
  2. .subscribe(new Observer < Long > () {
  3. @Override
  4. public void onSubscribe(Disposable d) {
  5. Log.d(TAG, "==============onSubscribe ");
  6. }
  7. @Override
  8. public void onNext(Long aLong) {
  9. Log.d(TAG, "==============onNext " + aLong);
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. }
  14. @Override
  15. public void onComplete() {
  16. }
  17. });

打印结果:

  1. 05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe
  2. 05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
  3. 05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
  4. 05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
  5. 05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
  6. 05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
  7. 05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5

从时间就可以看出每隔4秒就会发出一次数字递增1的事件。这里说下 interval() 第三个方法的 initialDelay 参数,这个参数的意思就是 onSubscribe 回调之后,再次回调 onNext 的间隔时间。

https://juejin.im/post/5b17560e6fb9a01e2862246f#heading-31

发表评论

表情:
评论列表 (有 0 条评论,663人围观)

还没有评论,来说两句吧...

相关阅读