리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.
이 부분에서는 디버깅과 예외처리 및 흐름 제어에 대한 부분을 포스팅합니다.
디버깅
RxJava는 로그를 넣을 수 있는 공간이 없기 때문에, doOnXXX() 계열의 함수를 이용하여 강제로 부수 효과를 일으켜 버그를 잡아야 합니다.
doOnXXX 계열의 함수들
강제로 부수효과를 일으켜 버그를 알 수 있게 해줍니다.
- doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
- doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
- doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생
- doOnEach() 함수 : onNext, onComplete, onError 이벤트를 한번에 처리
- doOnSubscribe() 함수 : Observable을 구독했을때, 호출
- doOnDispose() 함수 : Observable의 구독이 해지되었을때 호출
doOnNext() 함수, doOnComplete() 함수, doOnError() 함수
- doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
- doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
- doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생
Integer[] orgs = {10, 5, 0};
Observable.fromArray(orgs)
.map(div -> 1000 / div)
.doOnNext(data -> Log.d("onNext() = " + data))
.doOnComplete(() -> Log.d("onComplete"))
.doOnError(e -> Log.e("onError() = " + e.getMessage()))
.subscribe(Log::i);
//------------------------------------------
결과)
main | value = 100
main | debug = onNext() = 200
main | value = 200
main | error = onError() = / by zero
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.Observable.subscribe(Observable.java:10889)
at io.reactivex.Observable.subscribe(Observable.java:10792)
at RxJava.main(RxJava.java:33)
Caused by: java.lang.ArithmeticException: / by zero
at RxJava.lambda$main$0(RxJava.java:29)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
... 14 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: / by zero
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.Observable.subscribe(Observable.java:10889)
at io.reactivex.Observable.subscribe(Observable.java:10792)
at RxJava.main(RxJava.java:33)
Caused by: java.lang.ArithmeticException: / by zero
at RxJava.lambda$main$0(RxJava.java:29)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
... 14 more
doOnEach() 함수
onNext, onComplete, onError 이벤트를 한번에 처리할 수 있습니다.
String[] data = {"ONE", "TWO", "THREE"};
Observable<String> source = Observable.fromArray(data);
source.doOnEach(noti -> {
if (noti.isOnNext()) Log.d("onNext() = " + noti.getValue());
if (noti.isOnComplete()) Log.d("onComplete()");
if (noti.isOnError()) Log.e("onError() = " + noti.getError().getMessage());
}).subscribe(Log::i);
//------------------------------------------
결과)
main | debug = onNext() = ONE
main | value = ONE
main | debug = onNext() = TWO
main | value = TWO
main | debug = onNext() = THREE
main | value = THREE
main | debug = onComplete()
doOnSubscribe(), doOnDispose()
- doOnSubscribe() 함수 : Observable을 구독했을때, 호출
- doOnDispose() 함수 : Observable의 구독이 해지되었을때 호출
String[] orgs = {"1", "3", "5", "2", "6"};
Observable<String> source = Observable.fromArray(orgs)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
.doOnSubscribe(d -> Log.d("onSubscribe()"))
.doOnDispose(() -> Log.d("onDispose()"));
Disposable disposable = source.subscribe(Log::i);
CommonUtils.sleep(200);
disposable.dispose();
CommonUtils.sleep(300);
//------------------------------------------
결과)
main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose()
doOnLifeCycle() 함수
doOnSubscribe()와 doOnDispose() 함수를 한번에 호출하는 함수
String[] orgs = {"1", "3", "5", "2", "6"};
Observable<String> source = Observable.fromArray(orgs)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
.doOnLifecycle(d -> Log.d("onSubscribe()"), () -> Log.d("onDispose()"));
Disposable disposable = source.subscribe(Log::i);
CommonUtils.sleep(200);
disposable.dispose();
CommonUtils.sleep(300);
//------------------------------------------
결과)
main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose()
예외 처리
RxJava에서는 try-catch문을 이용하여 예외처리를 할 수 없습니다, 그래서 기존과 다른 방식을 이용해야 합니다.
onErrorReturn() 함수
RxJava에서는 에러도 어떠한 데이터로 보는것이 적절합니다. 그렇기 때문 예외가 발생하면, 에러를 의미하는 다른 데이터로 대체를 합니다.
String[] grades = {"70", "88", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorReturn(e -> {
if (e instanceof NumberFormatException) {
e.printStackTrace();
}
return -1;
});
source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}
Log.i("Grade is " + data);
});
//------------------------------------------
결과)
main | value = Grade is 70
main | value = Grade is 88
java.lang.NumberFormatException: For input string: "$100"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:569)
at java.lang.Integer.parseInt(Integer.java:615)
at RxJava.lambda$main$0(RxJava.java:30)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableOnErrorReturn.subscribeActual(ObservableOnErrorReturn.java:31)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.Observable.subscribe(Observable.java:10889)
at io.reactivex.Observable.subscribe(Observable.java:10792)
at RxJava.main(RxJava.java:38)
main | error = Wrong Data found!!
onError와 onErrorReturn()의 차이점
- 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있습니다.
- Observable을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 그에 맞게 해결이 될 수 있습니다.
onErrorReturnItem() 함수
onErrorReturn() 함수와 비슷하지만 중간에 에러를 넘기는 부분을 구현안해도 되기 때문에 좀 더 간결해집니다.
String[] grades = {"70", "88", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorReturnItem(-1);
source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}
Log.i("Grade is " + data);
});
//------------------------------------------
결과)
main | value = Grade is 70
main | value = Grade is 88
main | error = Wrong Data found!!
onErrorResumeNext() 함수
onErrorResumeNext()는 에러가 발생했을 때 내가 원하는 Observable로 대체합니다.
이용할 수 있는 케이스로는 에러 발생시 기본값을 준다던가, 관리자에게 알람을 준다든가, 자원을 해제하는 등의 추가 작업을 할 수 있습니다.
String[] salesData = {"70", "88", "A300"};
Observable<Integer> onParseError = Observable.defer(() -> {
Log.d("Send mail!!");
return Observable.just(-1);
}).subscribeOn(Schedulers.io());
Observable<Integer> source = Observable.fromArray(salesData)
.map(Integer::parseInt)
.onErrorResumeNext(onParseError);
source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong data found!!");
return;
}
Log.i("Sales Data : " + data);
});
//------------------------------------------
결과)
main | value = Sales Data : 70
main | value = Sales Data : 88
RxCachedThreadScheduler-1 | debug = Send mail!!
retry() 함수
서버 연결과 같이 인터넷이 일시적으로 연결이 안될때는 재 요청을 해야하는데 이때 retry()를 사용합니다.
retry()는 실패했을때 호출이 됨
final int RETRY_MAX = 5;
final int RETRY_DELAY = 1000;
CommonUtils.exampleStart();
String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.retry((retryCnt, e) -> {
Log.e("retryCnt = " + retryCnt);
CommonUtils.sleep(RETRY_DELAY);
return retryCnt < RETRY_MAX;
})
.onErrorReturn(e -> CommonUtils.ERROR_CODE);
source.subscribe(data -> Log.it("result : " + data));
CommonUtils.exampleComplete();
//------------------------------------------
결과)
main | 719 | error = api.github.com: nodename nor servname provided, or not known
main | error = retryCnt = 1
main | 1724 | error = api.github.com
main | error = retryCnt = 2
main | 2729 | error = api.github.com
main | error = retryCnt = 3
main | 3734 | error = api.github.com
main | error = retryCnt = 4
main | 4739 | error = api.github.com
main | error = retryCnt = 5
main | 5742 | value = result : -500
retryUntil() 함수
조건이 만족할때까지 재시도 하는 함수입니다.
CommonUtils.exampleStart();
String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.subscribeOn(Schedulers.io())
.retryUntil(() -> {
if(CommonUtils.isNetworkAvailable())
return true; // 중지
CommonUtils.sleep(1000);
return false; // 계쏙 진행
});
source.subscribe(Log::i);
CommonUtils.sleep(5000);
//------------------------------------------
결과)
RxCachedThreadScheduler-1 | 772 | error = api.github.com: nodename nor servname provided, or not known
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 1780 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 2784 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 3786 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 4790 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
흐름 제어
흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 시간 차이가 발생할 때 사용하는 함수입니다.
- sample() 함수 : 특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시
- buffer() 함수 : 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용
- throttleFirst() 함수 : 주어진 조건에서 가장 먼저 입력된 값을 발행
- throttleLast() 함수 : 함수는 주어진 조건에서 가장 마지막 입력된 값을 발행
- window() 함수 : groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듦
- debounce() 함수 : 연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용
–
sample() 함수
특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시합니다.
String[] data = {"1", "7", "2", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data)
.take(4)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[4])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행
Observable<String> source = Observable.concat(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS); // 300L 간격으로 가장 최근 데이터만 가져옴
source.subscribe(Log::it);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-1 | 624 | value = 7
RxComputationThreadPool-1 | 922 | value = 3
String[] data = {"1", "7", "2", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data)
.take(4)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[4])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행
Observable<String> source = Observable.concat(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS, true); // true를 넣어주면 마지막 데이터는 가져옴
source.subscribe(Log::it);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-1 | 713 | value = 7
RxComputationThreadPool-1 | 1015 | value = 3
RxComputationThreadPool-3 | 1129 | value = 6
buffer() 함수
일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용합니다.
String[] data = {"1", "7", "2", "4", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> middleSource = Observable.fromArray(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행
Observable<List<String>> source = Observable.concat(earlySource, middleSource, lateSource)
.buffer(3); // 3개씩 모았다가 발행
source.subscribe(Log::it);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-1 | 664 | value = [1, 7, 2]
RxComputationThreadPool-3 | 1167 | value = [4, 3, 6]
String[] data = {"1", "7", "2", "4", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> middleSource = Observable.fromArray(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행
Observable<List<String>> source = Observable.concat(earlySource, middleSource, lateSource)
.buffer(2, 3); // 3개씩 모았다가 2개를 모으고 1개를 스킵
source.subscribe(Log::it);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-1 | 627 | value = [1, 7]
RxComputationThreadPool-3 | 1131 | value = [4, 3]
throttleFirst(), throttleLast() 함수
throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행하고, throttleLast() 함수는 가장 마지막에 입력된 값을 발행합니다.
String[] data = {"1", "7", "2", "4", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data[0])
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> middleSource = Observable.fromArray(data[1])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[2], data[3], data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a) // 300L 마다 발행
.doOnNext(Log::e); // 디버깅 정보 발행
Observable<String> source = Observable.concat(earlySource, middleSource, lateSource)
.throttleFirst(200L, TimeUnit.MILLISECONDS);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-1 | 466 | value = 1
RxComputationThreadPool-3 | 773 | value = 7
RxComputationThreadPool-4 | error = 2
RxComputationThreadPool-4 | error = 4
RxComputationThreadPool-4 | 978 | value = 4
RxComputationThreadPool-4 | error = 3
RxComputationThreadPool-4 | error = 6
window() 함수
groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듭니다.
String[] data = {"1", "7", "2", "4", "3", "6"};
CommonUtils.exampleStart();
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> middleSource = Observable.fromArray(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행
Observable<String> lateSource = Observable.fromArray(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행
Observable<Observable<String>> source = Observable.concat(earlySource, middleSource, lateSource)
.window(3);
source.subscribe(observable -> {
Log.d("new Observable Start");
observable.subscribe(Log::it);
});
CommonUtils.sleep(1000);
CommonUtils.exampleComplete();
//------------------------------------------
결과)
RxComputationThreadPool-1 | debug = new Observable Start
RxComputationThreadPool-1 | 508 | value = 1
RxComputationThreadPool-1 | 588 | value = 7
RxComputationThreadPool-1 | 690 | value = 2
RxComputationThreadPool-2 | debug = new Observable Start
RxComputationThreadPool-2 | 995 | value = 4
RxComputationThreadPool-3 | 1099 | value = 3
RxComputationThreadPool-3 | 1199 | value = 6
debounce() 함수
연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용합니다.
String[] data = {"1", "7", "2", "4"};
Observable<String> source = Observable.concat(
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[0]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[1]),
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[2]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[3])
).debounce(200L, TimeUnit.MILLISECONDS);
source.subscribe(Log::i);
CommonUtils.sleep(1000);
//------------------------------------------
결과)
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 4