728x90
코틀린 코루틴을 요약한 내용입니다.
- 코루틴끼리의 통신을 위한 기본적인 방법으로 채널 API가 추가
- 채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한번만 받을 수 있음
- Channel은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스
- SendChannel은 원소를 보내거나 채널을 닫는 용도
- ReceiverChannel은 원소를 받을 때 사용
interface SendChannel<in E> { suspend fun send(element: E) fun close(): Boolean } interface ReceiverChannel<out E> { suspend fun receive(): E fun cancel(cause: CancellationException? = null) } interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
- send와 receiver 모두 중단 함수라는 것을 확인
- 원소를 보내고 받는 함수가 중단 함수인 것이 필수적인 특징
- receiver를 호출 했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때 까지 중단
- send는 채널의 용량이 다 찼을 때 중단
- 채널의 가장 간단한 예
- 간단한 예는 불완전한 방법
- 수신자는 얼마나 많은 원소를 보내는지 알아야 함
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(5) { index -> delay(1000) println("Producing next one") channel.send(index * 2) } } launch { repeat(5) { val received = channel.receive() println(received) } } }
- 코루틴 빌더인 produce 함수를 사용하는 것이 좀 더 편리
- produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이 채널을 닫습니다.
- close를 반드시 호출
- produce는 채널을 안전하고 편리한 방법
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(5) { index -> println("Producing next one") delay(1000) channel.send(index * 2) } channel.close() } launch { for (element in channel) { println(element) } } }
채널 타입
- 설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있음
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 채널
- send가 중단되지 않음
suspend fun main(): Unit = coroutinScope { val channel = produce(capacity = Channel.UNLIMITED) { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } } delay(1000) for (element in channel) { println(element) delay(1000) } }
- 버퍼(Buffered): 특정 용량 크기 또는 Channel.BUFFERED로 설정된 채널
- suspend fun main(): Unit = coroutinScope { val channel = produce(capacity = 3) { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } } delay(1000) for (element in channel) { println(element) delay(1000) } } Sent Sent Sent Consuming next one : 0 Sent Consuming next one : 2 Sent Consuming next one : 4 Consuming next one : 6 Consuming next one : 8
- 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDZVOUS인 채널
- 송신자와 수신자가 만날 때만 원소를 교환
suspend fun main(): Unit = coroutinScope { val channel = produce(capacity = Channel.RENDEZVOUS) { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } } delay(1000) for (element in channel) { println(element) delay(1000) } }
- 융합(Conflated): 버퍼 크기가 1인 채널
- 새로운 원소가 이전 원소를 대체
suspend fun main(): Unit = coroutinScope { val channel = produce(capacity = Channel.CONFLATED) { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } } delay(1000) for (element in channel) { println(element) delay(1000) } }
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 채널
버퍼 오버플로일 때
- 채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때 행동을 정의 할 수 있음
- SUSPEND(기본 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단
- DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거
- DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거
- onBufferOverflow를 DROP_OLDEST로 설정
- suspend fun main(): Unit = coroutinScope { val channel = Channel<Int> ( capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST ) launch { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } channel.close() } delay(1000) for (element in channel) { println(element) delay(1000) } }
전달되지 않은 원소 핸들러
- Channel 함수에서 반드시 알아야 할 파라미터는 onUndeliveredElement
- 원소가 어떠한 이유로 처리되지 않았을 때 호출
- 대부분 채널이 닫히거나 취소되었음을 의미
val channel = Channel<Resource>(capacity) { resource -> resource.close() } val channel = Channel<Resource>( capacity, onUndeliveredElement = { resource -> resource.close() } ) val resourceToSend = openResource() channel.send(resourceToSend) val resourceReceived = channel.receive() try { ... } finally { resourceReceived.close() }
팬아웃
- 여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있음
- 원소를 적절하게 처리하려면 반드시 for 루프 사용
- consumeEach는 여러 개의 코루틴이 사용하기에는 안전하지 않음
fun CoroutinScope.produceNumbers() = produce { repeat(10) { delay(100) send(it) } } fun CoroutineScope.launchProcessor( id: Int, channel: ReciveChannel<Int> ) = launch { for (msg in channel) { println("$id received $msg") } } suspend fun main(): Unit = coroutineScope { val channel = produceNumbers() repeat(3) { id -> delay(10) launchProcessor(id, channel) } }
팬인
- 여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있음
- suspend fun sendString( channel: SendChannel<String>, text: String, time: Long ) { while(true) { delay(time) channel.send(text) } } fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(50) { println(channel.receive()) } coroutinContext.cancelChildren() }
- 다수의 채널을 하나의 채널로 합쳐야 할 경우
- produce 함수로 여러 개의 채널을 합치는 fanIn 함수 사용
fun <T> CoroutinScope.fanIn( channels: List<ReceiveChannel<T>> ): ReceiveChannel<T> = produce { for (channel in channels) { launch { for (elem in channel) { send(elem) } } } }
파이프라인
- 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우
- fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce { repeat(3) { num -> send(num + 1) } } fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce { for (num in numbers) { send(num * num) } } suspend fun main() = coroutineScope { val numbers = numbers() val squared = square(numbers) for (num in squared) { println(num) } } // 1 // 4 // 9
통신의 기본 형태로서의 채널
- 채널은 서로 다른 코루틴이 통신할 때 유용
- suspend fun CoroutineScope.serveOrders( orders: receiveChannel<Order>, baristaName: String ): ReceiveChannel<CoffeeResult> = produce { for (order in orders) { val coffee = prepareCoffee(order.type) send(CoffeeResult( coffee = coffee, customer = order.customer, baristaName = baristaName ) } } val coffeResults = fanIn( serveOrders(ordersChannel, "Alex"), serveOrders(ordersChannel, "Bob"), serveOrders(ordersChannel, "Celine") )
실제 사용 예
- 채널을 사용하는 전형적인 예는 데이터가 한 쪽에서 생성되고 다른 쪽에서 데이터를 처리하는 것
- 사용자의 클릭에 반응하는 경우
- 서버로부터 새로운 알림이 오는 경우
- 시간이 흐르면서 검색 결과를 업데이트하는 경우
- 대부분의 경우 채널과 플로우가 합쳐진 channelFlow나 callbackFlow를 사용하는 것이 더 좋음
- 21장 플로우 만들기 참고
- 순수한 형태의 채널은 좀 더 복잡한 처리를 요하는 경우에 유용
- 하나의 프로세스로 오랫동안 처리하는 것 좋은 생각이 아님
- 내부적으로 예외가 발생하거나 서버가 재개 되면 어디서 멈췄는지 알기 어려움
- 대규모 데이터를 가지고 있는 사용자가 오랫동안 서버를 붙들고 있으면 나머지 사용자는 한참을 기다려야 함
- 파이프라인을 설정하는 것이 해결책이 될 수 있음
- 첫 번째 채널은 처리해야 할 사용자를 가지고 있음
- 두 번째 채널은 갱신해야할 데이터를 가지고 있음
- 채널 모두 버퍼를 가지고 있기 때문에 버퍼 이상으로 데이터를 받는 것을 방지
suspend fun handleOfferUpdates() = coroutineScope { val sellerChannel = listenOnSellerChanges() val offerToUpdateChannel = produce(capacity = UNLIMITED) { repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) { launch { for (seller in sellerChannel) { val offers = offerService.requestOffers(seller.id) offers.forEach { send(it) } } } } } repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) { launch { for (offer in offerToUpdateChannel) { sendOfferUpdate(offer) } } } }
요약
- 채널은 코루틴끼리 통신할 때 사용하는 강력한 기본 도구
- 송신자와 수신자의 수에 제한이 없음
- 채널을 통해 보내진 데이터는 단 한 번 받는 것이 보장
- produce 빌더를 사용해 채널을 생성
- 채널은 특정 작업에 사용되는 코루틴의 수를 조절하는 파이프라인을 설정할 때 유용
- 플로우를 채널과 연결해서 사용하는 경우가 많음
728x90
'코틀린 스터디' 카테고리의 다른 글
핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.01.04 |
---|---|
셀렉트 (0) | 2024.01.04 |
코루틴 스코프 만들기 (1) | 2023.11.24 |
코루틴 기반 동시성 프로그래밍 (0) | 2023.11.24 |
코틀린 - 예외처리, Type System, 컬렉션 (0) | 2023.11.10 |