리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.
이 부분에서는 테스팅과 Flowable에 대한 부분을 포스팅합니다.
이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java
로 작성하였지만 이번에는 Kotlin
으로 예제를 바꿔서 작성하였습니다.
마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.
테스팅
이번에는 RxJava의 테스팅에 대해서 한번 다뤄보도록 하겠습니다. 어떻게 테스트 코드를 작성해야 하는지 어떤 테스트 관련 기능을 제공하는지 보도록 하겠습니다.
TestObserver 클랙스
RxJava에서 제공하는 테스트 클래스 입니다.
assertResult()
: 예상된 결과와 실제 결과를 비교하는 메서드입니다.assertFailure()
: Observable에서 기대했던 에러가 발생하는지 확인하는 코드입니다. 만약 기대했던 에러가 발생하지 않으면 테스트 코드 실행은 실패합니다.assertFailureAndMessage()
: 기대했던 에러 발생 시 에러 메시지까지 확인할 수 있습니다.assertComplete()
: Observable을 정상적으로 완료했는지 확인합니다.
assertResult()
예상된 결과와 실제 결과가 같은지 비교하는 메서드로 JUnit의 assertEquals() 와 동일합니다.
val data = listOf("1", "2-R", "3-T")
val source = Observable.fromIterable(data)
.map {
if (it == "") return@map "NO_SHAPE"
if (it.endsWith("-H")) return@map "HEXAGON"
if (it.endsWith("-O")) return@map "OCTAGON"
if (it.endsWith("-R")) return@map "RECTANGLE"
if (it.endsWith("-T")) return@map "TRIANGLE"
if (it.endsWith("<>")) return@map "DIAMOND"
if (it.endsWith("-P")) return@map "PENTAGON"
if (it.endsWith("-S")) return@map "STAR"
return@map "BALL"
}
val expected = arrayOf("BALL", "RECTANGLE", "TRIANGLE")
source.test().assertResult(*expected)
//------------------------------------------
결과)
테스트 통과
assertFailure()
Observable에서 기대했던 에러가 발생하는지 확인하는 코드입니다. 만약 기대했던 에러가 발생하지 않으면 테스트 코드 실행은 실패합니다.
val data = listOf("100", "200", "%300")
val source = Observable.fromIterable(data)
.map(Integer::parseInt)
source.test().assertFailure(NumberFormatException::class.java, 100, 200)
//------------------------------------------
결과)
테스트 통과
assertFailureAndMessage()
기대했던 에러 발생 시 에러 메시지까지 확인할 수 있습니다.
val data = listOf("100", "200", "%300")
val source = Observable.fromIterable(data)
.map(Integer::parseInt)
source.test().assertFailureAndMessage(NumberFormatException::class.java, "For input string: \"%100\"", 100, 200)
//------------------------------------------
결과)
java.lang.AssertionError: Error message differs; exptected: For input string: "%100" but was: For input string: "%300" (latch = 0, values = 2, errors = 1, completions = 0)
at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:189)
at io.reactivex.observers.BaseTestConsumer.assertErrorMessage(BaseTestConsumer.java:741)
at io.reactivex.observers.BaseTestConsumer.assertFailureAndMessage(BaseTestConsumer.java:845)
at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NumberFormatException: For input string: "%300"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:569)
at java.lang.Integer.parseInt(Integer.java:615)
at com.nkc.strack.rxjava.RxTest$test$source$1.invoke(RxTest.kt:34)
at com.nkc.strack.rxjava.RxTest$test$source$1.invoke(RxTest.kt:26)
at com.nkc.strack.rxjava.RxTest$sam$io_reactivex_functions_Function$0.apply(RxTest.kt)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12246)
at io.reactivex.Observable.test(Observable.java:15443)
... 23 more
assertComplete()
Observable을 정상적으로 종료했는지 알 수 있습니다.
val source = Observable.create { emitter: ObservableEmitter<String> ->
emitter.onNext("Hello World")
emitter.onComplete()
}
source.test().assertComplete()
//------------------------------------------
결과)
테스트 통과
비동기 코드 테스트
스케줄러를 이용해서 비동기 코드일때도 테스트를 하는 방법에 대해서 알아보도록 하겠습니다.
val source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.map(Long::toInt)
source.doOnNext(System.out::println).test().assertResult(0, 1, 2, 3, 4)
//------------------------------------------
결과)
java.lang.AssertionError: Value count differs; expected: 5 [0, 1, 2, 3, 4] but was: 0 [] (latch = 1, values = 0, errors = 0, completions = 0)
Expected :5 [0, 1, 2, 3, 4]
Actual :0 [] (latch = 1, values = 0, errors = 0, completions = 0)
<Click to see difference>
at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:189)
at io.reactivex.observers.BaseTestConsumer.assertValues(BaseTestConsumer.java:538)
at io.reactivex.observers.BaseTestConsumer.assertResult(BaseTestConsumer.java:796)
at com.nkc.strack.rxjava.RxTest.test(RxTest.kt:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
위와 같이 테스트를 진행하면 테스트가 실패하면서 아무것도 실행이 되지 않습니다. 그 이유는 Observable.interval() 이 main 스레드
가 아닌 계산 스케줄러
에서
실행이 되기 때문입니다.
만약 위 테스트와 같이 다른 스레드에서 실행이 되는 비동기 테스트를 해야할때에는 awaitDone() 함수
를 사용해서 함수가 실행되는 스레드에서 onComplete()
함수를 호출할 때까지 기다려야 합니다.
val source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.map(Long::toInt)
source.doOnNext(System.out::println).test().awaitDone(1L, TimeUnit.SECONDS).assertResult(0, 1, 2, 3, 4)
//------------------------------------------
결과)
0
1
2
3
4
Flowable 클래스
Flowable
클래스는 배압(=backpressure) 이슈
를 해결하기 위해 RxJava 2.x에서 도입된 개념입니다. Flowable
클래스의 도입 이유는 Observable
클래스의
성능을 향상시키기 위해서 입니다.
기존의 Observable 클래스는 배압에 관한 처리가 불필요한 경우에 초기 로딩 때문에 오버헤드가 있었는데 2.x 에서는 배압으로 인한 오버헤드가 사라졌습니다.
Observable
과 Flowable
의 가장 큰 차이는 Buffer의 유무입니다.
배압(=Backpressure) 이슈 : 데이터 처리속도가 데이터 발행속도를 따라가지 못하는 경우 오버플로우가 발생하는 이슈
기본적으로 Flowable 클래스의 활용은 Observable과 동일합니다.
Flowable.just("Hello World")
.subscribe(System.out::println)
//------------------------------------------
결과)
Hello World
아래 예제와 같이 subscribeOn() 과 observeOn() 함수도 동일하게 사용할 수 있습니다.
Flowable.fromCallable {
sleep(1000)
return@fromCallable "Done"}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace)
sleep(2000)
//------------------------------------------
결과)
Done
Observable과 Flowable 선택 기준
어느 상황에서 Observable을 사용하고 Flowable을 사용해야 할까요?
- Observable 사용해야 하는 경우
- 최대 1,000개 미만의 데이터 흐름, Out of Memory Exception 이 발생할 확률이 낮은 경우
- 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍
- 데이터 흐름이 본질적으로 동기 방식이지만 프로젝트에서 사용하는 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 때, Observable은 보통 Flowable과 비교했을 때 성능 오버헤드가 낮습니다.
- Flowable을 사용해야 하는 경우
- 특정 방식으로 생성된 10,000개 이상의 데이터를 처리해야 하는 경우
- 디스크에서 파일을 읽어 들일 경우
- JDBC를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우
- 네트워크 I/O를 실행하는 경우
- 다수의 블로킹 방식을 사용하거나 가져오는 방식의 데이터 소스가 미래에는 논 블로킹 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우
Flowable을 활용한 배압 이슈 대응
- onBackpressureBuffer() : 배압 이슈가 발생했을 때 별도의 버퍼에 저장합니다. Flowable 클래스는 기본적으로 128개의 버퍼가 있습니다.
- onBackpressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시합니다.
- onBackpressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지합니다.
아래는 강제로 배압 효과와 비슷한 효과가 발생하게한 코드입니다.
val subject = PublishSubject.create<Int>()
subject.observeOn(Schedulers.computation())
.subscribe {
sleep(100)
println(it)
}
for (i in 0..50_000_000) {
subject.onNext(i)
}
subject.onComplete()
//------------------------------------------
결과)
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
강제로 sleep(100) 을 해서 데이터가 발행하는 속도와 데이터를 처리하는 속도에 차이를 발생시켰습니다. 그리고 데이터는 13개만 처리가 되었습니다. 이렇게 데이터 발행 속도와 처리 속도의 차이때문에 데이터 처리가 잘 안되는 경우에 Flowable 클래스를 활용할 수 있습니다.
일단 첫번째 전략은 onBackpressureBuffer()
함수를 이용하여 버퍼를 만드는 방법
입니다.
아래에서는 Flowable을 이용해 128개의 버퍼를 이용해 처리하였습니다. 바로 위에 예제보다 훨씬 빠르게 다운스트림이 발생하지만 128개의 버퍼
로는
모두 대응하기 힘들기 때문에, 오버플로우가 발생합니다. 여기서는 BackpressureOverflowStrategy.DROP_OLDEST
를 이용해서 오버플로우가 발생하면
버퍼에 쌓여 있는 가장 오래된 값을 버리도록 설정하였습니다. 옵션은 크게 아래와 같이 3개가 있습니다.
ERROR
: 오버플로우가 발생했을 때, 예외를 던지고 데이터 흐름을 중단합니다.DROP_LATEST
: 오버플로우가 발생했을 때, 버퍼에 있는 가장 최근 값을 제거합니다.DROP_OLDEST
: 오버플로우가 발생했을 때, 버퍼에 있는 가장 오래된 값을 제거합니다.
Flowable.range(1, 50_000_000)
.onBackpressureBuffer(128, { print("overflow") }, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe {
sleep(100)
println(it)
}
//------------------------------------------
결과)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
overflow
overflow
overflow
...
두번째 전략은 onBackpressureDrop()
으로 버퍼가 가득 차면 그 이후에 데이터를 무시하는 방법이 있습니다.
128개의 버퍼가 가득 찰때까지 진행을 하고 그 이후에는 데이터 발행을 멈춥니다.
Flowable.range(1, 50_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe {
sleep(100)
println(it)
}
sleep(20_000)
//------------------------------------------
결과)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
128
마지막 전략은 onBackpressureLatest()
함수를 활용하여 처리할 수 없는 데이터는 무시하고 최신 데이터만 유지합니다.
Flowable.range(1, 50_000_000)
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe {
sleep(100)
println(it)
}
sleep(20_000)
//------------------------------------------
결과)
1
...
117
118
119
120
121
122
123
124
125
126
127
128
50000000