티스토리 뷰
728x90
동시성 통신
- 채널
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(3)
launch {
for (n in 1..streamSize) {
delay(Random.nextLong(100))
println("Sending: $n")
channel.send(n)
}
}
launch {
for (i in 1..streamSize) {
delay(Random.nextLong(100))
val n = channel.receive()
println("Receiving: $n")
}
}
}
}
- Thread-safe하게 임의의 데이터 스트림을 코루틴 사이에 공유할 수 있게 해준다.
- 채널의 용량을 지정하는 정수값을 인자로 받는 Channel() 함수를 통해 채널을 생성할 수 있다.
- 데이터를 스트림에 넣을 땐 send(), 스트림에서 받을 땐 receive()를 호출한다.
- 버퍼가 꽉 차면 다른 코루틴이 값을 수신할 때까지 send() 호출이 일시 중단된다.
- 버퍼가 비어있으면 다른 코루틴이 값을 송신할 때까지 receive() 호출이 일시 중단된다.
Channel()의 인자
Channel.UNLIMITED ( = Int.MAX_VALUE ) |
|
Channel.RENDEZVOUS ( = 0 ) |
|
Channel.CONFLATED ( = -1 ) |
|
0 ~ Int.MAX_VALUE 사이의 값 |
|
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(Channel.RENDEZVOUS)
launch {
for (n in 1..streamSize) {
delay(Random.nextLong(100))
println("Sending: $n")
channel.send(n)
}
}
launch {
for (i in 1..streamSize) {
delay(Random.nextLong(100))
val n = channel.receive()
println("Receiving: $n")
}
}
}
}
실행 결과
Sending: 1
Receiving: 1
Sending: 4
Receiving: 4
Sending: 9
Receiving: 9
Sending: 16
Receiving: 16
Sending: 25
Receiving: 25
|
- 랑데부 채널(버퍼 크기가 0)은 생산자와 소비자 코루틴이 교대로 Active 된다.
fun main() {
runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (n in 1..5) {
delay(100)
println("Sending: $n")
channel.send(n)
}
channel.close()
}
launch {
for (n in channel) { // close() 전 까지 무한 루프
println("Receiving: $n")
delay(200)
}
}
}
}
- conflated 채널은 모든 원소를 받을 필요 없고 생산자가 보낸 값을 일부만 받아도 되는 경우 사용한다.
- close() 함수를 호출해 채널이 닫혀 데이터를 더 이상 보내지 않는다는 것을 알린다.
- 채널이 닫힌 후 send()를 호출하면 ClosedSendChannelException 예외가 발생한다.
- 채널이 닫힌 후 receive()를 호출하면 정상적으로 데이터를 수신하지만 그 후 ClosedReceiveChannelException 예외가 발생한다.
channel.consumeEach {
println("Receiving: $it")
delay(200)
}
- 이터레이션을 사용하지 않고 consumeEach() 함수를 통해 채널의 데이터를 얻을 수 있다.
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(2)
launch { // 생산자 1개
for (n in 1..streamSize) {
println("Sending: $n")
channel.send(n)
}
channel.close()
}
for (i in 1..3) { // 소비자 3개
launch {
for (n in channel) {
println("Receiving by consumer $i: $n")
delay(Random.nextLong(100))
}
}
}
}
}
- 채널 통신에 참여하는 생산자와 소비자가 꼭 코루틴 하나씩일 필요는 없다.
- 한 채널을 여러 코루틴이 동시에 읽을 수 있다. → fan out
- 여러 생산자 코루틴이 한 채널에 데이터를 넣고, 하나의 소비자 코루틴이 읽을 수 있다. → fan in
- 여러 생산자, 여러 소비자가 여러 채널을 공유할 수 있다.
2. 프로듀서
- produce() 함수를 통해 데이터 스트림을 생성할 수 있다.
- send() 함수를 제공하는 ProducerScope 영역을 만들어준다.
- ReceiveChannel이 반환된다.
fun main() {
runBlocking {
val channel = produce {
for (n in 1..5) {
println("Sending: $n")
send(n)
}
}
launch {
channel.consumeEach { println("Receiving: $it") }
}
}
}
- 명시적으로 close할 필요 없이 코루틴이 종료되면 produce() 빌더가 채널을 자동으로 닫아준다.
- async()/await() 방식과 동일하게, 예외를 저장하고 receive()가 호출될 때 다시 발생한다.
3. 티커
- Unit 값을 계속 발생시키고, 한 원소와 다음 원소의 발생 시점을 일정 시간동안 떨어져 있는 스트림을 생성한다.
- 버퍼의 크기가 0인 랑데부 채널이다.
- ticker() 함수를 통해 생성한다.
- delayMillis: 원소의 발생 시간 간격
- initialDelayMilis: 티커 생성 시점과 가장 첫 원소 발생 시점 사이의 간격 (default = delayMillis)
- context: 코루틴 context
- mode
- TickerMode.FIXED_PERIOD: 시간 간격을 delayMillis에 맞추기 위해 실제 지연 시간을 조정한다. (default)
- TickerMode.FIXED_DELAY: 실제 흘러간 시간과 관계 없이 delayMillis 만큼 지연 후 다음 원소를 발생시킨다.
fun main() = runBlocking {
val tickerChannel = ticker(100)
println(withTimeoutOrNull(50) { tickerChannel.receive() })
println(withTimeoutOrNull(60) { tickerChannel.receive() })
delay(150)
println(withTimeoutOrNull(1) { tickerChannel.receive() })
println(withTimeoutOrNull(60) { tickerChannel.receive() })
tickerChannel.cancel()
}
실행 결과
null
kotlin.Unit
kotlin.Unit
kotlin.Unit
|
- 150ms 동안 일시 중단되어 티커는 지연 시간을 조정한다.
실행 결과
null
kotlin.Unit
kotlin.Unit
null
|
- 티커 모드를 FIXED_RATE로 변경하면 결과가 위와 같이 바뀐다.
- 150ms 동안 일시 중단되었지만, 실제 흐른 시간은 고려하지 않고 ticker의 간격인 100ms를 다시 기다린다.
4. 액터
sealed class Action
object Inc : Action()
class Get(val response: CompletableDeferred<Int>) : Action()
fun main() : Unit = runBlocking {
val actorCounter = actor<Action> {
var counter = 0
for (msg in channel) {
when(msg) {
is Inc -> counter++
is Get -> msg.response.complete(counter)
}
}
}
val job = launch {
repeat(5000) {
actorCounter.send(Inc)
}
}
job.join()
val response = CompletableDeferred<Int>()
actorCounter.send(Get(response))
println(response.await()) // 5000
actorCounter.close()
}
- 액터를 통해 가변 상태를 Thread-safe하게 공유할 수 있다. (단일 스레드로 관리)
- actor() 함수를 통해 생성할 수 있다.
- 기본적으로 랑데부 채널을 사용한다.
- SendChannel이 반환된다.
- 액터에게 메시지를 보내려면 send()를 사용한다.
- 액테에게 메시지를 보낸 대상으로 요청 결과를 돌려줄 때 complete()를 사용한다.
5. Flow
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
foo().collect { value -> println(value) }
}
- 비동기적으로 처리될 값들의 스트림을 나타내기 위해 플로우를 사용한다.
- flow {} 빌더를 이용해 Flow 타입을 생성하고, 블록 안에는 suspend 함수들이 들어갈 수 있다.
- suspend 키워드를 사용하지 않아도 된다.
- emit()를 이용해 값을 방출한다.
- 방출된 값은 collect()를 이용해 수집된다.
fun main() = runBlocking {
val foo = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
println("collect started")
foo.collect { }
foo.collect { }
}
실행 결과
collect started
Flow started
Flow started
|
- 플로우 빌더의 본문은 collect가 호출되기 전까지는 실행되지 않는다. (cold stream)
- 플로우 타입의 변수 foo를 생성했을 때 Flow started는 출력되지 않고, collect() 호출 시에 출력된다.
- 어떤 것도 일시 중단되지 않고 즉시 foo 변수에 대입된다.
(1..3).asFlow().collect { vale -> println(value) }
- 컬렉션, 시퀀스들은 asFlow() 확장 함수를 통해 플로우로 변환할 수 있다.
- 플로우 중간 연산자
fun main() = runBlocking {
(1..3).asFlow()
.map { request ->
delay(1000)
"response $request"
}
.collect { response -> println(response) }
}
실행 결과
response 1
response 2
response 3
|
-
- map, filter, transform, take 등의 함수를 사용할 수 있다. (의미는 컬렉션 함수들과 동일)
- 블록 안에서 suspend 함수를 호출할 수 있다.
- 플로우와 동일하게 cold type으로 동작하기 때문에, 새로운 Flow를 즉시 반환한다.
- 플로우 종단 연산자
fun main() = runBlocking {
val sum = (1..10).asFlow()
.map { it + it }
.reduce { a, b -> a + b }
println(sum) // 110
}
-
- toList(), toSet() 등을 통해 컬렉션으로 변환할 수 있다.
- reduce(), fold()를 통해 하나의 값으로 변환할 수 있다.
fun main() = runBlocking {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
실행 결과
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
|
- 플로우는 순차적이기 때문에 하나의 값 각각 마지막 연산까지 수행된다.
- 플로우 context
fun foo() = flow {
println("[${Thread.currentThread().name}] Started flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking {
foo().collect { value -> println("[${Thread.currentThread().name}] Collected $value")}
}
실행 결과
[main @coroutine#1] Started flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
|
-
- flow 빌더의 본문은 항상 collect()를 호출한 코루틴의 context에서 수행된다.
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
println("[${Thread.currentThread().name}] Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
foo.collect { value -> println("[${Thread.currentThread().name}] Collected $value")}
}
실행 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
|
-
- flow 빌더 내부에서 withContext()를 통해서 context를 변경시킬 수 없다.
- Flow 타입 전체에 flowOn() 함수를 호출해서 블록 안을 실행할 context를 지정한다.
- buffer
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
val time = measureTimeMillis {
foo.buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
-
- 방출하는 시간과 수집하는 시간이 오래 걸릴 경우, buffer() 함수를 통해서 동시에 방출/수집되도록 할 수 있다.
- 위 코드에서 첫 번째 값만 100ms를 기다리고, 각각 300ms씩 기다리기 때문에 총 1000ms의 시간이 걸린다.
- buffer()이 없으면, 세 숫자 각각 400ms씩 총 1200ms의 시간이 걸린다.
- conflate
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
foo.conflate()
.collect { value ->
delay(300)
println(value)
}
}
실행 결과
1
3
|
-
- conflate()를 통해 가장 최신의 값만 수집할 수 있다.
- 1을 수집하는 동안 2, 3까지 방출되므로 2는 수집되지 않고 1, 3만 수집된다.
- collectLatest
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
foo.collectLatest() { value ->
println("Collected $value")
delay(300)
println("Done $value")
}
}
실행 결과
Collected 1
Collected 2
Collected 3
Done 3
|
-
- collectLatest()를 통해 새로운 값이 방출될 때마다 수집을 취소하고 다시 다음 값의 수집을 시작할 수 있다.
- zip
fun main() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
}
실행 결과
1 -> one
2 -> two
3 -> three
|
-
- zip()을 통해 두개의 플로우의 값을 하나로 합칠 수 있다.
- 플로우 값의 개수가 다른 경우, 더 작은 플로우의 개수만큼 생성된다.
- combine
fun main() = runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
nums.combine(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
}
실행 결과
1 -> one
2 -> one
2 -> two
3 -> two
3 -> three
|
-
- zip()과 동일하게 두개의 플로우 값을 하나로 합친다.
- 하나의 플로우에서 방출이 일어날 때마다 수집된다.
- 예외
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
emit(i)
}
}
try {
foo.collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
실행 결과
1
2
Caught java.lang.IllegalStateException: Collected 2
|
-
- try/catch 블록을 사용해 예외를 처리할 수 있다.
- flow 빌더의 본문이나 플로우의 중간/종단 연산자에서 발생하는 모든 예외를 처리한다.
fun main() = runBlocking {
val foo = flow {
for (i in 1..3) {
emit(i)
}
}
.map { value ->
check(value <=1) { "Crashed on $value"}
"string $value"
}
foo.catch { e -> emit("Caught $e") }
.collect { value -> println(value) }
}
실행 결과
string 1
Caught java.lang.IllegalStateException: Crashed on 2
|
-
- catch()를 통해 예외를 처리할 수 있다.
- 업스트림에서 발생하는 예외만 처리하고, 다운스트림에서 발생하는 예외는 처리하지 않는다.
- 업스트림: catch() 함수 위의 모든 연산
- 다운스트림: catch() 함수 아래의 모든 연산
- 종료
fun main() = runBlocking {
(1..3).asFlow()
.onCompletion{ println("Done") }
.collect { value -> println(value) }
}
-
- onCompletion() 함수를 통해 종료 시 실행할 동작을 정의할 수 있다.
- try/catch의 finally 블록을 정의하는 것과 동일하다.
728x90
'app > kotlin' 카테고리의 다른 글
[kotlin/코틀린] JUnit, AssertJ 어노테이션과 함수 정리하기 (0) | 2023.10.22 |
---|---|
코틀린으로 코딩테스트 준비하기 (0) | 2023.10.18 |
[kotlin/코틀린] 코루틴 흐름 제어 (Job, 타임아웃, 디스패처, 예외 처리) (0) | 2023.07.20 |
[kotlin/코틀린] 코루틴 기초 (suspend 함수, 코루틴 빌더, 문맥, 범위) (0) | 2023.07.19 |
[kotlin/코틀린] 위임 프로퍼티 (0) | 2023.07.19 |