RxJava2 정리 #1 - Observable과 기본 연산자

Posted by Jungwoon Blog on July 5, 2019

RxJava2 정리 #1 - Observable과 기본 연산자

리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 포스팅 하려고 합니다.

이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java로 작성하였지만 이번에는 Kotlin으로 예제를 바꿔서 작성하였습니다.

마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.

Gradle에 libarary 추가하기

다음과 같이 implementation 'io.reactivex.rxjava2:rxjava:2.2.6' 를 dependencies에 넣어줍니다.

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
    implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
}

부수효과 (Side Effect)

함수에서 출력(리턴)이 아닌, 외부의 상태에 영향을 미치는 부수적인 효과


Observable

Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 합니다. RxJava는 여러 Observable로 데이터 흐름을 만들어 원하는 데이터를 만들어내는게 핵심입니다. RxJava2에서는 상황에 맞게 Observable, Maybe, Flowable로 구분해 사용을 합니다.

  • Observable : Observable은 옵서버 패턴을 구현하여 객체의 상태 변화를 관찰하여 상태 변화가 있을때마다 상태 변화에 대해 옵서버에게 알려줍니다.
  • Maybe : reduce() 함수나 firstElement() 함수와 같이 데이터가 발행할 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미합니다.
  • Flowable : 데이터가 발생하는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back Pressure) 이슈에 대응하는 기능을 제공합니다.

Observable은 아래 세 가지의 알림을 구독자에게 전달합니다.

  • onNext : 데이터의 발행을 알립니다.
  • onComplete : 모든 데이터가 완료되었음을 알립니다. (onComplete이후에는 onNext가 발생해선 안됩니다)
  • onError : Observable에서 에러가 발생했을 알립니다, 이 경우에는 더 이상 onNextonComplete을 발생하지 않고 실행을 종료합니다.

just() 함수

인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성합니다.

(단, 타입은 모두 같아야 하고 여러개의 최대 넣을 수 있는 데이터의 개수는 10개입니다.)


Observable.just(1, 2, 3, 4, 5)
            .subscribe(System.out::println)

//------------------------------------------
            
결과)

1
2
3
4
5


subscribe()

subscribe는 실제 데이터를 발행하게끔 하는 함수로 Observable은 just()같은 함수로 데이터 흐름을 정의만 하고, 실제 데이터가 발행이되는건, subscribe가 호출된 이후입니다.

인자의 개수별로 subscribe는 달라집니다.

  • 인자 0개 : onNextonComplete 이벤트를 무시하고 onError 이벤트가 발생했을 때만 onErrorNotImplementedException을 던집니다.
  • 인자 1개 : onNext를 처리합니다.
  • 인자 2개 : onNextonError 이벤트를 처리합니다.
  • 인자 3개 : onNextonError, onComplete 이벤트를 처리합니다.

dispose()

dispose()는 Observable에게 더 이상 데이터를 발행하지 말라고 구독을 해지하는 함수입니다. 일반적으로는 onComplete 이벤트가 정상적으로 발생하면 구독자가 별도의 dispose()를 호출하지 않아도 됩니다.


create() 함수

just() 함수는 데이터를 넣으면 자동으로 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 이벤트를 직접 호출해야 합니다.

RxJava의 javadoc에 다르면 create()는 RxJava에 익숙한 사용자만 활용하도록 권고합니다. 이유는 개발자가 하나하나 수동으로 잡아줘야 하는 옵션들이 많아지기 때문입니다.

Observable.create { emitter: ObservableEmitter<Int> ->
    emitter.onNext(100)
    emitter.onNext(200)
    emitter.onNext(300)
    emitter.onComplete()
}.subscribe { println(it) }

//------------------------------------------
            
결과)

100
200
300

fromArray() 함수

Array안의 데이터를 하나씩 발행하는 함수


val arr = arrayOf(100, 200, 300)
val source = Observable.fromArray(*arr)
source.subscribe { println(it) }

//------------------------------------------
            
결과)
100
200
300

** arr이 아닌 *arr을 사용한 이유는 정수 배열은 명시적 래퍼 타임인 Integer이 아닌 int로 코딩을 하기 때문에 값을 호출하기 위해 *를 붙입니다.


fromIterable() 함수

Iterable 인터페이스의 값을 가져오는 함수

(대표적인 Iterable 클래스는 List, ArrayList, ArrayBlockingQueue, HashSet, LinkedList, Stack, TreeSet 등이 있습니다.)


val names = ArrayList<String>()
names.add("Jerry")
names.add("William")
names.add("Bob")

val source = Observable.fromIterable(names)
source.subscribe { println(it) }

//------------------------------------------
            
결과)

Jerry
William
Bob

fromCallable() 함수

자바5에서 추가된 동시성 API인 Callable을 호출하기 위한 함수입니다. 간단히 설명을 하면 아래와 같습니다.

  • Runnable : 스레드 처리 이후에 리턴값 X
  • Callable : 스레드 처리 이후에 리턴값 O

val callable = {
    sleep(1000)
    "Hello World"
}

val source = Observable.fromCallable(callable)
source.subscribe { println(it) }

//------------------------------------------
            
결과)

Hello World


fromFuture() 함수

Future 인터페이스도 자바5에 추가된 동시성 API로 비동기 계산에 이용합니다. get()메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올때까지 블로킹 됩니다.


val future = Executors.newSingleThreadExecutor().submit(Callable {
    sleep(1000)
    "Hello World"
})

val source = Observable.fromFuture(future)
source.subscribe { println(it) }

//------------------------------------------
            
결과)

Hello World


Single 클래스

하나의 데이터만 가지고, 비어 있으면 기본값을 대신 발행합니다.

first()를 호출하면 Observable 객체가 Single 객체로 변환됨 이렇게 되면 뒤에 데이터가 있어도 더 이상 발행하지 않고 onSuccess 를 호출합니다.


Observable.just("Hello World")
    .single("Default Value")
    .subscribe { result ->
        println(result!!)
    }


Observable.empty<String>()
    .single("Default Value")
    .subscribe { result ->
        println(result!!)
    }        
        
//------------------------------------------
            
결과)

Hello World
Default Value

Hot Observable vs Cold Observable

  • Hot Observable : 구독자의 존재 여부 없이 데이터를 발행 ex) 센서 이벤트, 입력 이벤트
  • Cold Observable : 구독자가 subscribe() 함수를 구독하기 전까지는 데이터를 발행하지 않음 ex) 웹 요청, 데이터베이스 관련 요청
** Note 구독자가 여러 명이다 라는 의미

내가 최종적으로 원하는 결과 데이터가 여러 종류일 때는 각각을 구독자로 생각하면 좋습니다
** 배압효과(Back Pressure)
Hot Observable때 주의해야할 점인데, 데이터를 발행하는 속도와 구독자가 처리하는 속도의
차이가 클 때 발생 합니다.

Subject 클래스

차가운 Observable뜨거운 Observable로 바꿔줍니다


AsyncSubject 클래스

onComplete() 이후에 가장 최신 데이터만 발행합니다. (이전 데이터는 무시합니다)


val subject = AsyncSubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()

//------------------------------------------
            
결과)

Subscriber #1 => 3
Subscriber #2 => 3


BehaviorSubject 클래스

데이터가 아직 없으면 기본값을 넘겨주고 그 이외에는 최신값을 가져오는 클래스

val subject = BehaviorSubject.createDefault("5")
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()

//------------------------------------------
            
결과)

Subscriber #1 => 5
Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #2 => 2
Subscriber #1 => 3
Subscriber #2 => 3


PublishSubject 클래스

오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달

val subject = PublishSubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()

//------------------------------------------
            
결과)

Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #1 => 3
Subscriber #2 => 3


ReplaySubject 클래스

구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장

val subject = ReplaySubject.create<String>()
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")
subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()

//------------------------------------------
            
결과)

Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #2 => 1
Subscriber #2 => 2
Subscriber #1 => 3
Subscriber #2 => 3


데이터 발행자와 수신자

데이터 발행자 데이터 수신자
Observable Subscriber
Single Observer
Maybe Consumer
Subject  
Completable  

ConnectableObservable 클래스

차가운 Observable을 뜨거운 Observable로 변환하며, 원 데이터 하나를 여러 구독자에게 동시에 전달할때 사용하며, 특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않습니다 새로 추가된 connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행)


val dt = arrayOf("1", "3", "5")

val balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map {it.toInt()}
    .map { i -> dt[i] }
    .take(dt.size.toLong())

val source = balls.publish()
source.subscribe { data -> println("subscriber #1 => $data") }
source.subscribe { data -> println("subscriber #2 => $data") }
source.connect()

sleep(250)
source.subscribe { data -> println("subscriber #3 => $data") }
sleep(100)

//------------------------------------------
            
결과)

subscriber #1 => 1
subscriber #2 => 1
subscriber #1 => 3
subscriber #2 => 3
subscriber #1 => 5
subscriber #2 => 5
subscriber #3 => 5


Tips

  • RxJava에서 여러 개의 데이터를 발행하는 방법은 Observable밖에 없습니다.
  • 리액티브 프로그래밍 : 함수형 프로그래밍을 활용한 비동기 프로그래밍

map() 함수

입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수입니다

val balls = arrayOf("1", "2", "3", "4", "5")

Observable.fromArray(*balls)
    .map {ball -> "$ball<>" }
    .subscribe(System.out::println)

//------------------------------------------
            
결과)

1<>
2<>
3<>
4<>
5<>


flatMap() 함수

map()과 비슷하지만 리턴타입이 Observable로 반환되기 때문에 여러개의 결과를 반환 할 수 있습니다.

val balls = arrayOf("1", "2", "3", "4", "5")

Observable.fromArray(*balls)
    .flatMap{ ball -> Observable.just("$ball<>", "<>$ball") }
    .subscribe(System.out::println)


//------------------------------------------
            
결과)

1<>
<>1
2<>
<>2
3<>
<>3
4<>
<>4
5<>
<>5


RxJava로 구구단 만들어 보기

val input = 3

Observable.range(1, 9)
    .map { by -> "$input * $by = ${input * by}"}
    .subscribe(System.out::println)

//------------------------------------------
            
결과)

3 * 1 = 3
3 * 2 = 6
3 * 3 = 9
3 * 4 = 12
3 * 5 = 15
3 * 6 = 18
3 * 7 = 21
3 * 8 = 24
3 * 9 = 27


filter() 함수

filter() 함수는 Observable에서 원하는 데이터만 걸러내는 역할을 합니다.


val arr = arrayOf("1 CIRCLE", "2 DIAMOND", "3 TRIANGLE", "4 DIAMOND", "5 CIRCLE", "6 HEXAGON")

Observable.fromArray(*arr)
    .filter { obj -> obj.endsWith("CIRCLE") }
    .subscribe(System.out::println)
        
//------------------------------------------
            
결과)

1 CIRCLE
5 CIRCLE


filter()와 비슷하게 이용할 수 있는 함수들

  • first(default) : Objservable의 첫 번째 값을 필터함, 만약 값이 없으면 기본값을 리턴
  • last(default) : Objservable의 마지막 값을 필터함, 만약 값이 없으면 기본값을 리턴
  • take(N) : 최초 N개 값을 가져옴
  • takeLast(N) : 마지막 N개 값을 가져옴
  • skip(N) : 최초 N개 값을 건너뜀
  • skipLast(N) : 마지막 N개 값을 건너 가져옴

reduce() 함수

발행한 데이터를 모두 사용하여 최종 결과 데이터를 합성할때 이용합니다.


val balls = arrayOf("1", "3", "5")

Observable.fromArray(*balls)
    .reduce { ball1, ball2 -> "$ball2($ball1)" }
    .subscribe(System.out::println)


//------------------------------------------
            
결과)

5(3(1))