Java擴大庫RxJava的根本構造與實用場景小結。本站提示廣大學習愛好者:(Java擴大庫RxJava的根本構造與實用場景小結)文章只能為提供參考,不一定能成為您想要的結果。以下是Java擴大庫RxJava的根本構造與實用場景小結正文
根本構造
我們先來看一段最根本的代碼,剖析這段代碼在RxJava中是若何完成的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.create(onSubscriber1)
.subscribe(subscriber1);
起首我們來看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
直接就是挪用了Observable的結構函數來創立一個新的Observable對象,這個對象我們臨時標志為observable1,以便前面追溯。
同時,會將我們傳入的OnSubscribe對象onSubscribe1保留在observable1的onSubscribe屬性中,這個屬性在前面的高低文中很主要,年夜家留意一下。
接上去我們來看看subscribe辦法。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}
可以看到,subscribe以後,就直接挪用了observable1.onSubscribe.call辦法,也就是我們代碼中的onSubscribe1對象的call辦法
,傳入的參數就是我們代碼中界說的subscriber1對象。call辦法中所做的工作就是挪用傳入的subscriber1對象的onNext和onComplete辦法。
如許就完成了不雅察者和被不雅察者之間的通信,是否是很簡略?
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
RxJava應用場景小結
1.取數據先檢討緩存的場景
取數據,起首檢討內存能否有緩存
然後檢討文件緩存中能否有
最初才從收集中取
後面任何一個前提知足,就不會履行前面的
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//重要就是靠concat operator來完成
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
2.界面須要比及多個接口並發取完數據,再更新
//拼接兩個Observable的輸入,不包管次序,依照事宜發生的次序發送給定閱者
private void testMerge() {
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
}
3.一個接口的要求依附另外一個API要求前往的數據
舉個例子,我們常常在須要上岸以後,依據拿到的token去獲得新聞列表。
這裡用RxJava重要處理嵌套回調的成績,有一個專著名詞叫Callback hell
NetworkService.getToken("username", "password")
.flatMap(s -> NetworkService.getMessage(s))
.subscribe(s -> {
System.out.println("message: " + s);
});
4.界面按鈕須要避免持續點擊的情形
RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(aVoid -> {
System.out.println("click");
});
5.呼應式的界面
好比勾選了某個checkbox,主動更新對應的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());
6.龐雜的數據變換
Observable.just("1", "2", "2", "3", "4", "5")
.map(Integer::parseInt)
.filter(s -> s > 1)
.distinct()
.take(3)
.reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
.subscribe(System.out::println);//9