티스토리 뷰
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
fun downloaded(): List<User> = users.toList()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users.add(newUser)
}
}
시작하기 전에 위 코드를 살펴보자.
이 클래스는 두 개 이상의 스레드가 동시에 사용할 경우에 대한 대비가 되어있지 않다.
fetchUser()에서 users를 변경하고 있기 때문에, users는 공유 상태에 해당한다.
따라서 users는 보호되어야 한다.
두 개 이상의 스레드에서 동시에 users를 변경하면 충돌이 일어날 수 있기 때문이다.
class FakeNetworkService : Networkservice {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // 〜998242
}
1,000,000이 출력되어야 하지만, 1,000,000개의 코루틴에서 동시에 fetchUser()를 호출하고 있기 때문에 1,000,000보다 작은 숫자를 출력한다.
var counter = 0
fun main() = runBlocking {
massiveRun {
counter++
}
println(counter) // 〜567231
}
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
좀 더 간단한 예시는 여러 스레드에서 하나의 정수를 1씩 증가시키는 경우다.
Dispatchers.Default를 사용하는 1,000개의 코루틴에서 각각 1,000번의 연산을 호출한다.
연산이 끝난 뒤 숫자는 1,000,000이 되어야 하지만, 스레드 충돌이 발생해 1,000,000보다 작은 값이 출력된다.
스레드 간 충돌의 예시를 들어보자.
- counter의 시작값은 0이다.
- 첫 번째 스레드가 현재 값인 0을 읽고, 컨텍스트 스위칭 되어 두 번째 스레드에서도 0을 읽는다.
- 두 번째 스레드는 1로 증가시킨 뒤 counter에 저장한다.
- 첫 번째 스레드로 다시 스위칭되어 1로 증가시킨 뒤 counter에 저장한다.
- 두개의 스레드가 연산을 했기 때문에 결과값은 2가 되어야하지만 실제로는 1이 된다.
동기화 블로킹
var counter = 0
fun main() = runBlocking {
val lock = Any()
massiveRun {
synchronized(lock) {
counter++
}
}
println("Counter = $counter") // 1000000
}
전통적인 방식인 자바의 synchronized 블록이나 동기화 컬렉션을 사용해 해결할 수 있다.
이 방법의 문제점은 아래와 같다.
- synchronized 블록 내부에서 중단 함수를 사용할 수 없다.
- synchronized 블록 내부에서 코루틴이 자기 차례를 기다릴 때 스레드를 블로킹한다.
코루틴에서는 블로킹 대신 중단하는 방식으로 동기화해야 한다.
원자성
var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}") // 1000000
}
자바에서 제공하는 다양한 원자값으로도 해결할 수 있다. (AtomicInteger, AtomicLong, AtomicBoolean 등)
이러한 원자값 연산은 락 없이 로우 레벨로 구현되었기 때문에 효율적이고 사용하기 쉬우며, thread-safe하다.
var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.set(counter.get() + 1)
}
println("Counter = ${counter.get()}") // ~430467
}
하지만 원자성은 사용성이 제한적이기 때문에 조심해서 다뤄야 한다.
하나의 연산 각각은 원자성을 가지고 있지만, 연산을 조합해서 사용할 때는 원자성이 보장되지 않는다.
class UserDownloader(
private val api: NetworkService,
) {
private val users = AtomicReference(listOf<User>())
fun downloaded(): List<User> = users.get()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchuser(id)
users.getAndUpdate { it + newUser }
}
}
UserDownloader를 안전하게 만들려면 User 리스트를 AtomicReference로 감쌀 수 있다.
원자성은 하나의 Primitive Type/Wrapper Class의 동기화를 보장하기 위해 사용한다.
하지만 연산이 좀더 복잡한 경우에는 원자성 대신 다른 방법을 사용해야 한다.
싱글 스레드로 제한된 디스패처
val dispatcher = Dispatchers.IO
.limitedParallelism(1)
var counter = 0
fun main() = runBlocking {
massiveRun {
withContext(dispatcher) {
counter++
}
}
println(counter) // 1000000
}
공유 상태의 문제를 해결하는 가장 쉬운 방법은 싱글 스레드 디스패처를 사용하는 것이다.
디스패처를 사용하는 방법에는 두 가지가 있다.
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) = withContext(dispatcher) {
val newUser = api.fetchUser(id)
users += newUser
}
}
첫 번째는 코스 그레인드 스레드 한정(coarse-grained thread confinement)이다.
전체 함수를 싱글 스레드 디스패처의 withContext로 래핑한다.
이 방법은 사용하기 쉬우며 충돌을 방지할 수 있지만, 함수 전체가 싱글 스레드에서 실행되므로 멀티 스레드의 이점을 누리지 못한다.
위 코드에서 api.fetchUser(id)는 공유 자원을 수정하는 연산이 아니기 때문에, 여러 개의 스레드에서 병렬로 시작할 수 있다.
하지만 함수 전체가 싱글 스레드 디스패처에서 실행된다.
만약 api.fetchUser(id) 내부에서 스레드를 블로킹을 하거나 CPU 집약적인 연산을 수행한다면 함수 실행이 느려질 것이다.
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
withContext(dispatcher) {
users += newUser
}
}
}
두 번째는 파인 그레인드 스레드 한정(fine-grained thread confinement)이다.
상태를 변경하는 구문들만 withContext로 감싼다.
이 방법은 좀 더 번거롭지만 멀티 스레드로 접근해도 되는 부분이 스레드를 블로킹을 하거나 CPU 집약적인 연산을 수행하는 경우 더 빠르다.
대부분의 디스패처가 하나의 스레드 풀을 공유하기 때문에 싱글 스레드 디스패처를 사용하는 건 쉽고 효율적이다.
뮤텍스
suspend fun main() = coroutineScope {
repeat(5) {
launch {
delayAndPrint()
}
}
}
val mutex = Mutex()
suspend fun delayAndPrint() {
mutex.lock()
delay(1000)
println("Done")
mutex.unlock()
}
(1초 후)
Done
(1초 후)
Done
(1초 후)
Done
(1초 후)
Done
(1초 후)
Done
가장 자주 사용되는 동기화 방식은 뮤텍스다.
뮤텍스의 가장 중요한 기능은 lock이다.
뮤텍스를 "단 하나의 열쇠가 있는 방"이라고 생각하면 쉽다.
여러 코루틴끼리 뮤텍스를 통해 동기화하는 과정은 아래와 같다.
- 코루틴이 lock()을 호출
- 열쇠를 가지고 작업을 끝까지 수행한다. 이때 작업이 중간에 중단되지 않는다.
- 다른 코루틴이 lock()을 호출
- 1번 코루틴이 unlock()을 호출할 때까지 중단한다.
- 또 다른 코루틴이 lock() 함수를 호출
- 작업을 중단하고 2번 코루틴 다음 순서로 대기 큐에 들어간다.
- 1번 코루틴이 unlock() 함수를 호출
- 2번 코루틴이 재개되고 lock() 함수를 통과한다. (= lock() 함수 뒤에 있는 코드를 실행한다)
- 따라서 단 하나의 코루틴만이 lock()과 unlock() 사이에 있는 코드를 실행할 수 있다.
lock()과 unlock()을 직접 호출하는 것은 위험하다.
lock()과 unlock() 사이에 있는 동기화 코드에서 예외가 발생하거나, unlock() 전에 early return 된다면 unlock()이 호출되지 않는다.
따라서 다른 코루틴은 lock()을 통과할 수 없다.
이 문제를 데드락이라고 한다. (누군가 열쇠를 돌려주지 않고 나간다면 아무도 방에 들어갈 수 없는 것과 동일)
val mutex = Mutex()
var counter = 0
suspend fun main() = runBlocking {
massiveRun {
mutex.withLock {
counter++
}
}
print(counter) // 1000000
}
suspend fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
lock(owner)
return try {
action()
} finally {
unlock(owner)
}
}
대신 lock()으로 시작해 finally 블록에서 unlock()을 호출하는 withLock() 함수를 사용한다.
withLock()을 사용하면 블록 내에서 어떤 예외가 발생하더라도 항상 unlock()을 호출한다.
뮤텍스는 synchronized 블록과 달리 스레드를 블로킹하지 않고, 코루틴을 중단시킨다. 따라서 좀더 안전하고 가볍다.
뮤텍스는 싱글 스레드 디스패처보다 뮤텍스가 더 가볍고, 성능이 좋다.
suspend fun main() {
val mutex = Mutex()
println("Started")
mutex.withLock {
mutex.withLock {
println("Will never be printed")
}
}
}
Started
(영원히 실행)
하지만 뮤텍스는 적절히 사용하기가 어렵다.
뮤텍스를 사용하며 가장 주의해야 할 점은 코루틴이 락을 두 번 통과할 수 없다는 것이다. (열쇠가 방 안에 있으면 다른 방을 열 수 없다)
위 코드를 실행하면 데드락에 빠지게 되어 영원히 블로킹 상태에서 기다린다.
class MessagesRepository {
private val messages = mutableListOf<String>()
private val mutex = Mutex()
suspend fun add(message: String) = mutex.withLock {
delay(1000) // 네트워크 호출이라 가정
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // 〜5120
}
또 다른 문제점은 뮤텍스 내부에서 코루틴이 중단되었을 때 다른 코루틴으로 스위칭되지 않고 계속 기다린다는 것이다.
코루틴이 중단되어도 중간에 락을 풀 수 없기 때문이다.
위 코드에서는 withLock 블럭 내부에서 delay()가 호출되어 5초가 걸린다.
class MessagesRepository {
private val messages = mutableListOf<String>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun add(message: String) =
withContext(dispatcher) {
delay(1000) // 네트워크 호출이라 가정
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // 1058
}
싱글 스레드 디스패처를 사용하면 위 문제가 발생하지 않는다.
delay나 네트워크 호출이 코루틴을 중단시키면 스레드가 블로킹되지 않고 다른 코루틴을 실행한다.
함수 전체를 뮤텍스로 감싸는 건 지양해야 한다. (코스 그레인드 방식)
뮤텍스에서는 락을 두 번 걸지 않고 내부에서 중단 함수를 호출하지 않도록 신경써야 한다.
class MongoUserRepository(
// ...
) : UserRepository {
private val mutex = Mutex()
override suspend fun updateUser(
userId: String,
userUpdate: UserUpdate,
): Unit = mutex.withLock {
val currentUser = getUser(userId) // 데드락!
deleteUser(userId) // 데드락!
addUser(currentUser.updated(userUpdate)) // 데드락!
}
override suspend fun getUser(
userId: String
): User = mutex.withLock {
// ...
}
override suspend fun deleteUser(
userId: String
): Unit = mutex.withLock {
// ...
}
override suspend fun addUser(
user: User
): User = mutex.withLock {
// ...
}
}
위 코드처럼 공유 상태를 변경하는 곳에서만 동기화하는 파인 그레인드 방식이 도움이 될 수 있다.
하지만 위 코드 같은 경우 필자는 싱글 스레드 디스패처를 더 선호한다.
세마포어
suspend fun main() = coroutineScope {
val semaphore = Semaphore(2)
repeat(5) {
launch {
semaphore.withPermit {
delay(1000)
print(it)
}
}
}
}
01
(1초 후)
23
(1초 후)
4
세마포어는 뮤텍스와 비슷하지만 두 개 이상의 코루틴이 접근할 수 있고, 사용법이 다르다.
뮤텍스는 하나의 코루틴만 접근할 수 있기 때문에 lock(), unlock(), withLock() 함수를 가지고 있다.
세마포어는 여러 코루틴이 접근할 수 있기 때문에 acquire(), release(), withPermit() 함수를 가지고 있다.
class LimitedNetworkUserRepository(
private val api: UserApi
) {
// 동시 요청을 10개로 제한합니다.
private val semaphore = Semaphore(10)
suspend fun requestUser(userId: String) =
semaphore.withPermit {
api.requestUser(userId)
}
}
세마포어는 여러 코루틴이 접근할 수 있어 공유 상태를 동시에 접근하는 경우의 문제를 해결할 수는 없다.
하지만 동시 요청을 처리하는 수를 제한할 때 사용할 수 있다.
요약
코루틴을 사용할 때 동기화하는 방식은 다양하다.
- 싱글 스레드 디스패처
- 가장 많이 사용하는 방식이다.
- 파인 그레인드 스레드 한정은 동기화가 필요한 곳만 감싼다.
- 코스 그레인드 스레드 한정은 전체 함수를 감싼다.
- 코스 그레인드 스레드 한정이 더 쉽지만 성능은 떨어진다.
- 원자값
- 뮤텍스
'kotlin > coroutines' 카테고리의 다른 글
2.10 코틀린 코루틴 라이브러리 - 코틀린 코루틴 테스트하기 (2) (1) | 2025.01.21 |
---|---|
2.10 코틀린 코루틴 라이브러리 - 코틀린 코루틴 테스트하기 (1) (0) | 2025.01.19 |
2.8 코틀린 코루틴 라이브러리 - 코루틴 스코프 만들기 (0) | 2025.01.17 |
2.7 코틀린 코루틴 라이브러리 - 디스패처 (2) | 2025.01.16 |
2.6 코틀린 코루틴 라이브러리 - 코루틴 스코프 함수 (0) | 2025.01.14 |