RxJava
RxJava初理解
定义
RxJava的核心思想就是异步,其在Gitub的主页的自我介绍就是” a library for composing asynchronous and event-based programs using observable sequences for the Java VM”,翻译过来就是一个在Java VM上使用可观测序列来组成异步的、基于事件的程序的库。
本质
一个实现异步操作的库,它可以使程序的具有很好的简洁性。这里的简洁性指的是逻辑上的简洁,而不是代码量的减少,它的强大之处在于当程序随着逻辑变得很复杂的时候。它依然能够保持它的简洁性
核心
RxJava的核心是Obserables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件,这里指的事件可以是任何东西,比如触摸事件,网络请求的回调接口等
一个Observable可以发出零个或者多个事件,并且知道结束或者出错。每发出一个事件,就会调用它的Subsceribe的onext()方法,最后调用subscriber.onNext()或者subscriber.onError()结束
基本使用
1 | val observable = Observable.create(ObservableOnSubscribe<Int>{ |
这其中的Emitter是发射器的意思,它是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的
onNext(T value)
、onComplete()
和onError(Throwable error)
来分别发出next、complete、error事件发送事件要注意以下几点
上游可以发送无限个onNext事件,下游可以接受无限个onNext事件
当上游发送了一个onComplete后,上游onComplete之后的事件将会
继续
发送,但是下游在接受到onComplete事件以后将不再继续
接受事件当上游发送了一个onError后,情况与onComplete相同(本人目前未验证)
上游可以不发送onComplete或者onError
最
重要
的是onComplete和Error必须唯一且互斥
RxJava强大的线程调度
在正常情况中,一个事件的流向从上游到下游是在同一个线程中工作的,RxJava的强大之处就在于线程控制
就Android来说,一个Activity所有的操作默认都是在主线程中执行的。所以如果我们不指定上下游所在的线程,那么他们默认的执行就是在主线程。
RxJava的最大的亮点就是它的线程控制。在实际工作中我们长长需要在仔细暗沉中做耗时操作,在主线程来操作UI(图片中黄色蓝色表示不同线程)
通过一下代码就可以了做到线程的调度
1
2
3observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);subscribeOn()
指定的是上游发送事件的线程,observeOn()
指定的是下游接受事件的线程。多次指定上游的线程只有第一次有效,即subcribeOn()
只有第一次指定的有效。但是下游的线程确是可以指定多次的,也就是说每调用一次observeOn()
,下游的线程就会切换一次以下是一些内置好的线程(在开发中我们应该尽量使用这些内置的Scheduler,因为RxJava内部使用的是线程池来维护这些线程,所以效率也比较高)
- Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件、数据库等io密集型的操作
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
- Schedulers.newThread() 代表一个常规的新线程
- AndroidSchedulers.mainThread() 代表Android的主线程
Rxjava配合Retrofit进行网络请求
定义Api接口
1
2
3
4
5
6
7public interface Api {
Observable<LoginResponse> login(@Body LoginRequest request);
Observable<RegisterResponse> register(@Body RegisterRequest request);
}创建Retrofit客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return new Retrofit.Builder().baseUrl(baseUrl)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}发起请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Api api = retrofit.create(Api.class);
api.login(request)
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Observer<LoginResponse>() {
public void onSubscribe(Disposable d) {}
public void onNext(LoginResponse value) {}
public void onError(Throwable e) {
Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
}
public void onComplete() {
Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
}
});
RxJava中一个很重要的东西–
Disposable
当我们在请求的过程中退出了当前Activity以后,请求成功再回到主线程更新UI,这个时候肯定会造成闪退。而RxJava的observer都会要求重写这个方法
1
2
public void onSubscribe(Disposable d) {}我们只需要在运行到这个方法时候就将这个Diposable对象存起来,然后在activity销毁的时候调用它的
disposable()
方法, 即可切断上下游不过最好的做法是,使用RxJava内置的容器
CompositeDiposable
来保存Disposable
对象,每得到一个Diposable
时,就调用compositeDisposable.add()
将其存放起来,在activity销毁的时候,调用compositeDisposable.clear()
来切断保存的所有的上下游之间的联系
RxJava的操作符
创建操作符
from:可将Iterable、Future、数组转化为一个Observable对象。(以数组为例)这个Observable将数组的元素逐个进行发送
1
2String []ss = {"ss","s"};
Observable.fromArray(ss);
just:创建将逐个内容进行发送的Observable,其内部发送内容在内部以from的操作符的方式进行转换
1
2//简单例子
Observable.just("ss","s")
Map:对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23//从代码中来看就是
/**
* 在上游事件中,发送的是数字类型,而下 游接受的是String类型的数据,通过Map就可以实现这个转换
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
public String apply(Integer integer) throws Exception {
return "*********result " + integer;
}
}).subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});range:创建以发送范围内的整数序列的Observable
interval:创建以1秒为事件间隔发送整数序列的Observable
1
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);
变换操作符
FlatMap:将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里
中间的flatMap将原本的圆形的事件转换为发送矩形事件和三角形事件的新Observable
简单来说,上游每发送一个事件,flatMap都建立一个新的上游,然后
无序
发送转换之后的新的事件。如果需要保证顺序的话需要使用concatMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("value :" + integer);
}
//这里加了延时,来体现无序
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});buffer:将原有Observable转换为一个新的Observable,这个新的Observable每次发送一组值,而不是一个个进行发送
基于时间
- 发送的元素,并不会直接用于Observer,而是会暂存在一个缓存区当中,以指定的时间去取
- 可能会存在取不到值的情况,缓存区当中没有值
- 最后一次取值时间会于之前不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25Log.d(TAG, "in");
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.buffer(2, TimeUnit.SECONDS)
.subscribe(new Observer<List<Long>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Long> longs) {
Log.d(TAG, "longs:" + longs);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
基于数量
参数1: 每回取的数量,参数2: 跳过的数量
如果未指定跳过的数量,则取了几个就跳过几个
如果指定了跳过的数量,则按照指定跳过,并且list.size() = 发送数量/skip+1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26//不采用跳过
Observable.just(1,2,3,4,5)
.buffer(3)
//如果使用不跳过,则取消下面一句代码的注释,并注释上一句代码
// .buffer(3,2)
.subscribe(new Observer<List<Integer>>() {
public void onSubscribe(Disposable d) {
}
public void onNext(List<Integer> integers) {
Log.d(TAG, "integers:" + integers);
}
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
public void onComplete() {
Log.d(TAG, "onComplete");
}
});基于时间和数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25Log.d(TAG, "in");
Observable.interval(1, TimeUnit.SECONDS)
.take(7)
.buffer(3, TimeUnit.SECONDS,2)
.subscribe(new Observer<List<Long>>() {
public void onSubscribe(Disposable d) {
}
public void onNext(List<Long> longs) {
Log.d(TAG, "longs:" + longs);
}
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
public void onComplete() {
Log.d(TAG, "onComplete");
}
});