RxJava2 정리 #3 - 스케쥴러

Posted by Jungwoon Blog on July 5, 2019

RxJava2 정리 #3 - 스케쥴러

리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.

이 부분에서는 스케쥴러에 대한 부분을 포스팅합니다.

이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java로 작성하였지만 이번에는 Kotlin으로 예제를 바꿔서 작성하였습니다.

마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.


스케쥴러

스케줄러는 스레드를 지정할 수 있게 해줍니다. 특히 강력한 기능은 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다는 것입니다.

  • 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정이 가능합니다.
  • subscribeOn() 함수와 observeOn() 함수를 모두 지정하면, 데이터 흐름이 발생하는 스레드와 결과를 발행하는 스레드를 분리할 수 있습니다.
  • subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행됩니다.
  • 스케쥴러를 별도로 지정하지 않으면 메인 스레드에서 동작을 합니다.

스케줄러 지정

RxJava는 작업을 할 스레드와 작업결과를 보여줄 스레드를 지정할 수 있습니다.

  • subscribeOn() : 작업을 처리할 스레드를 지정
  • observeOn() : 작업결과를 보여줄 스레드를 지정

subscribeOn() 함수

작업을 처리할때의 스레드를 결정할 때 지정


val args = listOf("1", "3", "7")

Observable.fromIterable(args)
    .doOnNext { data -> println("Original data : $data") }
    .map { data -> "<<$data>>" }
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println)
    
sleep(500)


//------------------------------------------
            
결과)

Original data : 1
<<1>>
Original data : 3
<<3>>
Original data : 7
<<7>>


observeOn() 함수

처리 이후에 구독자에게 전달되는 스레드를 정할 수 있으며, 여러번 호출하여 매번 다른 스레드로 전달하게 할 수도 있습니다.

만약 observeOn() 함수를 지정하지 않으면 subscribeOn() 함수로 지정한 스레드에서 모든 로직을 실행하게 됩니다.

String[] objs = {"1-S", "2-T", "3-P"};

Observable<String> source = Observable.fromArray(objs)
        .doOnNext(data -> Log.i("Original data = " + data))
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .map(Shape::getShape);

source.subscribe(Log::i);
CommonUtils.sleep(500);


//------------------------------------------
            
결과)

RxNewThreadScheduler-1 | value = Original data = 1-S
RxNewThreadScheduler-1 | value = Original data = 2-T
RxNewThreadScheduler-1 | value = Original data = 3-P
RxNewThreadScheduler-2 | value = STAR
RxNewThreadScheduler-2 | value = TRIANGLE
RxNewThreadScheduler-2 | value = PENTAGON


스케쥴러의 종류

스케줄러 RxJava 2.x
뉴 스레드 스케쥴러 newThread()
싱글 스레드 스케쥴러 single()
계산 스케줄러 comjputation()
IO 스케쥴러 io()
트램펄린 스케쥴러 trampoline()
메인 스레드 스케쥴러 지원 X

뉴 스레드 스케쥴러

요청을 받을 때마다 새로운 스레드를 생성하여 작업을 합니다.

먼저 중간에 sleep()을 했을때의 예제 입니다.

val args = listOf("1", "3", "7")

Observable.fromIterable(args)
    .doOnNext { data -> println("Original data : $data") }
    .map { data -> "<<$data>>" }
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println)

sleep(500)

Observable.fromIterable(args)
    .doOnNext { data -> println("Original data : $data") }
    .map { data -> "++$data++" }
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println)

sleep(500)


//------------------------------------------
            
결과)

Original data : 1
<<1>>
Original data : 3
<<3>>
Original data : 7
<<7>>
Original data : 1
++1++
Original data : 3
++3++
Original data : 7
++7++

아래는 중간에 sleep()을 주석처리해보았습니다.

val args = listOf("1", "3", "7")

Observable.fromIterable(args)
    .doOnNext { data -> println("Original data : $data") }
    .map { data -> "<<$data>>" }
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println)

    // sleep(500)

Observable.fromIterable(args)
    .doOnNext { data -> println("Original data : $data") }
    .map { data -> "++$data++" }
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println)

sleep(500)


//------------------------------------------
            
결과)

Original data : 1
Original data : 1
++1++
<<1>>
Original data : 3
Original data : 3
++3++
<<3>>
Original data : 7
++7++
Original data : 7
<<7>>

두 개의 Observable은 각각 별도의 스레드에서 작업을 하기 때문에, 출력 결과가 바뀌는 것을 확인할 수 있습니다.


계산 스케줄러

CPI에 대응하는 계산용 스케쥴러로, ‘계산’ 작업때 사용하며, 별도의 I/O가 없는 스케쥴러입니다.

내부적으로 스레드 풀을 생성하며 별도의 설정이 없으면 스레드의 개수는 프로세서의 개수와 동일합니다.

(** interval() 함수는 기본적으로 계산 스케쥴러를 사용합니다)

val args = listOf("1", "3", "5")

val source = Observable.fromIterable(args)
    .zipWith<Long, String>(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction<String, Long, String> { a, b -> a })

source.map { item -> "<<$item>>" }
    .subscribeOn(Schedulers.computation())
    .subscribe(System.out::println)

source.map { item -> "++$item++" }
    .subscribeOn(Schedulers.computation())
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

<<1>>
++1++
<<3>>
++3++
<<5>>
++5++


IO 스케줄러

네트워크나, 각종 I/O 작업을 위한 스케줄러 입니다. 필요할 때마다 스레드를 계속 생성합니다,

IO 작업은 비동기로 실행이 되며 결과를 얻기까지 대기 시간이 깁니다.


val home = "/Users/jungwoon"
val files = File(home).listFiles()

val source = Observable.fromArray(*files!!)
    .filter { !it.isDirectory }
    .map { it.absolutePath }
    .subscribeOn(Schedulers.io())

source.subscribe(System.out::println)
sleep(500)


//------------------------------------------
            
결과)

/Users/jungwoon/.zcompdump-Jungwoon MacBook Pro-5.6.2
/Users/jungwoon/.DS_Store
/Users/jungwoon/.CFUserTextEncoding
/Users/jungwoon/.bashrc
/Users/jungwoon/.zshrc.pre-oh-my-zsh
/Users/jungwoon/.zshrc
/Users/jungwoon/.zprofile
/Users/jungwoon/.delfino.conf
/Users/jungwoon/.boto
/Users/jungwoon/.zsh_history
/Users/jungwoon/.mkshrc
/Users/jungwoon/.lesshst
/Users/jungwoon/.emulator_console_auth_token
/Users/jungwoon/.node_repl_history
/Users/jungwoon/.sh_history
/Users/jungwoon/.vimrc
/Users/jungwoon/.profile
/Users/jungwoon/.scala_history
/Users/jungwoon/.mysql_history
/Users/jungwoon/.bash_profile
/Users/jungwoon/.python_history
/Users/jungwoon/.gitconfig
/Users/jungwoon/.bash_history
/Users/jungwoon/.viminfo
/Users/jungwoon/.zlogin
/Users/jungwoon/cloud_sql_proxy


트램펄린 스케줄러

새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 Queue를 생성하는 스케쥴러입니다.


val args = listOf("1", "3", "5")
val source = Observable.fromIterable(args)

source.subscribeOn(Schedulers.trampoline())
    .map { data -> "<<$data>>" }
    .subscribe(System.out::println)

source.subscribeOn(Schedulers.trampoline())
    .map { data -> "++$data++" }
    .subscribe(System.out::println)

sleep(500)


//------------------------------------------
            
결과)

<<1>>
<<3>>
<<5>>
++1++
++3++
++5++


싱글 스레드 스케쥴러

RxJava 내부에서 단일 스레드를 생성하여, 구독작업을 처리하는데, 여러 번 구독 요청이 와도 공통으로 사용합니다.

val ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

val numbers = Observable.range(100, 5)
val chars = Observable.range(0, 5)
    .map{ ALPHABET[it % ALPHABET.length] }

numbers.subscribeOn(Schedulers.single())
    .subscribe(System.out::println)

chars.subscribeOn(Schedulers.single())
    .subscribe(System.out::println)

sleep(500)

//------------------------------------------
            
결과)

100
101
102
103
104
A
B
C
D
E