티스토리 뷰

728x90

 

 

 

채널은 코루틴끼리의 통신하기 위한 기본적인 방법이다.

채널은 공공 도서관으로 비유할 수 있다. 하나의 책을 찾으려면, 책을 빌렸던 사람이 다시 반납해야 한다.

이는 채널이 작동하는 방식과 비슷하다.

채널은 송신자와 수신자의 수에 제한이 없고, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있다.

 

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
    // ...
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
    fun cancel(cause: CancellationException? = null)
    // ...
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
  • SendChannel: 원소를 보낼 때, 채널을 닫을 때 사용된다.
  • ReceiveChannel: 원소를 받을 때 사용된다.
  • Channel: SendChannel과 ReceiveChannel 인터페이스 두개를 구현한 인터페이스

채널의 진입점을 제한하기 위해 ReceiveChannel과 SendChannel 중 하나만 노출시킬 수도 있다.

 

send 함수와 receive 함수

  • send 함수와 receive 함수는 모두 중단 함수다.
  • receive 함수를 호출했는데 채널에 원소가 없다면, 코루틴은 원소가 들어올 때 까지 중단된다. 도서관에 비유한다면, 누군가 책을 빌리러 갔는데 책이 없는 경우, 다른 사람이 책을 반납할 때까지 기다리는 것이다.
  • send 함수를 호출했는데 채널의 용량이 다 찼다면, 코루틴은 채널에 빈 공간이 생길 때까지 중단된다. 도서관에 비유한다면, 누군가 책을 반납하러 갔는데 책장이 가득 찬 경우, 다른 사람이 책을 빌려 공간이 생길 때까지 기다리는 것이다.

 

trySend 함수와 tryReceive 함수

  • 중단 함수가 아닌 일반 함수로 원소를 보내거나 받아야 한다면 trySend 함수와 tryReceive 함수를 사용하면 된다.
  • 두 함수 모두 연산 결과를 담고 있는 ChannelResult를 즉시 반환한다.
  • trySend 함수와 tryReceive 함수는 용량이 제한적인 채널에서만 사용할 수 있다. 버퍼가 없는 (용량이 무제한인) 랑데뷰 채널에서는 작동하지 않는다.

 

채널은 송신자와 수신자의 수에 제한이 없다.

하지만 위 그림의 첫 번째 경우처럼 채널의 양쪽 끝에 각각 하나의 코루틴만 있는 것이 가장 일반적이다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
    }
    
    launch {
        repeat(5) {
            val received = channel.receive()
            println(received)
        }
    }
}
(1초 후)
Producing next one
0
(1초 후)
Producing next one
2
(1초 후)
Producing next one
4
(1초 후)
Producing next one
6
(1초 후)
Producing next one
8

위 코드는 채널을 나타내는 가장 간단한 예시이다.

각기 다른 코루틴에 생성자(송신자)와 소비자(수신자)가 있다. 생성자는 원소를 보내고 소비자는 원소를 받는다.

 

하지만 위 코드는 좋은 방식이 아니다.

수신자는 송신자가 얼마나 많은 원소를 보내는지 알아야 하기 때문이다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println(element)
        }
        // channel.consumeEach { element ->
        //     println(element)
        // }
    }
}

보통 수신자 원소의 개수를 알기 어렵기 때문에, 송신자 원소를 보내는 만큼 수신자가 기다리는 방식이 선호된다.

채널이 닫힐 때까지 원소를 받기 위해 for 루프나 consumeEach 함수를 사용한다.

consumeEach 함수도 for 루프를 사용하지만, 모든 원소를 받고 채널이 닫힌 뒤 채널을 취소한다는 점이 다르다.

 

위 코드의 문제점은 (특히 예외가 발생했을 때) 채널을 닫는 걸 잊기 쉽다는 것이다.

예외로 인해 송신자가 원소를 보내는 걸 중단하면, 수신자는 원소를 영원히 기다린다.

 

fun CoroutineScope.produceNumbers(
    max: Int
): ReceiveChannel<Int> = produce {
    var x = 0
    while (x < 5) send(x++)
}

produce 함수를 사용하는 것이 좀더 편리하다. produce 함수는 ReceiveChannel을 반환하는 코루틴 빌더이다.

위 함수는 0부터 max까지의 양수를 가진 채널을 생성한다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }

    for (element in channel) {
        println(element)
    }
}

produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이 (끝나거나, 중단되거나, 취소되거나) 항상 채널을 닫는다.

즉, close 함수를 반드시 호출한다.

produce 빌더는 채널을 만들기 위해 가장 선호되는 방법이며, 안전하고 편리하다.

 

 

채널 타입

채널의 용량에 따라 네 가지로 구분할 수 있다.

  • 무제한(Unlimited)
    • 용량이 Channel.UNLIMITED로 설정된 채널이다.
    • 용량 제한이 없다.
    • send 함수가 중단되지 않는다.
  • 버퍼(Buffered)
    • 용량이 특정 크기 또는 Channel.BUFFERED로 설정된 채널이다.
    • Channel.BUFFERED의 기본값은 64이다. JVM의 defaultBuffer를 설정하여 변경할 수 있다.
  • 랑데뷰(Rendezvous)
    • 용량이 0 또는 Channel.RENDEZVOUS로 설정된 채널이다.
    • 송신자와 수신자가 만날 때만 원소를 교환한다. (책장이 아닌 책을 교환하는 장소와 유사)
  • 융합(Conflated)
    • 용량이 1 또는 Channel.CONFLATED로 설정된 채널이다.
    • 원소를 보내는 것보다 받는 것이 더 느린 경우, 새로운 원소가 이전 원소를 대체한다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.6초 후)
0
(1초 후)
2
(1초 후)
4
(1초 후)
6
(1초 후)
8
(1초 후)

Channel 함수에 직접 용량을 설정하거나, produce 함수의 인자에 채널의 용량을 전달할 수도 있다.

용량이 무제한이면 송신자는 모든 원소를 중단하지 않고 바로 보낸다. 수신자는 원소를 하나씩 받는다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.8초 후)
0
Sent
(1초 후)
2
Sent
(1초 후)
4
(1초 후)
6
(1초 후)
8
(1초 후)

정해진 크기의 용량을 가지고 있다면 버퍼가 가득 찰 때까지 원소를 보낸다.

이후 송신자는 수신자가 원소를 받기를 기다린다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = produce { // produce(capacity = Channel.RENDEZVOUS)
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
0
Sent
(1초 후)
2
Sent
(1초 후)
4
Sent
(1초 후)
6
Sent
(1초 후)
8
Sent
(1초 후)

기본 용량을 가진 랑데뷰 채널에서 송신자가 항상 수신자를 기다린다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.6초 후)
8

Channel.CONFLATED 용량을 사용하면 최근의 원소만 받을 수 있다.

새로운 원소가 이전 원소를 대체하기 때문에, 이전 원소는 유실된다.

 

 

버퍼 오버플로일 때

onBufferoverflow 파라미터를 통해 버퍼가 꽉 찼을 때의 행동을 커스텀할 수 있다.

  • SUSPEND(디폴트): 버퍼가 가득 찼을 때, send 함수가 중단된다.
  • DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 유실된다.
  • DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 유실된다.

채널 용량 중 Channel.CONFLATED는 용량을 1로 설정하고, onBufferoverflow를 DROP_OLDEST로 설정한 것이다.

 

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.1초 후)
Sent
(0.6초 후)
6
(1초 후)
8

현재는 produce 함수에서 onBufferOverflow를 설정할 수 없다.

오버플로 옵션을 변경하려면 Channel 함수를 사용해야 한다.

 

 

전달되지 않은 원소 핸들러

val channel = Channel<Resource>(capacity) { resource ->
    resource.close()
}
//val channel = Channel<Resource>(
//    capacity,
//    onUndeliveredElement = { resource ->
//        resource.close()
//    }
//)

// 생성자
val resourceToSend = openResource()
channel.send(resourceToSend)

// 소비자
val resourceReceived = channel.receive()
try {
    // 수신한 자원으로 작업
} finally {
    resourceReceived.close()
}

Channel 함수의 파라미터 중에는 onUndeliveredElement가 있다.

onUndeliveredElement에 전달된 람다는 원소가 어떠한 이유로 처리되지 않을 때 호출된다.

대부분 채널이 닫히거나 취소되었을 경우지만, send, receive, receiveOrNull, hasNext 함수에서 예외가 발생하는 경우도 있다.

주로 채널에서 보낸 자원을 닫을 때 사용한다.

 

 

팬아웃(Fan-out)

여러 개의 수신자 코루틴이 하나의 송신자 채널로부터 원소를 받을 수도 있다. 이를 팬아웃이라 한다.

이때 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 한다.

(consumeEach는 여러 개의 코루틴이 사용하기에는 안전하지 않다.)

 

fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}
#0 received 0
#1 received 1
#2 received 2
#0 received 3
#1 received 4
#2 received 5
#0 received 6
...

채널은 원소를 기다리는 코루틴들을 FIFO 큐로 가지고 있다. 따라서 수신자 코루틴 간에 원소가 공평하게 배분된다.

위 예제에서도 코루틴이 순차적으로 원소를 받는다는 것을 확인할 수 있다. (0, 1, 2, 0, 1, 2...)

 

 

팬인(Fan-in)

여러 개의 송신자 코루틴이 하나의 수신자 채널로 원소를 전송할 수 있다. 이를 팬인이라 한다.

 

suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}
(200ms 후)
foo
(200ms 후)
foo
(100ms 후)
BAR!
(100ms 후)
foo
(200 밀리초 후)
... 

위 코드에서는 두 개의 코루틴이 같은 채널로 원소를 보내고 있다.

 

fun <T> CoroutineScope.fanIn(
    channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) {
                send(elem)
            }
        }
    }
}

다수의 채널을 하나의 채널로 합쳐야 할 경우가 있다.

이런 경우 fanIn 함수를 사용할 수 있다. fanIn 함수는 produce 함수로 여러 개의 채널을 합친다.

 

 

파이프라인

// 1부터 3까지의 수를 가진 채널
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
    produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }

suspend fun main() = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    for (num in squared) {
        println(num)
    }
}
1
4
9

한 채널에서 받은 원소를 다른 채널로 전송하는 경우가 있다. 이를 파이프라인이라 한다.

 

 

통신의 기본 형태로서의 채널

채널은 서로 다른 코루틴이 통신할 때 유용하다.

공유 상태로 인한 문제가 일어나지 않고, 공평하다.

 

여러 바리스타가 커피를 만드는 상황을 떠올려 보자.

각각의 바리스타는 서로 독립적으로 작업을 수행하는 코루틴이라 할 수 있다.

커피의 종류마다 만드는 시간이 다르지만, 주문은 순서대로 처리하고 싶다.

 

suspend fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                baristaName = baristaName
            )
        )
    }
}

이를 해결하기 위한 가장 쉬운 방법은 채널로 주문을 받고, 커피를 만들어서 다른 채널로 보내는 것이다.

각각의 바리스타는 produce 함수를 사용해 정의한다.

 

val coffeeResults = fanIn(
    serverders(ordersChannel, "Alex"), 
    serveOrders(ordersChannel, "Bob"), 
    serveOrders(ordersChannel, "Celine"),
)

파이프라인을 설정하고, 이전에 정의한 fanIn 함수를 사용해 각 바리스타들의 결과를 하나로 합친다.

 

 

실제 사용 예

채널을 사용하는 전형적인 예는 한 쪽에서 데이터가 생성되고 다른 쪽에서 데이터를 처리하는 것이다.

예시로는 사용자의 클릭에 반응하는 경우, 서버로부터 새로운 알림이 오는 경우, 시간이 흐르면서 검색 결과를 업데이트하는 경우가 있다.

가장 좋은 예시로는 여러 개의 항공사로부터 가장 좋은 항공편을 찾는 스카이스캐너이다.

스카이스캐너는 더 많은 항공사들이 데이터를 줄 수록 더 많은 항공편을 검색 결과로 보여준다.

 

하지만 보통 채널과 플로우가 합쳐진 channelFlow와 callbackFlow를 사용하는 것이 더 좋다.

channelFlow와 callbackFlow에 대해서는 추후 설명한다.

 

순수한 형태의 채널은 좀더 복잡한 상황에서 유용하다.

아마존 같은 온라인 쇼핑몰을 운영한다고 생각해 보자.

쇼핑몰에서는 수많은 판매자들이 상품 정보를 변경하는 것을 감지해야 한다.

 

가장 쉽게 생각할 수 있는 방법은 판매자가 정보를 변경할 때마다 갱신해야 할 상품 리스트를 찾고, 하나씩 갱신하는 것이다.

모든 것을 하나의 프로세스로 오랫동안 처리하는 건 좋지 않다. 그 이유는 아래와 같다.

  • 서버 내부에서 오류가 발생하거나 서버가 재개되면 어디서 문제가 발생했는지 찾을 수 없다.
  • 십만 개가 넘는 상품을 제공하는 판매자가 오랫동안 서버를 붙들고 있으면, 열 개의 상품을 제공하는 판매자는 한참을 기다려야 한다.
  • 동시에 상품 정보를 변경하는 서버에 너무 많은 네트워크 요청을 보내면 안 된다.

 

채널 간 파이프라인을 설정하여 해결할 수 있다.

첫 번째 채널은 처리해야 할 판매자를 가지고 있고, 두 번째 채널은 갱신해야 할 상품을 가지고 있다. 두 채널 모두 버퍼를 가지고 있다.

이 방식의 장점은 아래와 같다.

  • 채널의 버퍼가 존재하기 때문에, 서버는 동시에 갱신해야 할 상품의 수를 조절할 수 있다.
  • 중복된 상품을 제거하는 등 중간 과정을 추가하는 것이 쉽다.
  • 각 채널의 수신자 코루틴 수를 정의해서, 외부 서비스에 얼마나 많은 요청을 보낼지 결정할 수 있다.
  • 채널과 관련된 파라미터를 조정하여 서버를 자유롭게 커스텀할 수 있다.
  • 영속성(서버 재개 시), 원소의 유일성(판매자가 이전 요청이 처리되기 전에 또 다른 변경을 요청) 등을 쉽게 추가할 수 있다.

 

suspend fun handleOfferUpdates() = coroutineScope {
    val sellerChannel = listenOnSellerChanges()

    val offerToUpdateChannel = produce(capacity = UNLIMITED) {
        repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
            launch {
                for (seller in sellerChannel) {
                    val offers = offerService.requestOffers(seller.id)
                    offers.forEach { send(it) }
                }
            }
        }
    }

    repeat(NUMBER_OF_CONCURRENT_UPDATE_SENDERS) {
        launch {
            for (offer in offerToUpdateChannel) {
                sendOfferUpdate(offer)
            }
        }
    }
}

이를 간략화한 구현은 위와 같다.

sellerChannel에서 판매자들의 상품 변경 요청을 감지한다.

offerToUpdateChannel에서는 sellerChannel의 원소를 받아 다시 다른 채널로 보낸다.

그 아래에서 실제 상품 갱신 요청을 보낸다.

 

 

요약

  • 채널은 코루틴끼리 통신할 때 사용하는 기본적인 방식이다.
  • 송신자와 수신자 수에 제한이 없고, 채널을 통해 보내진 데이터는 단 한 번만 받을 수 있다.
  • 보통 produce 빌더를 사용해 채널을 생성한다.
  • 채널은 특정 작업의 코루틴의 수를 조절하는 파이프라인을 설정할 때 사용할 수 있다.
  • 최근에는 플로우를 채널과 연결해서 사용하는 경우가 많다.

 

 

 

출처

https://www.yes24.com/product/goods/123034354

728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/02   »
1
2 3 4 5 6 7 8
9 10 11 12 13 14 15
16 17 18 19 20 21 22
23 24 25 26 27 28
글 보관함