리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.

이 부분에서는 디버깅과 예외처리 및 흐름 제어에 대한 부분을 포스팅합니다.


디버깅

RxJava는 로그를 넣을 수 있는 공간이 없기 때문에, doOnXXX() 계열의 함수를 이용하여 강제로 부수 효과를 일으켜 버그를 잡아야 합니다.


doOnXXX 계열의 함수들

강제로 부수효과를 일으켜 버그를 알 수 있게 해줍니다.

  • doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
  • doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
  • doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생
  • doOnEach() 함수 : onNext, onComplete, onError 이벤트를 한번에 처리
  • doOnSubscribe() 함수 : Observable을 구독했을때, 호출
  • doOnDispose() 함수 : Observable의 구독이 해지되었을때 호출

doOnNext() 함수, doOnComplete() 함수, doOnError() 함수

  • doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
  • doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
  • doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생

Integer[] orgs = {10, 5, 0};

Observable.fromArray(orgs)
        .map(div -> 1000 / div)
        .doOnNext(data -> Log.d("onNext() = " + data))
        .doOnComplete(() -> Log.d("onComplete"))
        .doOnError(e -> Log.e("onError() = " + e.getMessage()))
        .subscribe(Log::i);
        

//------------------------------------------
            
결과)

main | value = 100
main | debug = onNext() = 200
main | value = 200
main | error = onError() = / by zero

	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
	at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
	at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
	at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.Observable.subscribe(Observable.java:10889)
	at io.reactivex.Observable.subscribe(Observable.java:10792)
	at RxJava.main(RxJava.java:33)
Caused by: java.lang.ArithmeticException: / by zero
	at RxJava.lambda$main$0(RxJava.java:29)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	... 14 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: / by zero
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
	at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
	at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
	at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
	at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.Observable.subscribe(Observable.java:10889)
	at io.reactivex.Observable.subscribe(Observable.java:10792)
	at RxJava.main(RxJava.java:33)
Caused by: java.lang.ArithmeticException: / by zero
	at RxJava.lambda$main$0(RxJava.java:29)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	... 14 more
        

doOnEach() 함수

onNext, onComplete, onError 이벤트를 한번에 처리할 수 있습니다.


String[] data = {"ONE", "TWO", "THREE"};
Observable<String> source = Observable.fromArray(data);

source.doOnEach(noti -> {
    if (noti.isOnNext()) Log.d("onNext() = " + noti.getValue());
    if (noti.isOnComplete()) Log.d("onComplete()");
    if (noti.isOnError()) Log.e("onError() = " + noti.getError().getMessage());
}).subscribe(Log::i);


//------------------------------------------
            
결과)

main | debug = onNext() = ONE
main | value = ONE
main | debug = onNext() = TWO
main | value = TWO
main | debug = onNext() = THREE
main | value = THREE
main | debug = onComplete()


doOnSubscribe(), doOnDispose()

  • doOnSubscribe() 함수 : Observable을 구독했을때, 호출
  • doOnDispose() 함수 : Observable의 구독이 해지되었을때 호출

String[] orgs = {"1", "3", "5", "2", "6"};
Observable<String> source = Observable.fromArray(orgs)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
        .doOnSubscribe(d -> Log.d("onSubscribe()"))
        .doOnDispose(() -> Log.d("onDispose()"));

Disposable disposable = source.subscribe(Log::i);

CommonUtils.sleep(200);
disposable.dispose();
CommonUtils.sleep(300);


//------------------------------------------
            
결과)

main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose()
        

doOnLifeCycle() 함수

doOnSubscribe()와 doOnDispose() 함수를 한번에 호출하는 함수


String[] orgs = {"1", "3", "5", "2", "6"};
Observable<String> source = Observable.fromArray(orgs)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
        .doOnLifecycle(d -> Log.d("onSubscribe()"), () -> Log.d("onDispose()"));

Disposable disposable = source.subscribe(Log::i);

CommonUtils.sleep(200);
disposable.dispose();
CommonUtils.sleep(300);


//------------------------------------------
            
결과)

main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose()


예외 처리

RxJava에서는 try-catch문을 이용하여 예외처리를 할 수 없습니다, 그래서 기존과 다른 방식을 이용해야 합니다.

onErrorReturn() 함수

RxJava에서는 에러도 어떠한 데이터로 보는것이 적절합니다. 그렇기 때문 예외가 발생하면, 에러를 의미하는 다른 데이터로 대체를 합니다.


String[] grades = {"70", "88", "$100", "93", "83"};

Observable<Integer> source = Observable.fromArray(grades)
        .map(data -> Integer.parseInt(data))
        .onErrorReturn(e -> {
            if (e instanceof NumberFormatException) {
                e.printStackTrace();
            }
            return -1;
        });

source.subscribe(data -> {
    if (data < 0) {
        Log.e("Wrong Data found!!");

        return;
    }

    Log.i("Grade is " + data);
});

      
//------------------------------------------
            
결과)

main | value = Grade is 70
main | value = Grade is 88
java.lang.NumberFormatException: For input string: "$100"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:569)
	at java.lang.Integer.parseInt(Integer.java:615)
	at RxJava.lambda$main$0(RxJava.java:30)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
	at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.internal.operators.observable.ObservableOnErrorReturn.subscribeActual(ObservableOnErrorReturn.java:31)
	at io.reactivex.Observable.subscribe(Observable.java:10903)
	at io.reactivex.Observable.subscribe(Observable.java:10889)
	at io.reactivex.Observable.subscribe(Observable.java:10792)
	at RxJava.main(RxJava.java:38)
main | error = Wrong Data found!!
  

onError와 onErrorReturn()의 차이점

  1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있습니다.
  2. Observable을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 그에 맞게 해결이 될 수 있습니다.

onErrorReturnItem() 함수

onErrorReturn() 함수와 비슷하지만 중간에 에러를 넘기는 부분을 구현안해도 되기 때문에 좀 더 간결해집니다.


String[] grades = {"70", "88", "$100", "93", "83"};

Observable<Integer> source = Observable.fromArray(grades)
        .map(data -> Integer.parseInt(data))
        .onErrorReturnItem(-1);

source.subscribe(data -> {
    if (data < 0) {
        Log.e("Wrong Data found!!");

        return;
    }

    Log.i("Grade is " + data);
});

      
//------------------------------------------
            
결과)

main | value = Grade is 70
main | value = Grade is 88
main | error = Wrong Data found!!

onErrorResumeNext() 함수

onErrorResumeNext()는 에러가 발생했을 때 내가 원하는 Observable로 대체합니다.

이용할 수 있는 케이스로는 에러 발생시 기본값을 준다던가, 관리자에게 알람을 준다든가, 자원을 해제하는 등의 추가 작업을 할 수 있습니다.


String[] salesData = {"70", "88", "A300"};

Observable<Integer> onParseError = Observable.defer(() -> {
    Log.d("Send mail!!");
    return Observable.just(-1);
}).subscribeOn(Schedulers.io());

Observable<Integer> source = Observable.fromArray(salesData)
        .map(Integer::parseInt)
        .onErrorResumeNext(onParseError);

source.subscribe(data -> {
    if (data < 0) {
        Log.e("Wrong data found!!");
        return;
    }

    Log.i("Sales Data : " + data);
});

//------------------------------------------
            
결과)

main | value = Sales Data : 70
main | value = Sales Data : 88
RxCachedThreadScheduler-1 | debug = Send mail!!


retry() 함수

서버 연결과 같이 인터넷이 일시적으로 연결이 안될때는 재 요청을 해야하는데 이때 retry()를 사용합니다.

retry()는 실패했을때 호출이 됨


final int RETRY_MAX = 5;
final int RETRY_DELAY = 1000;

CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
        .map(OkHttpHelper::getT)
        .retry((retryCnt, e) -> {
            Log.e("retryCnt = " + retryCnt);
            CommonUtils.sleep(RETRY_DELAY);

            return retryCnt < RETRY_MAX;
        })
        .onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(data -> Log.it("result : " + data));
CommonUtils.exampleComplete();


//------------------------------------------
            
결과)

main | 719 | error = api.github.com: nodename nor servname provided, or not known
main | error = retryCnt = 1
main | 1724 | error = api.github.com
main | error = retryCnt = 2
main | 2729 | error = api.github.com
main | error = retryCnt = 3
main | 3734 | error = api.github.com
main | error = retryCnt = 4
main | 4739 | error = api.github.com
main | error = retryCnt = 5
main | 5742 | value = result : -500


retryUntil() 함수

조건이 만족할때까지 재시도 하는 함수입니다.


CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
        .map(OkHttpHelper::getT)
        .subscribeOn(Schedulers.io())
        .retryUntil(() -> {
            if(CommonUtils.isNetworkAvailable())
                return true; // 중지

            CommonUtils.sleep(1000);
            return false; // 계쏙 진행
        });

source.subscribe(Log::i);

CommonUtils.sleep(5000);


//------------------------------------------
            
결과)

RxCachedThreadScheduler-1 | 772 | error = api.github.com: nodename nor servname provided, or not known
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 1780 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 2784 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 3786 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available
RxCachedThreadScheduler-1 | 4790 | error = api.github.com
RxCachedThreadScheduler-1 | error = Network is not available


흐름 제어

흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 시간 차이가 발생할 때 사용하는 함수입니다.

  • sample() 함수 : 특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시
  • buffer() 함수 : 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용
  • throttleFirst() 함수 : 주어진 조건에서 가장 먼저 입력된 값을 발행
  • throttleLast() 함수 : 함수는 주어진 조건에서 가장 마지막 입력된 값을 발행
  • window() 함수 : groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듦
  • debounce() 함수 : 연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용

sample() 함수

특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시합니다.


String[] data = {"1", "7", "2", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data)
        .take(4)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[4])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행

Observable<String> source = Observable.concat(earlySource, lateSource)
        .sample(300L, TimeUnit.MILLISECONDS); // 300L 간격으로 가장 최근 데이터만 가져옴

source.subscribe(Log::it);

CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | 624 | value = 7 
RxComputationThreadPool-1 | 922 | value = 3



String[] data = {"1", "7", "2", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data)
        .take(4)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[4])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행

Observable<String> source = Observable.concat(earlySource, lateSource)
        .sample(300L, TimeUnit.MILLISECONDS, true); // true를 넣어주면 마지막 데이터는 가져옴 

source.subscribe(Log::it);

CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | 713 | value = 7
RxComputationThreadPool-1 | 1015 | value = 3
RxComputationThreadPool-3 | 1129 | value = 6


buffer() 함수

일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용합니다.


String[] data = {"1", "7", "2", "4", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data)
        .take(3)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> middleSource = Observable.fromArray(data[3])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[4], data[5])
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행

Observable<List<String>> source = Observable.concat(earlySource, middleSource, lateSource)
        .buffer(3); // 3개씩 모았다가 발행

source.subscribe(Log::it);

CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | 664 | value = [1, 7, 2]
RxComputationThreadPool-3 | 1167 | value = [4, 3, 6]



String[] data = {"1", "7", "2", "4", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data)
        .take(3)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> middleSource = Observable.fromArray(data[3])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[4], data[5])
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행

Observable<List<String>> source = Observable.concat(earlySource, middleSource, lateSource)
        .buffer(2, 3); // 3개씩 모았다가 2개를 모으고 1개를 스킵 

source.subscribe(Log::it);

CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | 627 | value = [1, 7]
RxComputationThreadPool-3 | 1131 | value = [4, 3]


throttleFirst(), throttleLast() 함수

throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행하고, throttleLast() 함수는 가장 마지막에 입력된 값을 발행합니다.


String[] data = {"1", "7", "2", "4", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data[0])
        .take(3)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> middleSource = Observable.fromArray(data[1])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[2], data[3], data[4], data[5])
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a) // 300L 마다 발행
        .doOnNext(Log::e); // 디버깅 정보 발행

Observable<String> source = Observable.concat(earlySource, middleSource, lateSource)
        .throttleFirst(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);

CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | 466 | value = 1
RxComputationThreadPool-3 | 773 | value = 7
RxComputationThreadPool-4 | error = 2
RxComputationThreadPool-4 | error = 4
RxComputationThreadPool-4 | 978 | value = 4
RxComputationThreadPool-4 | error = 3
RxComputationThreadPool-4 | error = 6


window() 함수

groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듭니다.


String[] data = {"1", "7", "2", "4", "3", "6"};

CommonUtils.exampleStart();

Observable<String> earlySource = Observable.fromArray(data)
        .take(3)
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> middleSource = Observable.fromArray(data[3])
        .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); // 100L 마다 발행

Observable<String> lateSource = Observable.fromArray(data[4], data[5])
        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); // 300L 마다 발행

Observable<Observable<String>> source = Observable.concat(earlySource, middleSource, lateSource)
        .window(3);

source.subscribe(observable -> {
    Log.d("new Observable Start");
    observable.subscribe(Log::it);
});

CommonUtils.sleep(1000);
CommonUtils.exampleComplete();


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | debug = new Observable Start
RxComputationThreadPool-1 | 508 | value = 1
RxComputationThreadPool-1 | 588 | value = 7
RxComputationThreadPool-1 | 690 | value = 2
RxComputationThreadPool-2 | debug = new Observable Start
RxComputationThreadPool-2 | 995 | value = 4
RxComputationThreadPool-3 | 1099 | value = 3
RxComputationThreadPool-3 | 1199 | value = 6


debounce() 함수

연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용합니다.


String[] data = {"1", "7", "2", "4"};

Observable<String> source = Observable.concat(
        Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[0]),
        Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[1]),
        Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[2]),
        Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[3])
).debounce(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::i);
CommonUtils.sleep(1000);


//------------------------------------------
            
결과)

RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 4