리액티브 프로그래밍이 핫하다고 해서 공부하면서 정리해봅니다. 참고 자료는 RxJava 프로그래밍를 참고해서 만들었습니다. 양이 너무 많아서 나눠서 프스팅 하려고 합니다.

이 부분에서는 연산자에 대한 부분을 포스팅합니다.

이전에 포스팅을 해놓고 다 까먹어서 다시 정리하는 내용입니다, 이전에는 Java로 작성하였지만 이번에는 Kotlin으로 예제를 바꿔서 작성하였습니다.

마블 다이어그램의 이미지 출처는 https://reactivex.io입니다.


생성 연산자

생성 연산자는 데이터의 흐름을 만듭니다.

  • interval() : 일정 시간 간격으로 데이터 흐름을 만듭니다.
  • timer() : interval() 함수와 유사하지만 일정시간이 지나고 한 번만 실행합니다.
  • range() : 주어진 값(n) 부터 m개의 Integer 객체를 발행합니다.
  • intervalRange() : interval()과 range()를 혼합해놓은 함수로, interval() 함수처럼 일정한 시간 간격으로 값을 출력하지만 range() 함수처럼 시작 숫자(n)로부터 m개만큼의 값만 생성하고 onComplete 이벤트가 발생하기 때문에 interval() 처럼 무한히 데이터 흐름을 발행하지 않습니다. 리턴 타입은 Long 타입입니다.
  • defer() : defer() 함수는 timer() 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있습니다.
  • repeat() : 데이터를 m만큼 반복해서 실행해 주는 함수입니다.

interval() 함수

주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 생성합니다.

interval(period: Long, unit: TimeUnit)

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map{ (it + 1) * 100 }
    .take(5) // 5개까지만 발행합니다.
    .subscribe(System.out::println)

// interval은 별도의 스레드에서 처리를 하기 때문에, 해당 스레드 작업이 완료될때까지 프로그램이 끝나면 안되기 때문에
// Thread.sleep()을 통해 기다립니다.
sleep(1000)


//------------------------------------------
            
결과)

100
200
300
400
500


timer() 함수

timer()는 interval()과 비슷하지만, 일정 시간이 지난 후, 한 개의 데이터를 발행하고 onComplete() 이벤트를 발생시킵니다.

println(SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()))

// 3초뒤에 데이터 발행
Observable.timer(3000L, TimeUnit.MILLISECONDS)
    .map{ SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()) }
    .subscribe(System.out::println)

// interval은 별도의 스레드에서 처리를 하기 때문에, 해당 스레드 작업이 완료될때까지 프로그램이 끝나면 안되기 때문에
// Thread.sleep()을 통해 기다립니다.
sleep(5000)

//------------------------------------------
            
결과)

2019/07/05 21:20:46
2019/07/05 21:20:49


range() 함수

주어진 값(n) 부터 m개의 Integer 객체를 발행합니다.


Observable.range(1, 10)
    .filter { number -> number % 2 == 0 }
    .subscribe(System.out::println)


//------------------------------------------
            
결과)

2
4
6
8
10


intervalRange() 함수

interval()과 range()를 합쳐놓은 함수로, interval()함수처럼 일정한 시간 간격으로 값을 출력하지만, range() 함수처럼 시작 숫자(n)로부터 m개만큼의 값을 생성하고, onComplete 이벤트를 발생합니다, 즉 interval() 함수처럼 무한히 데이터 흐름이 발생하지 않습니다.

Observable.intervalRange(
    1, // start
    5, // count
    100L, // initialDelay
    100L, // period
    TimeUnit.MILLISECONDS) // TimeUnit
        .map{ "${SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS").format(Date())} : $it" }
        .subscribe(System.out::println)
        
sleep(10000)

//------------------------------------------
            
결과)

2019/07/05 21:26:46.812 : 1
2019/07/05 21:26:46.887 : 2
2019/07/05 21:26:46.985 : 3
2019/07/05 21:26:47.087 : 4
2019/07/05 21:26:47.187 : 5


defer() 함수

timer() 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할때까지 미룰 수 있습니다. 이 때 새로운 Observable이 생성됩니다. Observable의 생성이 구독할 때까지 미뤄지기 대문에 최신 데이터를 얻을 수 있습니다.

즉 defer() 함수를 사용하면, subscribe() 함수를 호출할 때의 상황을 반영하여 데이터 흐름의 생성을 지연하는 효과를 보여줍니다.


repeat() 함수

단순히 반복 실행을 해주는 함수입니다. 서버가 살아있는지 확인할때 많이 사용합니다.

val balls = arrayOf("1", "3", "5")

Observable.fromArray(*balls)
    .repeat(3)
    .doOnComplete { println("onComplete()") }
    .subscribe(System.out::println)
        

//------------------------------------------
            
결과)

1
3
5
1
3
5
1
3
5
onComplete()
        

변환 연산자

변환 연산자는 데이터 흐름을 마음대로 변형할 수 있는 연산자들 입니다.

  • concatMap() : 먼저 들어온 데이터 순서대로 처리를 보장하는 함수
  • flatMap() : 달리 순서를 보장해주지 않습니다.
  • switchMap() : 순서를 보장하지만 중간에 발행이 되면 기존에 진행 중이던 작업을 바로 중단하고 그때 들어온 값으로 처리
  • groupBy() : 여러개의 단일 Observable을 그룹으로 묶어주는 함수
  • scan() : 실행할때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행
  • zip() : 2개 이상의 Observable을 결합할 때 사용합니다. 만약 한쪽의 Observable에서 처리가 안된다면 모두 처리될때까지 발행을 대기
  • combineLatest() : 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수
  • merge() : 아무것도 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행
  • concat() : 2개 이상의 Observable을 이어 붙여주는 함수이고, 첫번째 Observable에 onComplete 가 발생해야, 두 번째 Observable을 구독

concatMap() 함수

먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해줍니다.


val balls = arrayOf("1", "3", "5")

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> balls[idx] }
    .take(balls.size.toLong())
    .concatMap { ball ->
        Observable.interval(200L, TimeUnit.MILLISECONDS)
            .map { "$ball<>" }
            .take(2)
    }
    .subscribe(System.out::println)

sleep(2000)


//------------------------------------------
            
결과)

1<>
1<>
3<>
3<>
5<>
5<>

flatMap() 함수

concatMap()과 달리 순서를 보장해주지 않습니다.


val balls = arrayOf("1", "3", "5")

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> balls[idx] }
    .take(balls.size.toLong())
    .flatMap { ball ->
        Observable.interval(200L, TimeUnit.MILLISECONDS)
            .map { "$ball<>" }
            .take(2)
    }
    .subscribe(System.out::println)

sleep(2000)


//------------------------------------------
            
결과)

1<>
3<>
5<>
1<>
3<>
5<>

switchMap() 함수

순서를 보장하기 위함에 있어서는 concatMap()과 비슷하지만, switchMap() 함수는 순서를 보장하기 위해 중간에 발행이 되면 기존에 진행 중이던 작업을 바로 중단하고 그때 들어온 값으로 처리가 됩니다

또 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용합니다. 중간에 끊기더라도 마지막 데이터의 처리는 보장합니다


val balls = arrayOf("1", "3", "5")

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> balls[idx] }
    .take(balls.size.toLong())
    .switchMap { ball ->
        Observable.interval(200L, TimeUnit.MILLISECONDS)
            .map { "$ball<>" }
            .take(2)
    }
    .subscribe(System.out::println)

sleep(2000)


//------------------------------------------
            
결과)

5<>
5<>

groupBy() 함수

여러개의 단일 Observable을 여러 개로 이뤄진 Observable 그룹으로 만듭니다.


val objs =  listOf("6", "4", "2-T", "2", "6-T", "4-T")

Observable.fromIterable(objs)
    .groupBy {
        if (it == "") return@groupBy "NO-SHAPE"
        if (it.endsWith("-H")) return@groupBy "HEXAGON"
        if (it.endsWith("-O")) return@groupBy "OCTAGON"
        if (it.endsWith("-R")) return@groupBy "RECTANGLE"
        if (it.endsWith("-T")) return@groupBy "TRIANGLE"
        return@groupBy if (it.endsWith("<>")) "DIAMOND" else "BALL"
    }
    .subscribe {obj ->
        obj.subscribe {
            println("Group : ${obj.key} \t Value : $it")
        }
    }

sleep(2000)


//------------------------------------------
            
결과)

Group : BALL 	 Value : 6
Group : BALL 	 Value : 4
Group : TRIANGLE 	 Value : 2-T
Group : BALL 	 Value : 2
Group : TRIANGLE 	 Value : 6-T
Group : TRIANGLE 	 Value : 4-T

scan() 함수

reduce() 함수는 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터를 구독자에게 발행했다면, scan() 함수는 실행할때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행합니다.


val balls = arrayOf("1", "3", "5")

Observable.fromArray(*balls)
    .scan { ball1, ball2 -> "$ball2($ball1)" }
    .subscribe(System.out::println)

//------------------------------------------
            
결과)

1
3(1)
5(3(1))


결합 연산자

생성 연산자와 변환 연산자는 1개의 Observable(=데이터 흐름) 을 다뤘는데 여러개의 Observable을 다루는 결합 연산자 연산자를 알아보도록 하겠습니다. 결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공합니다.

  • zip() : 입력 Observable에서 데이터를 모두 새로 발행했을 때 그것을 합칩니다.
  • combineLatest() : 처음에 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하던 최신 값으로 갱신합니다.
  • merge() : 최신 데이터 여부와 상관없이 각 Observable에서 발행하는 데이터를 그대로 출력합니다.
  • concat() : 입력된 Observable을 Observable 단위로 이어 붙여줍니다.

zip() 함수

zip() 함수의 특징은 2개 이상의 Observable을 결합할 때 사용합니다. 만약 한쪽의 Observable에서 처리가 안된다면 모두 처리될때까지 발행을 기다립니다.

주의할 점) BiFunction<입력 타입, 입력 타입, 리턴 타입> 에 입력값하고 리턴값의 자료형을 구체적으로 명시해주어야 합니다.


val shapes = listOf("BALL", "PENTAGON", "STAR")
val coloredTriangles = listOf("2-T", "6-T", "4-T")

Observable.zip(
    Observable.fromIterable(shapes).map {
        when (it) {
            "HEXAGON" -> return@map "-H"
            "OCTAGON" -> return@map "-O"
            "RECTANGLE" -> return@map "-R"
            "TRIANGLE" -> return@map "-T"
            "DIAMOND" -> return@map "<>"
            "PENTAGON" -> return@map "-P"
            "STAR" -> return@map "-S"
            else -> return@map ""
        }
    },
    Observable.fromIterable(coloredTriangles).map {
        if (it.endsWith("<>")) {
            return@map it.replace("<>", "").trim()
        }

        val hyphen = it.indexOf("-")

        if (hyphen > 0) {
            return@map it.substring(0, hyphen)
        }

        return@map it
    },
    BiFunction<String, String, String> { suffix, color ->  return@BiFunction color + suffix}
).subscribe(System.out::println)

//------------------------------------------
            
결과)

2
6-P
4-S

zipWith() 함수

zipWith() 함수는 zip() 함수와 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈히 호출할 수 있는 장점이 있습니다.


Observable.zip(
    Observable.just(100, 200, 300),
    Observable.just(10, 20, 30),
    BiFunction<Int, Int, Int> {a, b -> a + b})
    .zipWith(Observable.just(1, 2, 3), BiFunction<Int, Int, Int> { ab, c -> ab + c })
    .subscribe(System.out::println)

//------------------------------------------
            
결과)

111
222
333


combineLatest() 함수

2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수입니다.


val data1 = listOf("6", "7", "4", "2")
val data2 = listOf("DIAMOND", "STAR", "PENTAGON")

Observable.combineLatest(
    Observable.fromIterable(data1)
        .zipWith(
            Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
            BiFunction<String, Long, String> { color, _ ->
                if (color.endsWith("<>")) {
                    return@BiFunction color.replace("<>", "").trim()
                }

                val hyphen = color.indexOf("-")

                if (hyphen > 0) {
                    return@BiFunction color.substring(0, hyphen)
                }

                return@BiFunction color
            }
        ),
    Observable.fromIterable(data2)
        .zipWith(
            Observable.interval(200L, TimeUnit.MILLISECONDS),
            BiFunction<String, Long, String> { shape, _ ->
                when (shape) {
                    "HEXAGON" -> return@BiFunction "-H"
                    "OCTAGON" -> return@BiFunction "-O"
                    "RECTANGLE" -> return@BiFunction "-R"
                    "TRIANGLE" -> return@BiFunction "-T"
                    "DIAMOND" -> return@BiFunction "<>"
                    "PENTAGON" -> return@BiFunction "-P"
                    "STAR" -> return@BiFunction "-S"
                    else -> return@BiFunction ""
                }
            }
        ),
    BiFunction<String, String, String> { color, suffix -> color + suffix }
).subscribe(System.out::println)

sleep(1000)

//------------------------------------------
            
결과)

6<>
7<>
7-S
4-S
4-P
2-P

merge() 함수

가장 단순한 결합 함수로, 아무것도 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행합니다.


val data1 = listOf("1", "3")
val data2 = listOf("2", "3", "6")

val source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> data1[idx] }
    .take(data1.size.toLong())

val source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> data2[idx] }
    .take(data2.size.toLong())

Observable.merge(source1, source2).subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

1
2
3
3
6     
        

concat() 함수

2개 이상의 Observable을 이어 붙여주는 함수이고, 첫번째 Observable에 onComplete 가 발생해야, 두 번째 Observable을 구독합니다.


val data1 = listOf("1", "3", "5")
val data2 = listOf("2", "3", "6")

val source1 = Observable.fromIterable(data1)
    .doOnComplete { println("onComplete()") }

val source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> data2[idx] }
    .take(data2.size.toLong())
    .doOnComplete { println("onComplete()") }

Observable.concat(source1, source2)
    .doOnComplete { println("onComplete()") }
    .subscribe(System.out::println)

sleep(1000)

//------------------------------------------
            
결과)

1
3
5
onComplete()
2
3
6
onComplete()
onComplete()

조건 연산자

Observable의 흐름을 제어하는 역할을 합니다.

  • amb() : 둘 중 어느 것이든 먼저 나오는 Observable을 채택합니다.
  • takeUntil(other) : other Observable에서 데이터가 발행하기 전까지만 현재 Observable을 채택합니다
  • skipUntil(other) : other Observable에서 데이터가 발행될때까지 현재 Observable에서 발행하는 값을 무시합니다.
  • all() : Observable에 입력되는 값이 모두 특정 조건에 맞을 때만 true를 발행합니다. 만약 조건이 맞지 않으면 false를 발행합니다.

amb() 함수

여러개의 Observable 중에서 가장 먼저 데이터를 발행하는 Observable을 선택합니다.

val data1 = listOf("1", "3", "5")
val data2 = listOf("2-R", "4-R")

val sources = listOf(
    Observable.fromIterable(data1)
        .doOnComplete { println("Observable #1 : onComplete()") },
    Observable.fromIterable(data2)
        .delay(100L, TimeUnit.MILLISECONDS)
        .doOnComplete { println("Observable #2 : onComplete()") })

Observable.amb<String>(sources)
    .doOnComplete { println("Result : onComplete()") }
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

1
3
5
Observable #1 : onComplete()
Result : onComplete()


takeUntil() 함수

인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시완료합니다.

즉, take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지 판단합니다.

val list = listOf("1", "2", "3", "4", "5", "6")

Observable.fromIterable(list)
    .zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction<String, Long, String> { data, _ -> data })
    .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

1
2
3
4


skipUntil() 함수

skipUntil 함수는 takeUntil() 함수의 반대로 현재 Observable의 발행을 무시하다가, 설정된 Observable이 호출되면 그때 발행하기 시작합니다.

val list = listOf("1", "2", "3", "4", "5", "6")

Observable.fromIterable(list)
    .zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction<String, Long, String> {data, _ -> data})
    .skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println)

sleep(1000)


//------------------------------------------
            
결과)

5
6


all() 함수

주어진 조건이 모두 만족할때만 발행을 합니다.


val list = listOf("1", "2", "3", "4")

Observable.fromIterable(list)
    .map {
        when {
            it.endsWith("-H") -> return@map "HEXAGON"
            it.endsWith("-H") -> return@map "OCTAGON"
            it.endsWith("-H") -> return@map "RECTANGLE"
            it.endsWith("-H") -> return@map "TRIANGLE"
            it.endsWith("<>") -> return@map "DIAMOND"
            it.endsWith("-P") -> return@map "PENTAGON"
            it.endsWith("-S") -> return@map "STAR"
            else -> return@map "BALL"
        }
    }
    .all {
        it == "BALL"
    }
    .subscribe { result ->
        println(result!!)
    }

sleep(1000)


//------------------------------------------
            
결과)

true


유틸리티 함수

  • delay() 함수 : 발행을 지연시켜주는 함수
  • timeInterval() 함수 : 이전 데이터의 발행과 현재 데이터의 발행시간의 차를 보여주는 함수

delay() 함수

delay() 함수는 시간을 받아서 데이터 발행을 지연시켜줍니다.


val data = listOf("1", "2", "3", "4")

Observable.fromIterable(data)
    .delay(100L, TimeUnit.MILLISECONDS)
    .subscribe(System.out::println)
sleep(1000)


//------------------------------------------
            
결과) 100 Milliseconds 만큼 지연되고 데이터를 발행합니다.

1
2
3
4

timeInterval() 함수

timeInterval() 함수는 이전 값을 발행한 이후 시간이 얼마나 흘렀는지를 알려줍니다.


val data = listOf("1", "3", "7")

Observable.fromIterable(data)
    .delay {
        sleep(Random().nextInt(100).toLong())
        return@delay Observable.just(it)
    }
    .timeInterval()
    .subscribe(System.out::println)

sleep(1000)

//------------------------------------------
            
결과)

Timed[time=66, unit=MILLISECONDS, value=1]
Timed[time=36, unit=MILLISECONDS, value=3]
Timed[time=94, unit=MILLISECONDS, value=7]