리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.
이 부분에서는 디버깅과 예외처리 및 흐름 제어에 대한 부분을 포스팅합니다.
이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java
로 작성하였지만 이번에는 Kotlin
으로 예제를 바꿔서 작성하였습니다.
마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.
디버깅
RxJava는 try-catch문을 사용할 수 없고 로그를 넣을 수 있는 공간이 없기 때문에, doOnXXX() 계열의 함수를 이용하여 강제로 부수 효과를 일으켜 버그를 잡아야 합니다.
doOnXXX 계열의 함수들
강제로 부수효과를 일으켜 버그를 알 수 있게 해줍니다.
doOnNext()
: 어떤 데이터가 발행되면 이벤트 발생doOnComplete()
: 모든 데이터가 발행되면 이벤트 발생doOnError()
: 중간에 에러가 발생하면 이벤트 발생doOnEach()
: onNext, onComplete, onError 이벤트를 한번에 처리doOnSubscribe()
: Observable을 구독했을때, 호출doOnDispose()
: Observable의 구독이 해지되었을때 호출
doOnNext() 함수, doOnComplete() 함수, doOnError() 함수
- doOnNext() 함수 : 어떤 데이터가 발행되면 이벤트 발생
- doOnComplete() 함수 : 모든 데이터가 발행되면 이벤트 발생
- doOnError() 함수 : 중간에 에러가 발생하면 이벤트 발생
val args = listOf(10, 5, 0)
Observable.fromIterable(args)
.map { div -> 1000 / div }
.doOnNext { data -> println("onNext() = " + data!!) }
.doOnComplete { println("onComplete") }
.doOnError { e -> println("onError() = " + e.message) }
.subscribe(System.out::println)
//------------------------------------------
결과)
onNext() = 100
100
onNext() = 200
200
onError() = / by zero
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.ArithmeticException: / by zero
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.Observable.subscribe(Observable.java:12232)
at io.reactivex.Observable.subscribe(Observable.java:12134)
at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ArithmeticException: / by zero
at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:27)
at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:19)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
... 36 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.ArithmeticException: / by zero
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.Observable.subscribe(Observable.java:12232)
at io.reactivex.Observable.subscribe(Observable.java:12134)
at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ArithmeticException: / by zero
at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:27)
at com.nkc.strack.rxjava.RxTest$test$1.apply(RxTest.kt:19)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
... 36 more
doOnEach() 함수
onNext, onComplete, onError 이벤트를 한번에 처리할 수 있습니다.
val data = listOf("ONE", "TWO", "THREE")
Observable.fromIterable(data).doOnEach {
if (it.isOnNext) println("onNext() = " + it.value)
if (it.isOnComplete) println("onComplete()")
if (it.isOnError) println("onError() = " + it.error?.message)
}.subscribe(System.out::println)
//------------------------------------------
결과)
onNext() = ONE
ONE
onNext() = TWO
TWO
onNext() = THREE
THREE
onComplete()
doOnSubscribe() 함수, doOnDispose() 함수
doOnSubscribe()
: Observable을 구독했을때, 호출doOnDispose()
: Observable의 구독이 해지되었을때 호출
val args = listOf("1", "3", "5", "2", "6")
val source = Observable.fromIterable(args)
.zipWith<Long, String>(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data })
.doOnSubscribe { println("onSubscribe()") }
.doOnDispose { println("onDispose()") }
val disposable = source.subscribe(System.out::println)
sleep(200)
disposable.dispose()
sleep(300)
//------------------------------------------
결과)
onSubscribe()
1
3
onDispose()
doOnLifeCycle() 함수
doOnSubscribe()와 doOnDispose() 함수를 한번에 호출하는 함수
val list = listOf("1", "3", "5", "2", "6")
val source = Observable.fromIterable(list)
.zipWith<Long, String>(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction {data, _ -> data})
.doOnLifecycle({ println("onSubscribe()") }, { println("onDispose()") })
val disposable = source.subscribe(System.out::println)
sleep(200)
disposable.dispose()
sleep(300)
//------------------------------------------
결과)
onSubscribe()
1
3
onDispose()
doOnTerminate() 함수
Observable이 끝나는 onComplete 또는 onError 이벤트가 발생했을 때 실행되는 함수입니다.
val list = listOf("1", "3", "5")
Observable.fromIterable(list)
.doOnTerminate { println("onTerminate()") }
.doOnComplete { println("onComplete()") }
.doOnError { e -> println("error : ${e.message}") }
.subscribe(System.out::println)
//------------------------------------------
결과)
1
3
5
onTerminate()
onComplete()
예외 처리
RxJava에서는 try-catch문을 이용하여 예외처리를 할 수 없습니다, 그래서 기존과 다른 방식을 이용해야 합니다.
onErrorReturn() 함수
RxJava에서는 에러도 어떠한 데이터로 보는것이 적절합니다. 그렇기 때문 예외가 발생하면, 에러를 의미하는 다른 데이터로 대체를 합니다.
val grades = listOf("70", "88", "$100", "93", "83")
val source = Observable.fromIterable(grades)
.map { data -> Integer.parseInt(data) }
.onErrorReturn { e ->
if (e is NumberFormatException) {
e.printStackTrace()
}
-1
}
source.subscribe { data ->
if (data < 0) {
println("Wrong Data found!!")
return@subscribe
}
println("Grade is " + data!!)
}
//------------------------------------------
결과)
Grade is 70
Grade is 88
java.lang.NumberFormatException: For input string: "$100"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:569)
at java.lang.Integer.parseInt(Integer.java:615)
at com.nkc.strack.rxjava.RxTest$test$source$1.apply(RxTest.kt:29)
at com.nkc.strack.rxjava.RxTest$test$source$1.apply(RxTest.kt:21)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableOnErrorReturn.subscribeActual(ObservableOnErrorReturn.java:31)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.Observable.subscribe(Observable.java:12232)
at io.reactivex.Observable.subscribe(Observable.java:12134)
at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:37)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Wrong Data found!!
onError와 onErrorReturn()의 차이점
- 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있습니다.
- Observable을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 그에 맞게 해결이 될 수 있습니다.
onErrorReturnItem() 함수
onErrorReturn() 함수와 비슷하지만 중간에 에러를 넘기는 부분을 구현안해도 되기 때문에 좀 더 간결해집니다.
val grades = listOf("70", "88", "$100", "93", "83")
val source = Observable.fromIterable(grades)
.map { data -> Integer.parseInt(data) }
.onErrorReturnItem(-1)
source.subscribe { data ->
if (data < 0) {
println("Wrong Data found!!")
return@subscribe
}
println("Grade is " + data!!)
}
//------------------------------------------
결과)
Grade is 70
Grade is 88
Wrong Data found!!
onErrorResumeNext() 함수
onErrorResumeNext()는 에러가 발생했을 때 내가 원하는 Observable로 대체합니다.
이용할 수 있는 케이스로는 에러 발생시 기본값을 준다던가, 관리자에게 알람을 준다든가, 자원을 해제하는 등의 추가 작업을 할 수 있습니다.
val salesData = listOf("70", "88", "A300")
val onParseError = Observable.defer {
println("Send mail!!")
Observable.just(-1)
}.subscribeOn(Schedulers.io())
val source = Observable.fromIterable(salesData)
.map{ Integer.parseInt(it) }
.onErrorResumeNext(onParseError)
source.subscribe { data ->
if (data < 0) {
println("Wrong data found!!")
return@subscribe
}
println("Sales Data : " + data!!)
}
//------------------------------------------
결과)
Sales Data : 70
Sales Data : 88
Send mail!!
Wrong data found!!
retry() 함수
서버 연결과 같이 인터넷이 일시적으로 연결이 안될때는 재 요청을 해야하는데 이때 retry()를 사용합니다.
retry()는 실패했을때 호출이 됨
val RETRY_MAX = 5
val RETRY_DELAY = 1000L
val ERROR_CODE = "-500"
val url = "https://api.github.com/zen"
val source = Observable.just(url)
.map {
val request = Request.Builder().url(it).build()
return@map OkHttpClient().newCall(request).execute().body()?.string()
}
.retry { retryCount, e ->
println("retryCount : $retryCount")
sleep(RETRY_DELAY)
return@retry retryCount < RETRY_MAX
}
.onErrorReturn { ERROR_CODE }
source.subscribe(System.out::println)
//------------------------------------------
결과)
retryCount : 1
retryCount : 2
retryCount : 3
retryCount : 4
retryCount : 5
-500
retryUntil() 함수
조건이 만족할때까지 재시도 하는 함수입니다.
val ERROR_CODE = "-500"
val url = "https://api.github.com/zen"
val source = Observable.just(url)
.map {
val request = Request.Builder().url(it).build()
return@map OkHttpClient().newCall(request).execute().body()?.string()
}
.subscribeOn(Schedulers.io())
.retryUntil {
if (isNetworkAvailable())
return@retryUntil true // true를 만나면 그만
sleep(1000)
return@retryUntil false // false를 만나면 다시 시도
}
.onErrorReturn { ERROR_CODE }
source.subscribe(System.out::println)
sleep(5000)
// 네트워크 체크 하는 함수
private fun isNetworkAvailable(): Boolean {
try {
return InetAddress.getByName("www.google.com").isReachable(1000)
} catch (e: IOException) {
println("Network is not available")
}
return false
}
//------------------------------------------
결과)
Network is not available
Network is not available
Network is not available
Network is not available
Network is not available
retryWhen() 함수
재시도 조건을 동적으로 설정해야 하는 복잡한 로직을 구현할 때 활용합니다.
// 세 번의 재시도를 하며, 재시도 횟수가 늘어날때마다 1000ms씩 재시도 시간이 늘어납니다.
Observable.create { emitter: ObservableEmitter<String> ->
emitter.onError(RuntimeException("always fails"))
}.retryWhen { attempts ->
return@retryWhen attempts.zipWith(
range(1, 3),
BiFunction<Throwable, Int, Int> {_, i -> i})
.flatMap { i ->
println("delay retry by $i seconds")
return@flatMap timer(i.toLong(), TimeUnit.SECONDS)
}
}.blockingForEach(System.out::println)
//------------------------------------------
결과)
delay retry by 1 seconds
delay retry by 2 seconds
delay retry by 3 seconds
흐름 제어
흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 시간 차이가 발생할 때 사용하는 함수입니다.
sample()
: 특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시buffer()
: 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용throttleFirst()
: 주어진 조건에서 가장 먼저 입력된 값을 발행throttleLast()
: 함수는 주어진 조건에서 가장 마지막 입력된 값을 발행window()
: groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듦debounce()
: 연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용
–
sample() 함수
특정 시간 동안 여러 데이터가 들어왔을때 마지막 데이터만 발행하고 나머지는 무시합니다.
val list = listOf("1", "7", "2", "3", "6")
val earlySource = Observable.fromIterable(list)
.take(4)
.zipWith<Long, String>(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val lateSource = Observable.fromArray(list[4])
.zipWith<Long, String>(
Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
Observable.concat<String>(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS) // 300L 간격으로 가장 최근 데이터만 가져옴
.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
7
3
val list = listOf("1", "7", "2", "3", "6")
val earlySource = Observable.fromIterable(list)
.take(4)
.zipWith<Long, String>(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val lateSource = Observable.fromArray(list[4])
.zipWith<Long, String>(
Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
Observable.concat<String>(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS, true) // true를 넣어주면 마지막 데이터는 가져옴
.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
7
3
6
buffer() 함수
일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해주기 때문에, 넘치는 데이터 흐름을 제어할때 사용합니다.
val list = arrayOf("1", "7", "2", "4", "3", "6")
val earlySource = Observable.fromArray(*list)
.take(3)
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val middleSource = Observable.fromArray(list[3])
.zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
val lateSource = Observable.fromArray(list[4], list[5])
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val source = Observable.concat<String>(earlySource, middleSource, lateSource)
.buffer(3) // 3개씩 모았다가 발행
source.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
[1, 7, 2]
[4, 3, 6]
val list = arrayOf("1", "7", "2", "4", "3", "6")
val earlySource = Observable.fromArray(*list)
.take(3)
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val middleSource = Observable.fromArray(list[3])
.zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
val lateSource = Observable.fromArray(list[4], list[5])
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val source = Observable.concat<String>(earlySource, middleSource, lateSource)
.buffer(2, 3) // 3개씩 모았다가 2개를 발행하고 1개는 스킵
source.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
[1, 7]
[4, 3]
throttleFirst(), throttleLast() 함수
throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행하고, throttleLast() 함수는 가장 마지막에 입력된 값을 발행합니다.
val list = arrayOf("1", "7", "2", "4", "3", "6")
val earlySource = Observable.fromArray(*list)
.take(3)
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val middleSource = Observable.fromArray(list[3])
.zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
val lateSource = Observable.fromArray(list[4], list[5])
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
.doOnNext { println("error : $it") }
val source = Observable.concat<String>(earlySource, middleSource, lateSource)
.throttleFirst(200L, TimeUnit.MILLISECONDS)
source.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
1
4
error : 3
error : 6
6
window() 함수
groupBy() 함수 같이 특정 조건에 맞는 입력값들을 그룹화 해서 별도의 Observable을 병렬로 만듭니다.
val list = arrayOf("1", "7", "2", "4", "3", "6")
val earlySource = Observable.fromArray(*list)
.take(3)
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val middleSource = Observable.fromArray(list[3])
.zipWith<Long, String>(Observable.interval(300L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 300L 마다 발행
val lateSource = Observable.fromArray(list[4], list[5])
.zipWith<Long, String>(Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction { data, _ -> data }) // 100L 마다 발행
val source = Observable.concat<String>(earlySource, middleSource, lateSource)
.window(3)
source.subscribe { observeable ->
println("new Observable Start")
observeable.subscribe(System.out::println)
}
sleep(1000)
//------------------------------------------
결과)
RxComputationThreadPool-1 | debug = new Observable Start
RxComputationThreadPool-1 | 508 | value = 1
RxComputationThreadPool-1 | 588 | value = 7
RxComputationThreadPool-1 | 690 | value = 2
RxComputationThreadPool-2 | debug = new Observable Start
RxComputationThreadPool-2 | 995 | value = 4
RxComputationThreadPool-3 | 1099 | value = 3
RxComputationThreadPool-3 | 1199 | value = 6
debounce() 함수
연속 이벤트를 처리하는 함수로, 예를 들어 빠르게 버튼을 누를때 마지막 누른 버튼만 처리해야할 때 사용합니다.
val data = arrayOf("1", "7", "2", "4")
val source = Observable.concat(
Observable.timer(100L, TimeUnit.MILLISECONDS).map { data[0] },
Observable.timer(300L, TimeUnit.MILLISECONDS).map { data[1] },
Observable.timer(100L, TimeUnit.MILLISECONDS).map { data[2] },
Observable.timer(300L, TimeUnit.MILLISECONDS).map { data[3] }
).debounce(200L, TimeUnit.MILLISECONDS)
source.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
1
2
4