RxJava基本学习

RxJava

RxJava初理解

  1. 定义

    RxJava的核心思想就是异步,其在Gitub的主页的自我介绍就是” a library for composing asynchronous and event-based programs using observable sequences for the Java VM”,翻译过来就是一个在Java VM上使用可观测序列来组成异步的、基于事件的程序的库。

  2. 本质

    一个实现异步操作的库,它可以使程序的具有很好的简洁性。这里的简洁性指的是逻辑上的简洁,而不是代码量的减少,它的强大之处在于当程序随着逻辑变得很复杂的时候。它依然能够保持它的简洁性

  3. 核心

    RxJava的核心是Obserables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件,这里指的事件可以是任何东西,比如触摸事件,网络请求的回调接口等

    一个Observable可以发出零个或者多个事件,并且知道结束或者出错。每发出一个事件,就会调用它的Subsceribe的onext()方法,最后调用subscriber.onNext()或者subscriber.onError()结束

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 val observable = Observable.create(ObservableOnSubscribe<Int>{
it.apply {
onNext(1)
onNext(2)
onNext(3)
onComplete()
}
})
//这是完整的写法,上面的是用的Lambda表达式
// val observable = Observable.create(object :ObservableOnSubscribe<Int>{
// override fun subscribe(emitter: ObservableEmitter<Int>?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
// }
//
// })

val observer = object: Observer<Int>{
override fun onComplete() {
Log.d("************","complete")
}

override fun onSubscribe(d: Disposable?) {
Log.d("************","subsscribe")
}

override fun onNext(t: Int) {
Log.d("************",""+t)

}

override fun onError(e: Throwable?) {
Log.d("************","error")
}

}

observable.subscribe(observer)
  1. 这其中的Emitter是发射器的意思,它是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)onComplete()onError(Throwable error)来分别发出next、complete、error事件

  2. 发送事件要注意以下几点

    • 上游可以发送无限个onNext事件,下游可以接受无限个onNext事件

    • 当上游发送了一个onComplete后,上游onComplete之后的事件将会 继续发送,但是下游在接受到onComplete事件以后将不再继续接受事件

    • 当上游发送了一个onError后,情况与onComplete相同(本人目前未验证)

    • 上游可以不发送onComplete或者onError

    • 重要的是onComplete和Error必须唯一且互斥

RxJava强大的线程调度

  • 在正常情况中,一个事件的流向从上游到下游是在同一个线程中工作的,RxJava的强大之处就在于线程控制

    就Android来说,一个Activity所有的操作默认都是在主线程中执行的。所以如果我们不指定上下游所在的线程,那么他们默认的执行就是在主线程。

  • RxJava的最大的亮点就是它的线程控制。在实际工作中我们长长需要在仔细暗沉中做耗时操作,在主线程来操作UI(图片中黄色蓝色表示不同线程)

    通过一下代码就可以了做到线程的调度

    1
    2
    3
    observable.subscribeOn(Schedulers.newThread())                                              
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(consumer);

    subscribeOn()指定的是上游发送事件的线程,observeOn()指定的是下游接受事件的线程。多次指定上游的线程只有第一次有效,即subcribeOn()只有第一次指定的有效。但是下游的线程确是可以指定多次的,也就是说每调用一次observeOn(),下游的线程就会切换一次

    以下是一些内置好的线程(在开发中我们应该尽量使用这些内置的Scheduler,因为RxJava内部使用的是线程池来维护这些线程,所以效率也比较高)

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件、数据库等io密集型的操作
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    • Schedulers.newThread() 代表一个常规的新线程
    • AndroidSchedulers.mainThread() 代表Android的主线程
  • Rxjava配合Retrofit进行网络请求

    • 定义Api接口

      1
      2
      3
      4
      5
      6
      7
      public interface Api {
      @GET
      Observable<LoginResponse> login(@Body LoginRequest request);

      @GET
      Observable<RegisterResponse> register(@Body RegisterRequest request);
      }
    • 创建Retrofit客户端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      private static Retrofit create() {
      OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
      builder.readTimeout(10, TimeUnit.SECONDS);
      builder.connectTimeout(9, TimeUnit.SECONDS);

      if (BuildConfig.DEBUG) {
      HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
      interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
      builder.addInterceptor(interceptor);
      }

      return new Retrofit.Builder().baseUrl(baseUrl)
      .client(builder.build())
      .addConverterFactory(GsonConverterFactory.create())
      .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
      .build();
      }
    • 发起请求

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      Api api = retrofit.create(Api.class);
      api.login(request)
      .subscribeOn(Schedulers.io()) //在IO线程进行网络请求
      .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
      .subscribe(new Observer<LoginResponse>() {
      @Override
      public void onSubscribe(Disposable d) {}

      @Override
      public void onNext(LoginResponse value) {}

      @Override
      public void onError(Throwable e) {
      Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
      }

      @Override
      public void onComplete() {
      Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
      }
      });
  • RxJava中一个很重要的东西–Disposable

    当我们在请求的过程中退出了当前Activity以后,请求成功再回到主线程更新UI,这个时候肯定会造成闪退。而RxJava的observer都会要求重写这个方法

    1
    2
    @Override
    public void onSubscribe(Disposable d) {}

    我们只需要在运行到这个方法时候就将这个Diposable对象存起来,然后在activity销毁的时候调用它的disposable()方法, 即可切断上下游

    不过最好的做法是,使用RxJava内置的容器CompositeDiposable来保存Disposable对象,每得到一个Diposable时,就调用compositeDisposable.add()将其存放起来,在activity销毁的时候,调用compositeDisposable.clear()来切断保存的所有的上下游之间的联系

RxJava的操作符

创建操作符

  • from:可将Iterable、Future、数组转化为一个Observable对象。(以数组为例)这个Observable将数组的元素逐个进行发送

    1
    2
    String []ss = {"ss","s"};
    Observable.fromArray(ss);
  • just:创建将逐个内容进行发送的Observable,其内部发送内容在内部以from的操作符的方式进行转换

    1
    2
    //简单例子
    Observable.just("ss","s")
  • Map:对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    //从代码中来看就是

    /**
    * 在上游事件中,发送的是数字类型,而下 游接受的是String类型的数据,通过Map就可以实现这个转换
    */
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    }
    }).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
    return "*********result " + integer;
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    Log.d(TAG, s);
    }
    });
  • range:创建以发送范围内的整数序列的Observable

  • interval:创建以1秒为事件间隔发送整数序列的Observable

    1
    Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);

变换操作符

  • FlatMap:将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里

    中间的flatMap将原本的圆形的事件转换为发送矩形事件和三角形事件的新Observable

    简单来说,上游每发送一个事件,flatMap都建立一个新的上游,然后无序发送转换之后的新的事件。如果需要保证顺序的话需要使用concatMap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
    final List<String> list = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
    list.add("value :" + integer);
    }
    //这里加了延时,来体现无序
    return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    Log.d(TAG, s);
    }
    });
  • buffer:将原有Observable转换为一个新的Observable,这个新的Observable每次发送一组值,而不是一个个进行发送

    • 基于时间

      1. 发送的元素,并不会直接用于Observer,而是会暂存在一个缓存区当中,以指定的时间去取
      2. 可能会存在取不到值的情况,缓存区当中没有值
      3. 最后一次取值时间会于之前不同

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      Log.d(TAG, "in");
      Observable.interval(1, TimeUnit.SECONDS)
      .take(5)
      .buffer(2, TimeUnit.SECONDS)
      .subscribe(new Observer<List<Long>>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(List<Long> longs) {
      Log.d(TAG, "longs:" + longs);
      }

      @Override
      public void onError(Throwable e) {
      Log.d(TAG, "onError");
      }

      @Override
      public void onComplete() {
      Log.d(TAG, "onComplete");
      }
      });
  • 基于数量

    参数1: 每回取的数量,参数2: 跳过的数量

    如果未指定跳过的数量,则取了几个就跳过几个

    如果指定了跳过的数量,则按照指定跳过,并且list.size() = 发送数量/skip+1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    //不采用跳过
    Observable.just(1,2,3,4,5)
    .buffer(3)
    //如果使用不跳过,则取消下面一句代码的注释,并注释上一句代码
    // .buffer(3,2)
    .subscribe(new Observer<List<Integer>>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List<Integer> integers) {
    Log.d(TAG, "integers:" + integers);
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
  • 基于时间和数量

    image-20191129115604124

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    Log.d(TAG, "in");
    Observable.interval(1, TimeUnit.SECONDS)
    .take(7)
    .buffer(3, TimeUnit.SECONDS,2)
    .subscribe(new Observer<List<Long>>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List<Long> longs) {
    Log.d(TAG, "longs:" + longs);
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });