리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 포스팅 하려고 합니다.
이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java
로 작성하였지만 이번에는 Kotlin
으로 예제를 바꿔서 작성하였습니다.
마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.
Gradle에 libarary 추가하기
다음과 같이 implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
를 dependencies에 넣어줍니다.
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
}
부수효과 (Side Effect)
함수에서 출력(리턴)이 아닌, 외부의 상태에 영향을 미치는 부수적인 효과
Observable
Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 합니다.
RxJava는 여러 Observable로 데이터 흐름을 만들어 원하는 데이터를 만들어내는게 핵심입니다.
RxJava2에서는 상황에 맞게 Observable
, Maybe
, Flowable
로 구분해 사용을 합니다.
Observable
:Observable
은 옵서버 패턴을 구현하여 객체의 상태 변화를 관찰하여 상태 변화가 있을때마다 상태 변화에 대해 옵서버에게 알려줍니다.Maybe
: reduce() 함수나 firstElement() 함수와 같이 데이터가 발행할 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미합니다.Flowable
: 데이터가 발생하는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back Pressure) 이슈에 대응하는 기능을 제공합니다.
Observable은 아래 세 가지의 알림을 구독자에게 전달합니다.
onNext
: 데이터의 발행을 알립니다.onComplete
: 모든 데이터가 완료되었음을 알립니다. (onComplete
이후에는onNext
가 발생해선 안됩니다)onError
: Observable에서 에러가 발생했을 알립니다, 이 경우에는 더 이상onNext
와onComplete
을 발생하지 않고 실행을 종료합니다.
just() 함수
인자로 넣은 데이터를 차례로 발행하려고 Observable
을 생성합니다.
(단, 타입은 모두 같아야 하고
여러개의 최대 넣을 수 있는 데이터의 개수는 10개
입니다.)
Observable.just(1, 2, 3, 4, 5)
.subscribe(System.out::println)
//------------------------------------------
결과)
1
2
3
4
5
subscribe()
subscribe는 실제 데이터를 발행하게끔 하는 함수로 Observable은 just()같은 함수로 데이터 흐름을 정의만 하고, 실제 데이터가 발행이되는건,
subscribe가 호출된 이후
입니다.
인자의 개수별로 subscribe는 달라집니다.
인자 0개
:onNext
와onComplete
이벤트를 무시하고onError
이벤트가 발생했을 때만onErrorNotImplementedException
을 던집니다.인자 1개
:onNext
를 처리합니다.인자 2개
:onNext
와onError
이벤트를 처리합니다.인자 3개
:onNext
와onError
,onComplete
이벤트를 처리합니다.
dispose()
dispose()는 Observable에게 더 이상 데이터를 발행하지 말라고 구독을 해지하는 함수입니다. 일반적으로는 onComplete 이벤트가 정상적으로 발생하면 구독자가 별도의 dispose()를 호출하지 않아도 됩니다.
create() 함수
just()
함수는 데이터를 넣으면 자동으로 이벤트가 발생하지만, create()
함수는 onNext
, onComplete
, onError
같은 이벤트를
직접 호출해야 합니다.
RxJava의 javadoc에 다르면 create()는 RxJava에 익숙한 사용자만 활용하도록 권고합니다. 이유는 개발자가 하나하나 수동으로 잡아줘야 하는 옵션들이 많아지기 때문입니다.
Observable.create { emitter: ObservableEmitter<Int> ->
emitter.onNext(100)
emitter.onNext(200)
emitter.onNext(300)
emitter.onComplete()
}.subscribe { println(it) }
//------------------------------------------
결과)
100
200
300
fromArray() 함수
Array안의 데이터를 하나씩 발행하는 함수
val arr = arrayOf(100, 200, 300)
val source = Observable.fromArray(*arr)
source.subscribe { println(it) }
//------------------------------------------
결과)
100
200
300
** arr
이 아닌 *arr
을 사용한 이유는 정수 배열은 명시적 래퍼 타임인 Integer
이 아닌 int
로 코딩을 하기 때문에 값을 호출하기 위해 *
를 붙입니다.
fromIterable() 함수
Iterable 인터페이스의 값을 가져오는 함수
(대표적인 Iterable 클래스는 List
, ArrayList
, ArrayBlockingQueue
, HashSet
, LinkedList
, Stack
, TreeSet
등이 있습니다.)
val names = ArrayList<String>()
names.add("Jerry")
names.add("William")
names.add("Bob")
val source = Observable.fromIterable(names)
source.subscribe { println(it) }
//------------------------------------------
결과)
Jerry
William
Bob
fromCallable() 함수
자바5에서 추가된 동시성 API인 Callable을 호출하기 위한 함수입니다. 간단히 설명을 하면 아래와 같습니다.
Runnable
: 스레드 처리 이후에 리턴값 XCallable
: 스레드 처리 이후에 리턴값 O
val callable = {
sleep(1000)
"Hello World"
}
val source = Observable.fromCallable(callable)
source.subscribe { println(it) }
//------------------------------------------
결과)
Hello World
fromFuture() 함수
Future 인터페이스도 자바5에 추가된 동시성 API로 비동기 계산에 이용합니다. get()메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올때까지 블로킹 됩니다.
val future = Executors.newSingleThreadExecutor().submit(Callable {
sleep(1000)
"Hello World"
})
val source = Observable.fromFuture(future)
source.subscribe { println(it) }
//------------------------------------------
결과)
Hello World
Single 클래스
하나의 데이터만 가지고, 비어 있으면 기본값을 대신 발행합니다.
first()
를 호출하면 Observable 객체가 Single 객체로 변환됨 이렇게 되면 뒤에 데이터가 있어도 더 이상 발행하지 않고 onSuccess
를 호출합니다.
Observable.just("Hello World")
.single("Default Value")
.subscribe { result ->
println(result!!)
}
Observable.empty<String>()
.single("Default Value")
.subscribe { result ->
println(result!!)
}
//------------------------------------------
결과)
Hello World
Default Value
Hot Observable vs Cold Observable
Hot Observable
: 구독자의 존재 여부 없이 데이터를 발행 ex) 센서 이벤트, 입력 이벤트Cold Observable
: 구독자가 subscribe() 함수를 구독하기 전까지는 데이터를 발행하지 않음 ex) 웹 요청, 데이터베이스 관련 요청
** Note 구독자가 여러 명이다 라는 의미
내가 최종적으로 원하는 결과 데이터가 여러 종류일 때는 각각을 구독자로 생각하면 좋습니다
** 배압효과(Back Pressure)
Hot Observable때 주의해야할 점인데, 데이터를 발행하는 속도와 구독자가 처리하는 속도의
차이가 클 때 발생 합니다.
Subject 클래스
차가운 Observable
을 뜨거운 Observable
로 바꿔줍니다
AsyncSubject 클래스
onComplete() 이후에 가장 최신 데이터만 발행합니다. (이전 데이터는 무시합니다)
val subject = AsyncSubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()
//------------------------------------------
결과)
Subscriber #1 => 3
Subscriber #2 => 3
BehaviorSubject 클래스
데이터가 아직 없으면 기본값을 넘겨주고 그 이외에는 최신값을 가져오는 클래스
val subject = BehaviorSubject.createDefault("5")
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()
//------------------------------------------
결과)
Subscriber #1 => 5
Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #2 => 2
Subscriber #1 => 3
Subscriber #2 => 3
PublishSubject 클래스
오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달
val subject = PublishSubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()
//------------------------------------------
결과)
Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #1 => 3
Subscriber #2 => 3
ReplaySubject 클래스
구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장
val subject = ReplaySubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()
//------------------------------------------
결과)
Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #2 => 1
Subscriber #2 => 2
Subscriber #1 => 3
Subscriber #2 => 3
데이터 발행자와 수신자
데이터 발행자 | 데이터 수신자 |
---|---|
Observable | Subscriber |
Single | Observer |
Maybe | Consumer |
Subject | |
Completable |
ConnectableObservable 클래스
차가운 Observable을 뜨거운 Observable로 변환하며, 원 데이터 하나를 여러 구독자에게 동시에 전달할때 사용하며, 특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않습니다 새로 추가된 connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행)
val dt = arrayOf("1", "3", "5")
val balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map {it.toInt()}
.map { i -> dt[i] }
.take(dt.size.toLong())
val source = balls.publish()
source.subscribe { data -> println("subscriber #1 => $data") }
source.subscribe { data -> println("subscriber #2 => $data") }
source.connect()
sleep(250)
source.subscribe { data -> println("subscriber #3 => $data") }
sleep(100)
//------------------------------------------
결과)
subscriber #1 => 1
subscriber #2 => 1
subscriber #1 => 3
subscriber #2 => 3
subscriber #1 => 5
subscriber #2 => 5
subscriber #3 => 5
Tips
- RxJava에서 여러 개의 데이터를 발행하는 방법은 Observable밖에 없습니다.
- 리액티브 프로그래밍 : 함수형 프로그래밍을 활용한 비동기 프로그래밍
map() 함수
입력값을 어떤 함수
에 넣어서 원하는 값으로 변환하는 함수입니다
val balls = arrayOf("1", "2", "3", "4", "5")
Observable.fromArray(*balls)
.map {ball -> "$ball<>" }
.subscribe(System.out::println)
//------------------------------------------
결과)
1<>
2<>
3<>
4<>
5<>
flatMap() 함수
map()과 비슷하지만 리턴타입이 Observable로 반환되기 때문에 여러개의 결과를 반환 할 수 있습니다.
val balls = arrayOf("1", "2", "3", "4", "5")
Observable.fromArray(*balls)
.flatMap{ ball -> Observable.just("$ball<>", "<>$ball") }
.subscribe(System.out::println)
//------------------------------------------
결과)
1<>
<>1
2<>
<>2
3<>
<>3
4<>
<>4
5<>
<>5
RxJava로 구구단 만들어 보기
val input = 3
Observable.range(1, 9)
.map { by -> "$input * $by = ${input * by}"}
.subscribe(System.out::println)
//------------------------------------------
결과)
3 * 1 = 3
3 * 2 = 6
3 * 3 = 9
3 * 4 = 12
3 * 5 = 15
3 * 6 = 18
3 * 7 = 21
3 * 8 = 24
3 * 9 = 27
filter() 함수
filter() 함수는 Observable에서 원하는 데이터만 걸러내는 역할을 합니다.
val arr = arrayOf("1 CIRCLE", "2 DIAMOND", "3 TRIANGLE", "4 DIAMOND", "5 CIRCLE", "6 HEXAGON")
Observable.fromArray(*arr)
.filter { obj -> obj.endsWith("CIRCLE") }
.subscribe(System.out::println)
//------------------------------------------
결과)
1 CIRCLE
5 CIRCLE
filter()와 비슷하게 이용할 수 있는 함수들
first(default)
: Objservable의 첫 번째 값을 필터함, 만약 값이 없으면 기본값을 리턴last(default)
: Objservable의 마지막 값을 필터함, 만약 값이 없으면 기본값을 리턴take(N)
: 최초 N개 값을 가져옴takeLast(N)
: 마지막 N개 값을 가져옴skip(N)
: 최초 N개 값을 건너뜀skipLast(N)
: 마지막 N개 값을 건너 가져옴
reduce() 함수
발행한 데이터를 모두 사용하여 최종 결과 데이터를 합성할때 이용합니다.
val balls = arrayOf("1", "3", "5")
Observable.fromArray(*balls)
.reduce { ball1, ball2 -> "$ball2($ball1)" }
.subscribe(System.out::println)
//------------------------------------------
결과)
5(3(1))