rxjava
https://www.zybuluo.com/cxm-2016/note/467206
==========
http://www.itread01.com/articles/1476290449.html
Operators一句話介紹(RxJava版)
Cold Observables
在第一個subscriber訂閱後才執行事件發送的Observables,默認普通Observables都是這個類型
Cold Observables對於每個訂閱的subscriber執行一次事件發送過程的重演,每次事件實體將重新生成,尤其對於每次隨機生成的數值將不保證保持一致性
參考:Observable vs ConnectableObservable
Hot Observables
從創建一刻開始立即發送事件,此後進行訂閱的subscribers僅能接收在訂閱後發送的事件
Hot Observables發送的事件實體對象在所有subscribers之間進行共享
Connectable Observables
在connect()調用時進行事件發送動作,不論是否有subscriber執行了訂閱。實際上是把該Observable轉換為Hot Observable,發送的事件實體對象在所有subscribers之間進行共享
Creating
create / just(T..) / from(Iterable/Array) 常用Observable構建方式
defer(Func0) 延遲生成Observable,每次subscribe時生成新的Observable
range(start, len) 發送從start開始的數字序列,長度為len,值區間為[start, start+len-1]
repeat(n) 非直接構造器,將上個構造好的Observable重復n次發送(前一次Observable發送onCompleted後再啟動下一次的重訂閱),不傳參時表示無限repeat
repeatWhen(Func1<Observable, Observable) 通過函數判斷是否進行下一次重訂閱,可參考retryWhen(Func1)解釋
interval(time) 周期性發送一個序號數字(從0開始的正整數序列順序)
timer(time) 在指定時間後發送數字0
Transforming
map(Func1) / cast(class) 將單個事件轉型為另一類型事件或據此構造新的次級Observable並匯入單一事件流
flatMap / flatMapIterable 依據單個事件構造新的次級Observable或Iterable並匯入單一事件流,flatMap不保證發送順序 (允許交叉發送)
concatMap / switchMap concatMap是有序化版本的flatMap,switchMap是強時效版本的flatMap(下個次級Observable開始發送事件時,前面的次級Observable將自動被終止發送)
buffer(count/time) 按數量、時間緩存前後多個事件為一個個小組,再按小組為單位逐次發送
window(count/time) 類似於buffer,但緩存的小組以子Observable形式作為事件發送 (需要二次subscribe分發)
groupBy(KeyFunc, ValueFunc) 通過函數獲得每個事件的Key值進行分組,並將事件轉換為Value類型,返回發送GroupedObservable類型事件的包裹Observable (需要二次subscribe分發)
scan(Func2(x1, x2)) 前一次實際發送的事件x1與本次該發送的源事件x2通過函數計算,其結果作為本次實際發送的的事件,第0個初始事件不經過函數直接發送
Filtering
filter(Func1) 通過函數判斷事件是否可發送
throttleWithTimeout(time) / debounce(time) 每次事件發送時啟動獨立時效計時器,計時結束前有新事件發送則拋棄舊事件並重新啟動計時,計時結束後事件未被拋棄才執行實際發出
debounce(Func1) 以函數返回的臨時Observable的(虛擬)事件發送完畢為時間標記點進行事件有效計時判斷。本次事件產生的臨時Observable發送結束前未有新的臨時Observable產生並發送,則本次事件執行實際發出
throttleFirst(time) 發送每個時間周期片段內第一個發送的事件
sample(time) / throttleLast(time) 發送每個時間周期片段內最後一個發送的事件
distinct 去除事件集合中的所有重復項
distinctUntilChanged 去除事件集合中連續重復部分的重復項 (允許ABAB交叉發送)
skip(count) / skipLast() 拋棄前面/後面n個事件
take(count) / takeLast(count) 僅發送前面/後面n個事件
first(Func1) / last(Func1) 僅發送首個、最後一個滿足函數條件的事件
elementAt(index) 僅發送指定下標位置的事件
firstOrDefault() / lastOrDefault() / elementAtOrDefault() 在事件數量不足時,發送一個默認代替事件
ofType(class) 僅發送指定子類型的事件
ignoreElements() 刪除所有事件的發送,僅發送 onCompleted / onError 結束通知
Combining
startWith(T…/Iterable/Observable) 在事件發送前追加優先發送的事件
merge(Observable…) 最簡單的事件組合操作符,將任意多個Observable發送的事件混合到單一事件流中(類似flatMap),所有Observable發送的事件類型必須相同,允許交叉發送,任一成員Observable發送 onError 時,將終止其他所有Observable的事件發送
meregeDelayError(Observable…) onError 事件會緩存到merge結束後發送的版本
concat 每個Observable按加入順序發送的merge版本
switchOnNext(Observable) 將一個發送多個次級Observable的父Observable轉換為單一事件流的Observable(類似flatMap),當下一個子Observable被父Observable發送出來時,前一個子Observable將被終止發送事件 (可視為Observable構造器)
zip(Observable…, FuncN) 組合任意多個Observable的事件,每個成員Observable發送的第n次事件進行組合,通過FuncN函數合成出本次(第n次)發送的事件數據,事件合成、發送時機取決於最後一個發送第n次事件的成員Observable,任一成員Observable發送完畢時觸發整體發送 onCompleted 事件 (可視為Observable構造器)
zipWith(Iterable/Observable, Func2) 組合另一個Observable
combineLatest(Observable…, FuncN) 組合任意多個Observable最後一次發送的事件(每個Observable至少發送一次事件才有效),當任一成員Observable發送新事件時,將觸發一次FuncN函數將其他各Observable最後一次發送的事件進行組合,返回本次需要實際發送的事件對象
join(Observable, leftTimer, rightTimer, resultFunc) 當前Observable與目標Observable進行組合, leftTimer / rightTimer 函數產生一個用於判定時間窗口長度的臨時Observable,兩個Observable發送的事件在各自窗口期內遇到對方Observable發送新事件,則觸發 resultFunc 函數進行兩個事件的組合,產生本次發送的事件對象,同一窗口期內遇多次事件發送,則分別進行多次組合發送
groupJoin() 返回group組合的Observable對象作為事件的join版本
Error Handling
onErrorReturn(Func1) 發生錯誤時,發出一個替身事件取代異常拋出
onErrorResumeNext(Observable) 發生錯誤時,使用另一個替身Observable取代當前Observable繼續發送事件
onExceptionResumeNext(Observable) 發生Exception類型錯誤時,使用另一個替身Observable取代當前Observable繼續發送事件,否則將錯誤正常拋出到 onError 方法
retry(count/Func2) 通過次數或函數判斷是否需要重啟Observable(重訂閱),不滿足retry條件則將錯誤正常拋出到 onError 方法,不傳參時表示無條件retry
retryWhen(Func1<Observable errorOb, Observable notiOb>) 通過函數判斷是否進行下一次重啟Observable(重訂閱),函數需要返回一個以errorOb為根的Observable對象notiOb,若errorOb本次發送的Throwable事件(從源Observable的onError截獲)觸發notiOb發送 onNext 事件(發送事件類型無關)則觸發本次retry,若notiOb發送 onCompleted / onError 事件則不再觸發retry,此時將錯誤正常拋出到 onError 方法
可考慮通過errorOb.flatMap進行throwable->Observable的轉換,或通過zipWith(Observable.range)實現次數限制等,必須以errorOb作為返回的源Observable,直接返回errorOb時與簡單調用retry()效果相同
參考:對RxJava中.repeatWhen()和.retryWhen()操作符的思考
Utility
delay(time) / delaySubscription(time) 延遲事件發送 / 訂閱的過程
timeout(time) 對每個發送的事件設置超時計時器,計時結束時下個事件未被發出,則拋出超時異常等操作
timeInterval() 將發送的 onNext 事件替換為僅包含距離上次事件發送的時間間隔TimeInterval類型事件
timeStamp() 將發送的 onNext 事件與時間戳一起打包成Timestamped類型事件
do..(ActionX) 在Observable的生命周期的各個階段添加回調監聽動作
doOnEach = doOnNext + doOnError + doOnCompleted
doOnNext onNext 發生的時候觸發回調
doOnError onError 發生的時候觸發回調
doOnCompleted onCompleted 發生的時候觸發回調
doOnTerminate doOnError / doOnCompleted 後觸發,完成後觸發subscriber相關回調
finallyDo Observable結束後觸發回調
doOnSubscribe subscribe 執行時觸發回調
doOnUnsubscribe unsubscribe / onError / onCompleted 的時候觸發回調
doOnSubscribe -> doOnEach -> doOnNext -> subscriber.onNext -> doOnEach -> doOnCompleted -> doOnUnsubscribe
doOnEach -> doOnError -> doOnTerminate -> subscriber.onError -> finallyDo
materialize() 將每個事件(包括 onCompleted / onError )分別打包為一個Notification類型事件進行發送
dematerialize() materialize的反向操作
subscribOn/observerOn 線程指定
using(Func0 create, Func1<Resource, Observable> timer, Action1 dispose) 創建臨時性資源,通過timer返回的Observable的終止時間來控制資源有效期,資源到期後回調dispose方法進行銷毀
single / singleOrDefault 如果Observable發送超過一個事件,則拋出異常或發送指定的事件
serialize 強制Observable的所有事件以同步方式發送(杜絕同時發送多個事件的可能性)
cache 緩存所有發送過的事件實體對象,用於在將來添加訂閱的Subscribers中回放(使用舊的已發送過的事件對象,與ConnectableObservable的replay有區別),默認情況下不使用cache,則新添加訂閱的Subscriber會啟動新的Observable事件發送過程,使用同一個Observable對象以相同的過程發送新創建的事件對象
Conditional and Boolean 條件限定
all(Func1) 判斷所有發送的事件是否符合指定條件,並最終返回發送單一個Boolean類型事件
contains(T) 判斷是否發送過指定事件
exists / isEmpty 判斷Observable是否發送過事件
sequenceEqual(Observable, Observable) 判斷兩個Observable發送的事件序列(包括結束狀態)是否完全一致
amb(Observable…) / ambWith(Observable) 選取率先發送第一個事件的Observable作為源Observable進行事件發送,其他Observable將被拋棄
defaultIfEmpty() Observable沒有發送事件時,則發送一個指定默認事件
skipUntil(Observable mark) 當mark沒有發送事件前,忽略源Observable發出的一切事件
skipWhile(Func1) 當函數返回false前,忽略Observable發出的一切事件
takeUntil / takeWhile 與 skipUntil / skipWhile 相反
Aggregate 集合
concat 每個Observable按加入順序發送的merge版本
count 統計Observable一共會進行發射的事件總數,事件會被拋棄忽略,僅在最後發送一個統計結果事件
reduce(Func2(x1, x2)) 與scan相同的運算方式,但僅發送最終的單個結果事件
collect(Func0 collector, Action2 collection) 事件收集器,實際運作方式與reduce類似,通過collection方法將每個發送的事件收集到collector方法產生的集合對象,並僅在最後發送這個收集完畢的集合對象事件
toList / toSortedList / toMap / toMultimap 將所有發送的事件對象收集為List、Map等集合對象,並在最後進行一次集合事件發送
Blocking
阻塞型Observable,通過一系列Operators對事件數據的發射進行阻塞。
可通過 Observable.toBlocking() 或 BlockingObservable.from() 將普通Observable轉換為BlockingObservable
forEach 對每個發射的事件應用一個函數,該函數會被阻塞直到Observable完成
first 阻塞第一個事件的發射,直到Observable發射下一個事件(或結束事件)
firstOrDefault 如果Observable沒有發射下一個事件(或結束事件),則發射一個默認事件
last / lastOrDefault 阻塞最後一個事件的發射,直到Observable發射一個結束事件
mostRecent 返回一個指向最近發射的事件序列的iterable
next 返回一個指向下一個發射的事件的iterable,阻塞獲取直到Observable執行發射動作
latest 返回一個iterable,阻塞獲取直到Observable發射了一個iterable沒有返回的值,然後返回這個值
single / singleOrDefault 如果Observable發送超過一個事件,則拋出異常或發送指定的事件
toFuture Observable轉換為Future對象
toIterable / getIterator Observable發送的所有事件轉換為單一Iterable / Iterator對象
Connectable Observable
在connect()調用時進行事件發送動作的Observable
publish 將一個Observable轉換為ConnectableObservable
replay 將一個Observable轉換為ConnectableObservable,對於事件發送後新加入訂閱的Observer進行過去的事件發送過程的重放(發出新創建的事件實體對象,區別於cache),保證任何時間加入訂閱的Observer都能接收到完整的整個事件序列,可傳入一函數對源Observable轉換為一個事件變形的新Observable,可參數指定buffer大小以及timeout時長
connect 令一個ConnectableObservable開始進行事件發送
refCount 令一個ConnectableObservable以普通Observable方式發送事件(不再以connect方法為發送開始標記),但仍保持所有Observer之間共享事件實體對象(保持與ConnectableObservable的鏈接,但至少需要保留一個Observer訂閱)
share 相當於publish+refCount應用到一個Observable(至少需要存在一個Observer訂???),事件實體將在所有Observer之間進行共享
String Observable
StringObservable類提供的基於String類型事件的額外Operators
byLine 把源String事件進行統合再處理,通過換行符作為新String事件的分割位點,轉換為每一獨立行為單位的新事件序列
decode 將多字節字符流轉換為字節數組事件類型的Observable
encode 將字符串事件Observable轉換為字節數組Observable
from 將字符流或Reader對象轉換為字節數組或String類型Observable
stringConcat 將String事件類型Observable的所有String事件數據進行整合,以單一String形式整體發送
join 以stringConcat方式串聯源Observable的所有String事件,但以指定的String作為切分標記
split 將String事件類型Observable的所有String事件進行整合,並重新以正則表達式進行切分
其他輔助庫
rxjava-async 用於生成異步事件發送的Observables
rxjava-joins 提供額外的Observables組合機制,如 and(), then(), when()
rxjava-computation-expressions 提供額外的條件判斷Operators
rxjava-math 提供一系列用於數值型事件的簡單數學統計、分析結果Operators
參考:
木水川 - RxJava操作符
【譯】對RxJava中.repeatWhen()和.retryWhen()操作符的思考
ReactiveX - Operators By Category
RxJava - wiki