코틀린 스터디

채널

막이86 2023. 12. 4. 19:17
728x90

코틀린 코루틴을 요약한 내용입니다.

  • 코루틴끼리의 통신을 위한 기본적인 방법으로 채널 API가 추가
    • 채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한번만 받을 수 있음
  • Channel은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스
    • SendChannel은 원소를 보내거나 채널을 닫는 용도
    • ReceiverChannel은 원소를 받을 때 사용
    interface SendChannel<in E> {
    	suspend fun send(element: E)
    	fun close(): Boolean
    }
    
    interface ReceiverChannel<out E> {
    	suspend fun receive(): E
    	fun cancel(cause: CancellationException? = null)
    }
    
    interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
    
  • send와 receiver 모두 중단 함수라는 것을 확인
    • 원소를 보내고 받는 함수가 중단 함수인 것이 필수적인 특징
  • receiver를 호출 했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때 까지 중단
  • send는 채널의 용량이 다 찼을 때 중단
  • 채널의 가장 간단한 예
    • 간단한 예는 불완전한 방법
    • 수신자는 얼마나 많은 원소를 보내는지 알아야 함
    suspend fun main(): Unit = coroutineScope {
    	val channel = Channel<Int>()
    	launch {
    		repeat(5) { index ->
    			delay(1000)
    			println("Producing next one")
    			channel.send(index * 2)
    		}
    	}
    
    	launch {
    		repeat(5) {
    			val received = channel.receive()
    			println(received)
    		}
    	}
    }
    
  • 코루틴 빌더인 produce 함수를 사용하는 것이 좀 더 편리
    • produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이 채널을 닫습니다.
    • close를 반드시 호출
    • produce는 채널을 안전하고 편리한 방법
    suspend fun main(): Unit = coroutineScope {
    	val channel = Channel<Int>()
    	launch {
    		repeat(5) { index -> 
    			println("Producing next one")
    			delay(1000)
    			channel.send(index * 2)
    		}
    		channel.close()
    	}
    
    	launch {
    		for (element in channel) {
    			println(element)
    		}
    	}
    }
    

채널 타입

  • 설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있음
    • 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 채널
      • send가 중단되지 않음
      suspend fun main(): Unit = coroutinScope {
      	val channel = produce(capacity = Channel.UNLIMITED) {
      		repeat(5) { index -> 
      			send(index * 2)
      			delay(100)
      			println("Sent")
      		}
      	}
      
      	delay(1000)
      	for (element in channel) {
      		println(element)
      		delay(1000)
      	}
      }
      
    • 버퍼(Buffered): 특정 용량 크기 또는 Channel.BUFFERED로 설정된 채널
    • suspend fun main(): Unit = coroutinScope { val channel = produce(capacity = 3) { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } } delay(1000) for (element in channel) { println(element) delay(1000) } } Sent Sent Sent Consuming next one : 0 Sent Consuming next one : 2 Sent Consuming next one : 4 Consuming next one : 6 Consuming next one : 8
    • 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDZVOUS인 채널
      • 송신자와 수신자가 만날 때만 원소를 교환
      suspend fun main(): Unit = coroutinScope {
      	val channel = produce(capacity = Channel.RENDEZVOUS) {
      		repeat(5) { index -> 
      			send(index * 2)
      			delay(100)
      			println("Sent")
      		}
      	}
      
      	delay(1000)
      	for (element in channel) {
      		println(element)
      		delay(1000)
      	}
      }
      
    • 융합(Conflated): 버퍼 크기가 1인 채널
      • 새로운 원소가 이전 원소를 대체
      suspend fun main(): Unit = coroutinScope {
      	val channel = produce(capacity = Channel.CONFLATED) {
      		repeat(5) { index -> 
      			send(index * 2)
      			delay(100)
      			println("Sent")
      		}
      	}
      
      	delay(1000)
      	for (element in channel) {
      		println(element)
      		delay(1000)
      	}
      }
      

버퍼 오버플로일 때

  • 채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때 행동을 정의 할 수 있음
    • SUSPEND(기본 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단
    • DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거
    • DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거
  • onBufferOverflow를 DROP_OLDEST로 설정
  • suspend fun main(): Unit = coroutinScope { val channel = Channel<Int> ( capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST ) launch { repeat(5) { index -> send(index * 2) delay(100) println("Sent") } channel.close() } delay(1000) for (element in channel) { println(element) delay(1000) } }

전달되지 않은 원소 핸들러

  • Channel 함수에서 반드시 알아야 할 파라미터는 onUndeliveredElement
    • 원소가 어떠한 이유로 처리되지 않았을 때 호출
    • 대부분 채널이 닫히거나 취소되었음을 의미
    val channel = Channel<Resource>(capacity) { resource ->
    	resource.close()
    }
    
    val channel = Channel<Resource>(
    	capacity,
    	onUndeliveredElement = { resource ->
    		resource.close()
    	}
    )
    
    val resourceToSend = openResource()
    channel.send(resourceToSend)
    
    val resourceReceived = channel.receive()
    try {
    	...
    } finally {
    	resourceReceived.close()
    }
    

팬아웃

  • 여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있음
    • 원소를 적절하게 처리하려면 반드시 for 루프 사용
    • consumeEach는 여러 개의 코루틴이 사용하기에는 안전하지 않음
    fun CoroutinScope.produceNumbers() = produce {
    	repeat(10) {
    		delay(100)
    		send(it)
    	}
    }
    
    fun CoroutineScope.launchProcessor(
    	id: Int,
    	channel: ReciveChannel<Int>
    ) = launch {
    	for (msg in channel) {
    		println("$id received $msg")
    	}
    }
    
    suspend fun main(): Unit = coroutineScope {
    	val channel = produceNumbers()
    	repeat(3) { id -> 
    		delay(10)
    		launchProcessor(id, channel)
    	}
    }
    

팬인

  • 여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있음
  • suspend fun sendString( channel: SendChannel<String>, text: String, time: Long ) { while(true) { delay(time) channel.send(text) } } fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(50) { println(channel.receive()) } coroutinContext.cancelChildren() }
  • 다수의 채널을 하나의 채널로 합쳐야 할 경우
    • produce 함수로 여러 개의 채널을 합치는 fanIn 함수 사용
    fun <T> CoroutinScope.fanIn(
    	channels: List<ReceiveChannel<T>>
    ): ReceiveChannel<T> = produce {
    	for (channel in channels) {
    		launch {
    			for (elem in channel) {
    				send(elem)
    			}
    		}
    	}
    }
    

파이프라인

  • 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우
  • fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce { repeat(3) { num -> send(num + 1) } } fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce { for (num in numbers) { send(num * num) } } suspend fun main() = coroutineScope { val numbers = numbers() val squared = square(numbers) for (num in squared) { println(num) } } // 1 // 4 // 9

통신의 기본 형태로서의 채널

  • 채널은 서로 다른 코루틴이 통신할 때 유용
  • suspend fun CoroutineScope.serveOrders( orders: receiveChannel<Order>, baristaName: String ): ReceiveChannel<CoffeeResult> = produce { for (order in orders) { val coffee = prepareCoffee(order.type) send(CoffeeResult( coffee = coffee, customer = order.customer, baristaName = baristaName ) } } val coffeResults = fanIn( serveOrders(ordersChannel, "Alex"), serveOrders(ordersChannel, "Bob"), serveOrders(ordersChannel, "Celine") )

실제 사용 예

  • 채널을 사용하는 전형적인 예는 데이터가 한 쪽에서 생성되고 다른 쪽에서 데이터를 처리하는 것
    • 사용자의 클릭에 반응하는 경우
    • 서버로부터 새로운 알림이 오는 경우
    • 시간이 흐르면서 검색 결과를 업데이트하는 경우
  • 대부분의 경우 채널과 플로우가 합쳐진 channelFlow나 callbackFlow를 사용하는 것이 더 좋음
    • 21장 플로우 만들기 참고
  • 순수한 형태의 채널은 좀 더 복잡한 처리를 요하는 경우에 유용
  • 하나의 프로세스로 오랫동안 처리하는 것 좋은 생각이 아님
    • 내부적으로 예외가 발생하거나 서버가 재개 되면 어디서 멈췄는지 알기 어려움
    • 대규모 데이터를 가지고 있는 사용자가 오랫동안 서버를 붙들고 있으면 나머지 사용자는 한참을 기다려야 함
  • 파이프라인을 설정하는 것이 해결책이 될 수 있음
    • 첫 번째 채널은 처리해야 할 사용자를 가지고 있음
    • 두 번째 채널은 갱신해야할 데이터를 가지고 있음
    • 채널 모두 버퍼를 가지고 있기 때문에 버퍼 이상으로 데이터를 받는 것을 방지
    suspend fun handleOfferUpdates() = coroutineScope {
    	val sellerChannel = listenOnSellerChanges()
    	val offerToUpdateChannel = produce(capacity = UNLIMITED) {
    		repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
    			launch {
    				for (seller in sellerChannel) {
    					val offers = offerService.requestOffers(seller.id)
    					offers.forEach { send(it) }
    				}
    			}
    		}
    	}
    
    	repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
    		launch {
    			for (offer in offerToUpdateChannel) {
    				sendOfferUpdate(offer)
    			}
    		}
    	}
    }
    

요약

  • 채널은 코루틴끼리 통신할 때 사용하는 강력한 기본 도구
  • 송신자와 수신자의 수에 제한이 없음
  • 채널을 통해 보내진 데이터는 단 한 번 받는 것이 보장
  • produce 빌더를 사용해 채널을 생성
  • 채널은 특정 작업에 사용되는 코루틴의 수를 조절하는 파이프라인을 설정할 때 유용
  • 플로우를 채널과 연결해서 사용하는 경우가 많음
728x90