리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.

이 부분에서는 디버깅과 예외처리 및 흐름 제어에 대한 부분을 포스팅합니다.

이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java로 작성하였지만 이번에는 Kotlin으로 예제를 바꿔서 작성하였습니다.

마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.


디버깅

RxJava는 try-catch문을 사용할 수 없고 로그를 넣을 수 있는 공간이 없기 때문에, doOnXXX() 계열의 함수를 이용하여 강제로 부수 효과를 일으켜 버그를 잡아야 합니다.


doOnXXX 계열의 함수들

강제로 부수효과를 일으켜 버그를 알 수 있게 해줍니다.

  • doOnNext() : 어떤 데이터가 발행되면 이벤트 발생
  • doOnComplete() : 모든 데이터가 발행되면 이벤트 발생
  • doOnError() : 중간에 에러가 발생하면 이벤트 발생
  • doOnEach() : onNext, onComplete, onError 이벤트를 한번에 처리
  • doOnSubscribe() : Observable을 구독했을때, 호출
  • doOnDispose() : Observable의 구독이 해지되었을때 호출

doOnNext() 함수, doOnComplete() 함수, doOnError() 함수

  • doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
  • doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
  • doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생


val args = listOf(10, 5, 0)

Observable.fromIterable(args)
    .map { div -> 1000 / div }
    .doOnNext { data -> println("onNext() = " + data!!) }
    .doOnComplete { println("onComplete") }
    .doOnError { e -> println("onError() = " + e.message) }
    .subscribe(System.out::println)
        

//------------------------------------------
            
결과)

onNext() = 100
100
onNext() = 200
200
onError() = / by zero
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.ArithmeticException: / by zero
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
	at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
	at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)

	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.Observable.subscribe(Observable.java:12232)
	at io.reactivex.Observable.subscribe(Observable.java:12134)
	at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:31)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ArithmeticException: / by zero
	at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:27)
	at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:19)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
	... 36 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.ArithmeticException: / by zero
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
	at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
	at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
	at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.Observable.subscribe(Observable.java:12232)
	at io.reactivex.Observable.subscribe(Observable.java:12134)
	at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:31)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ArithmeticException: / by zero
	at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:27)
	at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:19)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
	... 36 more
        

doOnEach() 함수

onNext, onComplete, onError 이벤트를 한번에 처리할 수 있습니다.


val data = listOf("ONE", "TWO", "THREE")

Observable.fromIterable(data).doOnEach {
    if (it.isOnNext) println("onNext() = " + it.value)
    if (it.isOnComplete) println("onComplete()")
    if (it.isOnError) println("onError() = " + it.error?.message)
}.subscribe(System.out::println)


//------------------------------------------
            
결과)

onNext() = ONE
ONE
onNext() = TWO
TWO
onNext() = THREE
THREE
onComplete()


doOnSubscribe() 함수, doOnDispose() 함수

  • doOnSubscribe() : Observable을 구독했을때, 호출
  • doOnDispose() : Observable의 구독이 해지되었을때 호출


val args = listOf("1", "3", "5", "2", "6")
val source = Observable.fromIterable(args)
    .zipWith<Long, String>(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })
    .doOnSubscribe { println("onSubscribe()") }
    .doOnDispose { println("onDispose()") }

val disposable = source.subscribe(System.out::println)

sleep(200)
disposable.dispose()
sleep(300)


//------------------------------------------
            
결과)

onSubscribe()
1
3
onDispose()
        

doOnLifeCycle() 함수

doOnSubscribe()와 doOnDispose() 함수를 한번에 호출하는 함수


val list = listOf("1", "3", "5", "2", "6")
val source = Observable.fromIterable(list)
    .zipWith<Long, String>(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction {data, _ -> data})
    .doOnLifecycle({ println("onSubscribe()") }, { println("onDispose()") })

val disposable = source.subscribe(System.out::println)

sleep(200)
disposable.dispose()
sleep(300)


//------------------------------------------
            
결과)

onSubscribe()
1
3
onDispose()


doOnTerminate() 함수

Observable이 끝나는 onComplete 또는 onError 이벤트가 발생했을 때 실행되는 함수입니다.


val list = listOf("1", "3", "5")

Observable.fromIterable(list)
    .doOnTerminate { println("onTerminate()") }
    .doOnComplete { println("onComplete()") }
    .doOnError { e -> println("error : ${e.message}") }
    .subscribe(System.out::println)

//------------------------------------------
            
결과)

1
3
5
onTerminate()
onComplete()

예외 처리

RxJava에서는 try-catch문을 이용하여 예외처리를 할 수 없습니다, 그래서 기존과 다른 방식을 이용해야 합니다.

onErrorReturn() 함수

RxJava에서는 에러도 어떠한 데이터로 보는것이 적절합니다. 그렇기 때문 예외가 발생하면, 에러를 의미하는 다른 데이터로 대체를 합니다.


val grades = listOf("70", "88", "$100", "93", "83")

val source = Observable.fromIterable(grades)
    .map { data -> Integer.parseInt(data) }
    .onErrorReturn { e ->
        if (e is NumberFormatException) {
            e.printStackTrace()
        }
        -1
    }

source.subscribe { data ->
    if (data < 0) {
        println("Wrong Data found!!")
        return@subscribe
    }

    println("Grade is " + data!!)
}

      
//------------------------------------------
            
결과)

Grade is 70
Grade is 88
java.lang.NumberFormatException: For input string: "$100"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:569)
	at java.lang.Integer.parseInt(Integer.java:615)
	at com.nkc.strack.rxjava.RxTest$test$source$1.apply(RxTest.kt:29)
	at com.nkc.strack.rxjava.RxTest$test$source$1.apply(RxTest.kt:21)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
	at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
	at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.internal.operators.observable.ObservableOnErrorReturn.subscribeActual(ObservableOnErrorReturn.java:31)
	at io.reactivex.Observable.subscribe(Observable.java:12246)
	at io.reactivex.Observable.subscribe(Observable.java:12232)
	at io.reactivex.Observable.subscribe(Observable.java:12134)
	at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:37)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Wrong Data found!!
  

onError와 onErrorReturn()의 차이점

  1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있습니다.
  2. Observable을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 그에 맞게 해결이 될 수 있습니다.

onErrorReturnItem() 함수

onErrorReturn() 함수와 비슷하지만 중간에 에러를 넘기는 부분을 구현안해도 되기 때문에 좀 더 간결해집니다.


val grades = listOf("70", "88", "$100", "93", "83")

val source = Observable.fromIterable(grades)
    .map { data -> Integer.parseInt(data) }
    .onErrorReturnItem(-1)

source.subscribe { data ->
    if (data < 0) {
        println("Wrong Data found!!")
        return@subscribe
    }
    println("Grade is " + data!!)
}

      
//------------------------------------------
            
결과)

Grade is 70
Grade is 88
Wrong Data found!!

onErrorResumeNext() 함수

onErrorResumeNext()는 에러가 발생했을 때 내가 원하는 Observable로 대체합니다.

이용할 수 있는 케이스로는 에러 발생시 기본값을 준다던가, 관리자에게 알람을 준다든가, 자원을 해제하는 등의 추가 작업을 할 수 있습니다.


val salesData = listOf("70", "88", "A300")

val onParseError = Observable.defer {
    println("Send mail!!")
    Observable.just(-1)
}.subscribeOn(Schedulers.io())

val source = Observable.fromIterable(salesData)
    .map{ Integer.parseInt(it) }
    .onErrorResumeNext(onParseError)

source.subscribe { data ->
    if (data < 0) {
        println("Wrong data found!!")
        return@subscribe
    }

    println("Sales Data : " + data!!)
}

//------------------------------------------
            
결과)

Sales Data : 70
Sales Data : 88
Send mail!!
Wrong data found!!


retry() 함수

서버 연결과 같이 인터넷이 일시적으로 연결이 안될때는 재 요청을 해야하는데 이때 retry()를 사용합니다.

retry()는 실패했을때 호출이 됨


val RETRY_MAX = 5
val RETRY_DELAY = 1000L
val ERROR_CODE = "-500"

val url = "https://api.github.com/zen"

val source = Observable.just(url)
    .map {
        val request = Request.Builder().url(it).build()
        return@map OkHttpClient().newCall(request).execute().body()?.string()
    }
    .retry { retryCount, e ->
        println("retryCount : $retryCount")
        sleep(RETRY_DELAY)
        return@retry retryCount < RETRY_MAX
    }
    .onErrorReturn { ERROR_CODE }


source.subscribe(System.out::println)


//------------------------------------------
            
결과)

retryCount : 1
retryCount : 2
retryCount : 3
retryCount : 4
retryCount : 5
-500


retryUntil() 함수

조건이 만족할때까지 재시도 하는 함수입니다.


val ERROR_CODE = "-500"

val url = "https://api.github.com/zen"

val source = Observable.just(url)
    .map {
        val request = Request.Builder().url(it).build()
        return@map OkHttpClient().newCall(request).execute().body()?.string()
    }
    .subscribeOn(Schedulers.io())
    .retryUntil {
        if (isNetworkAvailable())
            return@retryUntil true // true를 만나면 그만

        sleep(1000)
        return@retryUntil false // false를 만나면 다시 시도

    }
    .onErrorReturn { ERROR_CODE }

source.subscribe(System.out::println)

sleep(5000)


// 네트워크 체크 하는 함수
private fun isNetworkAvailable(): Boolean {
    try {
        return InetAddress.getByName("www.google.com").isReachable(1000)
    } catch (e: IOException) {
        println("Network is not available")
    }

    return false
}


//------------------------------------------
            
결과)

Network is not available
Network is not available
Network is not available
Network is not available
Network is not available

retryWhen() 함수

재시도 조건을 동적으로 설정해야 하는 복잡한 로직을 구현할 때 활용합니다.


// 세 번의 재시도를 하며, 재시도 횟수가 늘어날때마다 1000ms씩 재시도 시간이 늘어납니다.

Observable.create { emitter: ObservableEmitter<String> ->
    emitter.onError(RuntimeException("always fails"))
}.retryWhen { attempts ->
    return@retryWhen attempts.zipWith(
        range(1, 3),
        BiFunction<Throwable, Int, Int> {_, i -> i})
        .flatMap { i ->
            println("delay retry by $i seconds")
            return@flatMap timer(i.toLong(), TimeUnit.SECONDS)
        }
}.blockingForEach(System.out::println)


//------------------------------------------
            
결과)

delay retry by 1 seconds
delay retry by 2 seconds
delay retry by 3 seconds


흐름 제어

흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 시간 차이가 발생할 때 사용하는 함수입니다.

  • sample() : 특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시
  • buffer() : 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용
  • throttleFirst() : 주어진 조건에서 가장 먼저 입력된 값을 발행
  • throttleLast() : 함수는 주어진 조건에서 가장 마지막 입력된 값을 발행
  • window() : groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듦
  • debounce() : 연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용

sample() 함수

특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시합니다.


val list = listOf("1", "7", "2", "3", "6")


val earlySource = Observable.fromIterable(list)
    .take(4)
    .zipWith<Long, String>(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val lateSource = Observable.fromArray(list[4])
    .zipWith<Long, String>(
        Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 300L 마다 발행

Observable.concat<String>(earlySource, lateSource)
    .sample(300L, TimeUnit.MILLISECONDS) // 300L 간격으로 가장 최근 데이터만 가져옴
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

7
3



val list = listOf("1", "7", "2", "3", "6")

val earlySource = Observable.fromIterable(list)
    .take(4)
    .zipWith<Long, String>(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val lateSource = Observable.fromArray(list[4])
    .zipWith<Long, String>(
        Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 300L 마다 발행

Observable.concat<String>(earlySource, lateSource)
    .sample(300L, TimeUnit.MILLISECONDS, true) // true를 넣어주면 마지막 데이터는 가져옴
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

7
3
6


buffer() 함수

일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용합니다.


val list = arrayOf("1", "7", "2", "4", "3", "6")

val earlySource = Observable.fromArray(*list)
    .take(3)
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val middleSource = Observable.fromArray(list[3])
    .zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 300L 마다 발행

val lateSource = Observable.fromArray(list[4], list[5])
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 100L 마다 발행

val source = Observable.concat<String>(earlySource, middleSource, lateSource)
    .buffer(3) // 3개씩 모았다가 발행

source.subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

[1, 7, 2]
[4, 3, 6]



val list = arrayOf("1", "7", "2", "4", "3", "6")


val earlySource = Observable.fromArray(*list)
    .take(3)
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val middleSource = Observable.fromArray(list[3])
    .zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 300L 마다 발행

val lateSource = Observable.fromArray(list[4], list[5])
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 100L 마다 발행

val source = Observable.concat<String>(earlySource, middleSource, lateSource)
    .buffer(2, 3) // 3개씩 모았다가 2개를 발행하고 1개는 스킵

source.subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

[1, 7]
[4, 3]


throttleFirst(), throttleLast() 함수

throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행하고, throttleLast() 함수는 가장 마지막에 입력된 값을 발행합니다.


val list = arrayOf("1", "7", "2", "4", "3", "6")

val earlySource = Observable.fromArray(*list)
    .take(3)
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val middleSource = Observable.fromArray(list[3])
    .zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 300L 마다 발행

val lateSource = Observable.fromArray(list[4], list[5])
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 100L 마다 발행
    .doOnNext { println("error : $it") }

val source = Observable.concat<String>(earlySource, middleSource, lateSource)
    .throttleFirst(200L, TimeUnit.MILLISECONDS)

source.subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

1
4
error : 3
error : 6
6


window() 함수

groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듭니다.


val list = arrayOf("1", "7", "2", "4", "3", "6")

val earlySource = Observable.fromArray(*list)
    .take(3)
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data }) // 100L 마다 발행

val middleSource = Observable.fromArray(list[3])
    .zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 300L 마다 발행

val lateSource = Observable.fromArray(list[4], list[5])
    .zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction { data, _ -> data })  // 100L 마다 발행

val source = Observable.concat<String>(earlySource, middleSource, lateSource)
    .window(3)

source.subscribe { observeable ->
    println("new Observable Start")
    observeable.subscribe(System.out::println)
}

sleep(1000)


//------------------------------------------
            
결과)

RxComputationThreadPool-1 | debug = new Observable Start
RxComputationThreadPool-1 | 508 | value = 1
RxComputationThreadPool-1 | 588 | value = 7
RxComputationThreadPool-1 | 690 | value = 2
RxComputationThreadPool-2 | debug = new Observable Start
RxComputationThreadPool-2 | 995 | value = 4
RxComputationThreadPool-3 | 1099 | value = 3
RxComputationThreadPool-3 | 1199 | value = 6


debounce() 함수

연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용합니다.


val data = arrayOf("1", "7", "2", "4")

val source = Observable.concat(
    Observable.timer(100L, TimeUnit.MILLISECONDS).map { data[0] },
    Observable.timer(300L, TimeUnit.MILLISECONDS).map { data[1] },
    Observable.timer(100L, TimeUnit.MILLISECONDS).map { data[2] },
    Observable.timer(300L, TimeUnit.MILLISECONDS).map { data[3] }
).debounce(200L, TimeUnit.MILLISECONDS)

source.subscribe(System.out::println)
sleep(1000)


//------------------------------------------
            
결과)

1
2
4