티스토리 뷰

728x90

 

동시성 통신

  1. 채널
fun main() {
    runBlocking {
        val streamSize = 5
        val channel = Channel<Int>(3)
        
        launch {
            for (n in 1..streamSize) {
                delay(Random.nextLong(100))
                println("Sending: $n")
                channel.send(n)
            }
        }
        
        launch {
            for (i in 1..streamSize) {
                delay(Random.nextLong(100))
                val n = channel.receive()
                println("Receiving: $n")
            }
        }
    }
}
  • Thread-safe하게 임의의 데이터 스트림을 코루틴 사이에 공유할 수 있게 해준다.
  • 채널의 용량을 지정하는 정수값을 인자로 받는 Channel() 함수를 통해 채널을 생성할 수 있다.
  • 데이터를 스트림에 넣을 땐 send(), 스트림에서 받을 땐 receive()를 호출한다.
  • 버퍼가 꽉 차면 다른 코루틴이 값을 수신할 때까지 send() 호출이 일시 중단된다.
  • 버퍼가 비어있으면 다른 코루틴이 값을 송신할 때까지 receive() 호출이 일시 중단된다.

 

Channel()의 인자
Channel.UNLIMITED ( = Int.MAX_VALUE )
  • 채널의 용량에 제한이 없기 때문에 send() 함수는 일시 중단되지 않는다.
  • 내부 버퍼는 필요에 따라 증가한다.
Channel.RENDEZVOUS ( = 0 )
  • 아무 내부 버퍼가 없는 랑데부 채널이 된다.
  • send() 호출은 다른 곳에서 receive()를 호출할 때까지 항상 일시 중단된다.
  • receive() 호출은 다른 곳에서 send()를 호출할 때까지 항상 일시 중단된다.
  • default
Channel.CONFLATED ( = -1 )
  • 송신된 값이 합쳐지는(conflated) 채널이 된다.
  • send()로 보낸 값을 최대 하나만 버퍼에 저장하기 때문에 다른 send() 요청이 오면 기존의 값을 덮어쓴다.
  • send() 함수는 일시 중단되지 않는다.
0 ~ Int.MAX_VALUE 사이의 값
  • 인자 만큼의 버퍼 크기를 갖는 채널 생성

 

 

fun main() {
    runBlocking {
        val streamSize = 5
        val channel = Channel<Int>(Channel.RENDEZVOUS)
        
        launch {
            for (n in 1..streamSize) {
                delay(Random.nextLong(100))
                println("Sending: $n")
                channel.send(n)
            }
        }
        
        launch {
            for (i in 1..streamSize) {
                delay(Random.nextLong(100))
                val n = channel.receive()
                println("Receiving: $n")
            }
        }
    }
}
실행 결과
Sending: 1
Receiving: 1
Sending: 4
Receiving: 4
Sending: 9
Receiving: 9
Sending: 16
Receiving: 16
Sending: 25
Receiving: 25
  • 랑데부 채널(버퍼 크기가 0)은 생산자와 소비자 코루틴이 교대로 Active 된다.

 

 

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.CONFLATED)
        
        launch {
            for (n in 1..5) {
                delay(100)
                println("Sending: $n")
                channel.send(n)
            }
            channel.close()
        }
        
        launch {
            for (n in channel) { // close() 전 까지 무한 루프
                println("Receiving: $n")
                delay(200)
            }
        }
    }
}
  • conflated 채널은 모든 원소를 받을 필요 없고 생산자가 보낸 값을 일부만 받아도 되는 경우 사용한다.
  • close() 함수를 호출해 채널이 닫혀 데이터를 더 이상 보내지 않는다는 것을 알린다.
  • 채널이 닫힌 후 send()를 호출하면 ClosedSendChannelException 예외가 발생한다.
  • 채널이 닫힌 후 receive()를 호출하면 정상적으로 데이터를 수신하지만 그 후 ClosedReceiveChannelException 예외가 발생한다.

 

channel.consumeEach {
    println("Receiving: $it")
    delay(200)
}
 
  • 이터레이션을 사용하지 않고 consumeEach() 함수를 통해 채널의 데이터를 얻을 수 있다.

 

 

fun main() {
    runBlocking {
        val streamSize = 5
        val channel = Channel<Int>(2)
         
        launch { // 생산자 1개
            for (n in 1..streamSize) {
                println("Sending: $n")
                channel.send(n)
            }
            channel.close()
        }
         
        for (i in 1..3) { // 소비자 3개
            launch {
                for (n in channel) {
                    println("Receiving by consumer $i: $n")
                    delay(Random.nextLong(100))
                }
            }
        }
    }
}
  • 채널 통신에 참여하는 생산자와 소비자가 꼭 코루틴 하나씩일 필요는 없다.
  • 한 채널을 여러 코루틴이 동시에 읽을 수 있다. → fan out
  • 여러 생산자 코루틴이 한 채널에 데이터를 넣고, 하나의 소비자 코루틴이 읽을 수 있다. → fan in
  • 여러 생산자, 여러 소비자가 여러 채널을 공유할 수 있다.

 

 

 

       2. 프로듀서

  • produce() 함수를 통해 데이터 스트림을 생성할 수 있다.
  • send() 함수를 제공하는 ProducerScope 영역을 만들어준다. 
  • ReceiveChannel이 반환된다.

 

fun main() {
    runBlocking {
        val channel = produce {
            for (n in 1..5) {
                println("Sending: $n")
                send(n)
            }
        }
         
        launch {
            channel.consumeEach { println("Receiving: $it") }
        }
    }
}
  • 명시적으로 close할 필요 없이 코루틴이 종료되면 produce() 빌더가 채널을 자동으로 닫아준다.
  • async()/await() 방식과 동일하게, 예외를 저장하고 receive()가 호출될 때 다시 발생한다.

 

 

 

       3. 티커

  • Unit 값을 계속 발생시키고, 한 원소와 다음 원소의 발생 시점을 일정 시간동안 떨어져 있는 스트림을 생성한다.
  • 버퍼의 크기가 0인 랑데부 채널이다.
  • ticker() 함수를 통해 생성한다.
    • delayMillis: 원소의 발생 시간 간격
    • initialDelayMilis: 티커 생성 시점과 가장 첫 원소 발생 시점 사이의 간격 (default = delayMillis)
    • context: 코루틴 context
    • mode
      • TickerMode.FIXED_PERIOD: 시간 간격을 delayMillis에 맞추기 위해 실제 지연 시간을 조정한다. (default)
      • TickerMode.FIXED_DELAY: 실제 흘러간 시간과 관계 없이 delayMillis 만큼 지연 후 다음 원소를 발생시킨다.
 
fun main() = runBlocking {
    val tickerChannel = ticker(100)
     
    println(withTimeoutOrNull(50) { tickerChannel.receive() })
    println(withTimeoutOrNull(60) { tickerChannel.receive() })
    delay(150)
    println(withTimeoutOrNull(1) { tickerChannel.receive() })
    println(withTimeoutOrNull(60) { tickerChannel.receive() })

    tickerChannel.cancel()
}

 

실행 결과
null
kotlin.Unit
kotlin.Unit
kotlin.Unit
  • 150ms 동안 일시 중단되어 티커는 지연 시간을 조정한다. 

 

 

실행 결과
null
kotlin.Unit
kotlin.Unit
null
  • 티커 모드를 FIXED_RATE로 변경하면 결과가 위와 같이 바뀐다.
  • 150ms 동안 일시 중단되었지만, 실제 흐른 시간은 고려하지 않고 ticker의 간격인 100ms를 다시 기다린다.

 

 

 

       4. 액터

sealed class Action

object Inc : Action()
class Get(val response: CompletableDeferred<Int>) : Action()

fun main() : Unit = runBlocking {
    val actorCounter = actor<Action> {
        var counter = 0
         
        for (msg in channel) {
            when(msg) {
                is Inc -> counter++
                is Get -> msg.response.complete(counter)
            }
        }
    }
     
    val job = launch {
        repeat(5000) {
            actorCounter.send(Inc)
        }
    }

    job.join()

    val response = CompletableDeferred<Int>()
    actorCounter.send(Get(response))
    println(response.await()) // 5000
    actorCounter.close()
}
  • 액터를 통해 가변 상태를 Thread-safe하게 공유할 수 있다. (단일 스레드로 관리)
  • actor() 함수를 통해 생성할 수 있다.
  • 기본적으로 랑데부 채널을 사용한다.
  • SendChannel이 반환된다.

 

  • 액터에게 메시지를 보내려면 send()를 사용한다.
  • 액테에게 메시지를 보낸 대상으로 요청 결과를 돌려줄 때 complete()를 사용한다.

 

 

 

       5. Flow

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    foo().collect { value -> println(value) }
}
  • 비동기적으로 처리될 값들의 스트림을 나타내기 위해 플로우를 사용한다.
  • flow {} 빌더를 이용해 Flow 타입을 생성하고, 블록 안에는 suspend 함수들이 들어갈 수 있다.
  • suspend 키워드를 사용하지 않아도 된다.
  • emit()를 이용해 값을 방출한다.
  • 방출된 값은 collect()를 이용해 수집된다.

 

 

fun main() = runBlocking {
    val foo = flow {
        println("Flow started")
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
    println("collect started")
    foo.collect { }
    foo.collect { }
}
실행 결과
collect started
Flow started
Flow started
  • 플로우 빌더의 본문은 collect가 호출되기 전까지는 실행되지 않는다. (cold stream)
  • 플로우 타입의 변수 foo를 생성했을 때 Flow started는 출력되지 않고, collect() 호출 시에 출력된다.
  • 어떤 것도 일시 중단되지 않고 즉시 foo 변수에 대입된다.

 

 

(1..3).asFlow().collect { vale -> println(value) }
  • 컬렉션, 시퀀스들은 asFlow() 확장 함수를 통해 플로우로 변환할 수 있다.

 

 

  • 플로우 중간 연산자
fun main() = runBlocking {
    (1..3).asFlow()
        .map { request ->
            delay(1000)
            "response $request"
        }
        .collect { response -> println(response) }
}
실행 결과
response 1
response 2
response 3
    • map, filter, transform, take 등의 함수를 사용할 수 있다. (의미는 컬렉션 함수들과 동일)
    • 블록 안에서 suspend 함수를 호출할 수 있다.
    • 플로우와 동일하게 cold type으로 동작하기 때문에, 새로운 Flow를 즉시 반환한다.

 

 

  • 플로우 종단 연산자
fun main() = runBlocking {
    val sum = (1..10).asFlow()
        .map { it + it }
        .reduce { a, b -> a + b }
    println(sum) // 110
}
    • toList(), toSet() 등을 통해 컬렉션으로 변환할 수 있다.
    • reduce(), fold()를 통해 하나의 값으로 변환할 수 있다.

 

 

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0             
        }             
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }   
}
실행 결과
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
  • 플로우는 순차적이기 때문에 하나의 값 각각 마지막 연산까지 수행된다.

 

 

  • 플로우 context
fun foo() = flow {
    println("[${Thread.currentThread().name}] Started flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    foo().collect { value -> println("[${Thread.currentThread().name}] Collected $value")}
}
실행 결과
[main @coroutine#1] Started flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
    • flow 빌더의 본문은 항상 collect()를 호출한 코루틴의 context에서 수행된다.

 

 

 
fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            println("[${Thread.currentThread().name}] Emitting $i")
            emit(i)
        }
    }.flowOn(Dispatchers.Default)
     
    foo.collect { value -> println("[${Thread.currentThread().name}] Collected $value")}
}
실행 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
    • flow 빌더 내부에서 withContext()를 통해서 context를 변경시킬 수 없다.
    • Flow 타입 전체에 flowOn() 함수를 호출해서 블록 안을 실행할 context를 지정한다.

 

 

  • buffer
fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
     
    val time = measureTimeMillis {
        foo.buffer()
            .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}
    • 방출하는 시간과 수집하는 시간이 오래 걸릴 경우, buffer() 함수를 통해서 동시에 방출/수집되도록 할 수 있다.
    • 위 코드에서 첫 번째 값만 100ms를 기다리고, 각각 300ms씩 기다리기 때문에 총 1000ms의 시간이 걸린다.
    • buffer()이 없으면, 세 숫자 각각 400ms씩 총 1200ms의 시간이 걸린다.

 

 

  • conflate
fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
     
    foo.conflate()
        .collect { value ->
            delay(300)
            println(value)
        }
}
실행 결과
1
3
    • conflate()를 통해 가장 최신의 값만 수집할 수 있다.
    • 1을 수집하는 동안 2, 3까지 방출되므로 2는 수집되지 않고 1, 3만 수집된다.

 

 

  • collectLatest
fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
     
    foo.collectLatest() { value ->
        println("Collected $value")
        delay(300)
        println("Done $value")
    }
}
실행 결과
Collected 1
Collected 2
Collected 3
Done 3
    • collectLatest()를 통해 새로운 값이 방출될 때마다 수집을 취소하고 다시 다음 값의 수집을 시작할 수 있다.

 

 

  • zip
fun main() = runBlocking {
    val nums = (1..3).asFlow()
    val strs = flowOf("one", "two", "three")
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}
실행 결과
1 -> one
2 -> two
3 -> three
    • zip()을 통해 두개의 플로우의 값을 하나로 합칠 수 있다.
    • 플로우 값의 개수가 다른 경우, 더 작은 플로우의 개수만큼 생성된다.

 

 

  • combine
 
fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

 

실행 결과
1 -> one
2 -> one
2 -> two
3 -> two
3 -> three
    • zip()과 동일하게 두개의 플로우 값을 하나로 합친다.
    • 하나의 플로우에서 방출이 일어날 때마다 수집된다.

 

 

  • 예외
fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            emit(i)
        }
    }

    try {
        foo.collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
실행 결과
1
2
Caught java.lang.IllegalStateException: Collected 2
    • try/catch 블록을 사용해 예외를 처리할 수 있다.
    • flow 빌더의 본문이나 플로우의 중간/종단 연산자에서 발생하는 모든 예외를 처리한다.

 

 

fun main() = runBlocking {
    val foo = flow {
        for (i in 1..3) {
            emit(i)
        }
    }
    .map { value ->
        check(value <=1) { "Crashed on $value"}
        "string $value"
    }

    foo.catch { e -> emit("Caught $e") }
        .collect { value -> println(value) }
}
실행 결과
string 1
Caught java.lang.IllegalStateException: Crashed on 2
    • catch()를 통해 예외를 처리할 수 있다.
    • 업스트림에서 발생하는 예외만 처리하고, 다운스트림에서 발생하는 예외는 처리하지 않는다.
      • 업스트림: catch() 함수 위의 모든 연산
      • 다운스트림: catch() 함수 아래의 모든 연산

 

 

  • 종료
fun main() = runBlocking {
    (1..3).asFlow()
        .onCompletion{ println("Done") }
        .collect { value -> println(value) }
}
    • onCompletion() 함수를 통해 종료 시 실행할 동작을 정의할 수 있다.
    • try/catch의 finally 블록을 정의하는 것과 동일하다.

 

728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함