简介
rxjava是Java一个rx最近的多线程技术。。。
他是一个扩展观察者模式实现的一个多线程技术
观察者模式:
观察者模式就是相当于警察(观察者)去观察小偷(被观察者),当小偷去偷东西(事件)的时候,警察能立刻知道,并做出相应的反应。不过在编程中是告诉被观察者在执行事件的过程中去提醒观察者。。现实中小偷不会偷东西时候提醒观察者
rxJava
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
基本创建:
1 | Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() { |
subscribe
onSubscribe
onNext = hello
onComplete
其中oble是一个被观察者,oser是一个观察者,被观察者可以调用onNext向观察者发送内容,此时观察者就能通过重写的onNext获取到数据,执行相应的操作
另外观察者还有oncomplete和onerror,如果执行了onComplete方法,那么就会断开联系,所以hello2没有显示出来,如果发生了错误会调用了onerror也会立马断开联系。
另一些
简写被观察者
上面的例子是create一个最基本的被观察者,当如果被观察者只有一个动作的时候就不需要那么复杂的操作,可以用一个just1
Observable<String> observable = Observable.just("hello");
这样就是只执行一个onNext(’hello’);
简写观察者
当然对于观察者也是一样,如果不用考虑oncomplete和onerror也可以简写,创建一个consumer对象,重写accept方法就行,然后通过被观察者.subscribe(观察者)来建立联系1
2
3
4
5
6
7
8Observable<String> observable = Observable.just("hello");
Consumer<String> consumer = new Consumer<String>() {
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);
创建完成或者错误的另一些方法
可以创建一个action对象来处理oncomplete的事件,用一个consumer来处理onnext和onerror事件,最后重载subscribe的一些方法达到建立关系的目的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Observable<String> observable = Observable.just("hello");
Action onCompleteAction = new Action() {
public void run() throws Exception {
Log.i("kaelpu", "complete");
}
};
Consumer<String> onNextConsumer = new Consumer<String>() {
public void accept(String s) throws Exception {
Log.i("kaelpu", s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
public void accept(Throwable throwable) throws Exception {
Log.i("kaelpu", "error");
}
};
observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);
}
1 | public final Disposable subscribe() {} |
上面是subscribe一些重载方法
线程调度
rxJava最大的好处就是能够在多线程的情况下去实现,主要能应用在Android更新UI上。。
在建立关系subscribe的时候会有一些方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
Log.d("kaelpu", "emitter 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
Log.d("kaelpu", "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
最后那个建立关系的意思是让被监听者在Schedulers.newThread()这个新线程上,然后让观察者在AndroidSchedulers.mainThread()这个主线程上,就实现了主线程更新UI的操作,主要就是subscribeOn是让被观察者运行的线程,observeOn是观察者运行的线程
进阶使用
rxjava还有很多操作符可以满足绝大多数的异步场景
Android RxJava:这是一份全面 & 详细 的RxJava操作符 使用攻略