rxjava

github

简介

rxjava是Java一个rx最近的多线程技术。。。

他是一个扩展观察者模式实现的一个多线程技术

观察者模式:

观察者模式就是相当于警察(观察者)去观察小偷(被观察者),当小偷去偷东西(事件)的时候,警察能立刻知道,并做出相应的反应。不过在编程中是告诉被观察者在执行事件的过程中去提醒观察者。。现实中小偷不会偷东西时候提醒观察者

rxJava

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

基本创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onComplete();
e.onNext("hello2");

}
});

Observer<String> oser = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.w("","onSubscribe");
} @Override
public void onNext(@NonNull String s) {
Log.w("","onNext = "+s);
} @Override
public void onError(@NonNull Throwable e) {
Log.w("","onError" + e);
} @Override
public void onComplete() {
Log.w("","onComplete");
}
};

Log.w("","subscribe");
oble.subscribe(oser);

subscribe
onSubscribe
onNext = hello
onComplete

其中oble是一个被观察者,oser是一个观察者,被观察者可以调用onNext向观察者发送内容,此时观察者就能通过重写的onNext获取到数据,执行相应的操作

另外观察者还有oncomplete和onerror,如果执行了onComplete方法,那么就会断开联系,所以hello2没有显示出来,如果发生了错误会调用了onerror也会立马断开联系。

另一些

简写被观察者

上面的例子是create一个最基本的被观察者,当如果被观察者只有一个动作的时候就不需要那么复杂的操作,可以用一个just

1
Observable<String> observable = Observable.just("hello");

这样就是只执行一个onNext(’hello’);

简写观察者

当然对于观察者也是一样,如果不用考虑oncomplete和onerror也可以简写,创建一个consumer对象,重写accept方法就行,然后通过被观察者.subscribe(观察者)来建立联系

1
2
3
4
5
6
7
8
Observable<String> observable = Observable.just("hello");
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);

创建完成或者错误的另一些方法
可以创建一个action对象来处理oncomplete的事件,用一个consumer来处理onnext和onerror事件,最后重载subscribe的一些方法达到建立关系的目的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable<String> observable = Observable.just("hello");
Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
Log.i("kaelpu", "complete");
}
};
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("kaelpu", s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i("kaelpu", "error");
}
};
observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);

}

1
2
3
4
5
6
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

上面是subscribe一些重载方法

线程调度

rxJava最大的好处就是能够在多线程的情况下去实现,主要能应用在Android更新UI上。。

在建立关系subscribe的时候会有一些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
Log.d("kaelpu", "emitter 1");
emitter.onNext(1);
}
});

Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
Log.d("kaelpu", "onNext: " + integer);
}
};

observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}

最后那个建立关系的意思是让被监听者在Schedulers.newThread()这个新线程上,然后让观察者在AndroidSchedulers.mainThread()这个主线程上,就实现了主线程更新UI的操作,主要就是subscribeOn是让被观察者运行的线程,observeOn是观察者运行的线程

进阶使用

rxjava还有很多操作符可以满足绝大多数的异步场景
Android RxJava:这是一份全面 & 详细 的RxJava操作符 使用攻略