리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.
이 부분에서는 스케쥴러에 대한 부분을 포스팅합니다.
이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java
로 작성하였지만 이번에는 Kotlin
으로 예제를 바꿔서 작성하였습니다.
마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.
스케쥴러
스케줄러는 스레드를 지정할 수 있게 해줍니다. 특히 강력한 기능은 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다는 것입니다.
- 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정이 가능합니다.
- subscribeOn() 함수와 observeOn() 함수를 모두 지정하면, 데이터 흐름이 발생하는 스레드와 결과를 발행하는 스레드를 분리할 수 있습니다.
- subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행됩니다.
- 스케쥴러를 별도로 지정하지 않으면 메인 스레드에서 동작을 합니다.
스케줄러 지정
RxJava는 작업을 할 스레드와 작업결과를 보여줄 스레드를 지정할 수 있습니다.
subscribeOn()
: 작업을 처리할 스레드를 지정observeOn()
: 작업결과를 보여줄 스레드를 지정
subscribeOn() 함수
작업을 처리할때의 스레드를 결정할 때 지정
val args = listOf("1", "3", "7")
Observable.fromIterable(args)
.doOnNext { data -> println("Original data : $data") }
.map { data -> "<<$data>>" }
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
Original data : 1
<<1>>
Original data : 3
<<3>>
Original data : 7
<<7>>
observeOn() 함수
처리 이후에 구독자에게 전달되는 스레드를 정할 수 있으며, 여러번 호출하여 매번 다른 스레드로 전달하게 할 수도 있습니다.
만약 observeOn() 함수를 지정하지 않으면 subscribeOn() 함수로 지정한 스레드에서 모든 로직을 실행하게 됩니다.
String[] objs = {"1-S", "2-T", "3-P"};
Observable<String> source = Observable.fromArray(objs)
.doOnNext(data -> Log.i("Original data = " + data))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(Shape::getShape);
source.subscribe(Log::i);
CommonUtils.sleep(500);
//------------------------------------------
결과)
RxNewThreadScheduler-1 | value = Original data = 1-S
RxNewThreadScheduler-1 | value = Original data = 2-T
RxNewThreadScheduler-1 | value = Original data = 3-P
RxNewThreadScheduler-2 | value = STAR
RxNewThreadScheduler-2 | value = TRIANGLE
RxNewThreadScheduler-2 | value = PENTAGON
스케쥴러의 종류
스케줄러 | RxJava 2.x |
---|---|
뉴 스레드 스케쥴러 | newThread() |
싱글 스레드 스케쥴러 | single() |
계산 스케줄러 | comjputation() |
IO 스케쥴러 | io() |
트램펄린 스케쥴러 | trampoline() |
메인 스레드 스케쥴러 | 지원 X |
뉴 스레드 스케쥴러
요청을 받을 때마다 새로운 스레드를 생성하여 작업을 합니다.
먼저 중간에 sleep()을 했을때의 예제 입니다.
val args = listOf("1", "3", "7")
Observable.fromIterable(args)
.doOnNext { data -> println("Original data : $data") }
.map { data -> "<<$data>>" }
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println)
sleep(500)
Observable.fromIterable(args)
.doOnNext { data -> println("Original data : $data") }
.map { data -> "++$data++" }
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
Original data : 1
<<1>>
Original data : 3
<<3>>
Original data : 7
<<7>>
Original data : 1
++1++
Original data : 3
++3++
Original data : 7
++7++
아래는 중간에 sleep()을 주석처리해보았습니다.
val args = listOf("1", "3", "7")
Observable.fromIterable(args)
.doOnNext { data -> println("Original data : $data") }
.map { data -> "<<$data>>" }
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println)
// sleep(500)
Observable.fromIterable(args)
.doOnNext { data -> println("Original data : $data") }
.map { data -> "++$data++" }
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
Original data : 1
Original data : 1
++1++
<<1>>
Original data : 3
Original data : 3
++3++
<<3>>
Original data : 7
++7++
Original data : 7
<<7>>
두 개의 Observable은 각각 별도의 스레드에서 작업을 하기 때문에, 출력 결과가 바뀌는 것을 확인할 수 있습니다.
계산 스케줄러
CPI에 대응하는 계산용 스케쥴러로, ‘계산’ 작업때 사용하며, 별도의 I/O가 없는 스케쥴러입니다.
내부적으로 스레드 풀을 생성하며 별도의 설정이 없으면 스레드의 개수는 프로세서의 개수와 동일합니다.
(** interval() 함수는 기본적으로 계산 스케쥴러를 사용합니다)
val args = listOf("1", "3", "5")
val source = Observable.fromIterable(args)
.zipWith<Long, String>(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction<String, Long, String> { a, b -> a })
source.map { item -> "<<$item>>" }
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println)
source.map { item -> "++$item++" }
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println)
sleep(1000)
//------------------------------------------
결과)
<<1>>
++1++
<<3>>
++3++
<<5>>
++5++
IO 스케줄러
네트워크나, 각종 I/O 작업을 위한 스케줄러 입니다. 필요할 때마다 스레드를 계속 생성합니다,
IO 작업은 비동기로 실행이 되며 결과를 얻기까지 대기 시간이 깁니다.
val home = "/Users/jungwoon"
val files = File(home).listFiles()
val source = Observable.fromArray(*files!!)
.filter { !it.isDirectory }
.map { it.absolutePath }
.subscribeOn(Schedulers.io())
source.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
/Users/jungwoon/.zcompdump-Jungwoon의 MacBook Pro-5.6.2
/Users/jungwoon/.DS_Store
/Users/jungwoon/.CFUserTextEncoding
/Users/jungwoon/.bashrc
/Users/jungwoon/.zshrc.pre-oh-my-zsh
/Users/jungwoon/.zshrc
/Users/jungwoon/.zprofile
/Users/jungwoon/.delfino.conf
/Users/jungwoon/.boto
/Users/jungwoon/.zsh_history
/Users/jungwoon/.mkshrc
/Users/jungwoon/.lesshst
/Users/jungwoon/.emulator_console_auth_token
/Users/jungwoon/.node_repl_history
/Users/jungwoon/.sh_history
/Users/jungwoon/.vimrc
/Users/jungwoon/.profile
/Users/jungwoon/.scala_history
/Users/jungwoon/.mysql_history
/Users/jungwoon/.bash_profile
/Users/jungwoon/.python_history
/Users/jungwoon/.gitconfig
/Users/jungwoon/.bash_history
/Users/jungwoon/.viminfo
/Users/jungwoon/.zlogin
/Users/jungwoon/cloud_sql_proxy
트램펄린 스케줄러
새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 Queue를 생성하는 스케쥴러입니다.
val args = listOf("1", "3", "5")
val source = Observable.fromIterable(args)
source.subscribeOn(Schedulers.trampoline())
.map { data -> "<<$data>>" }
.subscribe(System.out::println)
source.subscribeOn(Schedulers.trampoline())
.map { data -> "++$data++" }
.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
<<1>>
<<3>>
<<5>>
++1++
++3++
++5++
싱글 스레드 스케쥴러
RxJava 내부에서 단일 스레드를 생성하여, 구독작업을 처리하는데, 여러 번 구독 요청이 와도 공통으로 사용합니다.
val ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
val numbers = Observable.range(100, 5)
val chars = Observable.range(0, 5)
.map{ ALPHABET[it % ALPHABET.length] }
numbers.subscribeOn(Schedulers.single())
.subscribe(System.out::println)
chars.subscribeOn(Schedulers.single())
.subscribe(System.out::println)
sleep(500)
//------------------------------------------
결과)
100
101
102
103
104
A
B
C
D
E