코틀린 스터디

플로우 만들기

막이86 2024. 2. 2. 11:16
728x90

코틀린 코루틴을 요약한 내용입니다.

원시값을 가지는 플로우

  • 플로우를 만드는 가장 간단한 방법은 플로우가 어떤 값을 가져야하는지 정의하는 flowOf함수를 사용하는 것
  • suspend fun main() { flowOf(1, 2, 3, 4, 5).collect { print(it) } }
  • 값이 없는 경우 emptyFlow() 함수 사용
  • suspend fun main() { emptyFlow<Int>().collect { print(it) } }

컨버터

  • asFlow 함수를 사용해서 iterable, Iterator, Sequence를 Flow로 바꿀 수 있음
  • suspend fun main() { listOf(1, 2, 3, 4, 5) .asFlow() .collect { print(it) } }

함수를 플로우로 바꾸기

  • 플로우는 시간상 지연되는 하나의 값을 나타낼 때 자주 사용
    • 중단 함수를 플로우로 변환하는 것 또한 가능
    • 중단 함수를 플로우로 바꾸고 싶다면, (suspend() → T와 () → T) 함수의 확장 함수인 asFlow를 사용 가능
    suspend fun main() {
    	val function = suspend {
    		delay(1000)
    		"UserName"
    	}
    
    	function.asFlow().collect { println(it) }
    }
    
  • 일반 함수를 변경하려면 함수 참조값이 필요
  • suspend fun getUserName(): String { delay(1000) return "UserName" } suspend fun main() { ::getUserName.asFlow().collect { println(it } }

플로우와 리액티브 스트림

  • 리액티브 스트림을 활용하고 있다면 코드를 별로 바꾸지 않고 플로우를 적용 가능
    • kotlinx-coroutines-reactive 라이브러리의 asFlow 함수 사용
    suspend fun main() = coroutineScope {
    	Flux.range(1, 5).asFlow().collect { print(it) }
    	Flowable.range(1, 5).asFlow().collect { print(it) }
    	Observable.range(1, 5).asFlow().collect { print(it) }
    }
    
  • 역으로 변환하려면 좀더 복잡한 라이브러리르 사용해야 함
    • kotlinx-coroutines-reactor 라이브러리를 사용하면 Flow를 Flux로 변환 가능
    suspend fun main(): Unit = coroutineScope {
    	val flow = flowOf(1, 2, 3, 4, 5)
    	
    	flow.asFlux()
    		.doOnNext { print(it) }
    		.subscribe()
    
    	flow.asFlowable()
    		.subscribe { print(it) }
    
    	flow.asObservable()
    		.subscribe { print(it) }
    }
    

플로우 빌더

  • flow 빌더는 시퀀스를 만드는 sequence 빌더나 채널을 만드는 produce 빌더와 비슷하게 동작
  • 빌더는 flow 함수를 먼저 호출하고, 람다식 내부에서 emit 함수를 사용해 다음 값을 방출
  • Channel 이나 Flow에서 모든 값을 방출하려면 emitAll을 사용 가능
  • fun makeFlow(): Flow<Int> = flow { repeat(3) { num -> delay(1000) emit(num) } } suspend fun main() { makeFlow() .collect { println(it) } }
  • flow 빌더는 네트워크 API에서 페이지별로 요청되어야 하는 사용자 스트림을 만드는 목적으로 사용
  • fun allUsersFlow( api: UserApi ): Flow<User> = flow { var page = 0 do { val users = api.takePage(page++) emitAll(users) } while(!users.isNullOrEmpty()) }

플로우 빌더 이해하기

  • 플로우 빌더는 플로우를 만드는 가장 기본적인 방법
  • public fun <T> flowOf(varage elements: T): Flow<T> = flow { for (element in elements) { emit(element) } }
  • 플로우 빌더는 내부적으로 아주 간단
    • collect 메서드 내부에서 block 함수를 호출하는 Flow 인터페이스를 구현
    fun <T> flow(
    	block: suspend FlowCollector<T>.() -> Unit
    ): Flow<T> = object : Flow<T>() {
    	override suspend fun collect(collector: FlowCollector<T>) {
    		collector.block()
    	}
    }
    
    interface Flow<out T> {
    	suspend fun collect(collector: FlowCollector<T>)
    }
    
    fun interface FlowCollector<in T> {
    	suspend fun emit(value: T)
    }
    
    fun main() = runBlocking {
    	flow {               // 1
    		emit("A")
    		emit("B")
    		emit("C")
    	}.collect { value -> // 2
    		println(value)
    	}
    }
    
  • 플로우 빌더를 호출하면 단지 객체를 만들 뿐
    • collect를 호출하면 collector 인터페이스의 block 함수를 호출
      • block 함수는 예제 코드의 1에서 정의된 람다식
      • 리시버는 2에서 정의된 람다식(collect)
    • collect를 호출하면 1에서 정의된 람다식을 실행 시작
    • emit를 호출했을때 2에서 정의된 람다식 호출

채널플로우(channelFlow)

  • Flow는 콜드 데이터 스트림이므로 필요할 때만 값을 생성
  • Flow 빌더를 사용해 다음 원소를 생성
    • 필요할 때만 다음 페이지를 요청
    data class User(val name: String)
    
    interface UserApi {
    	suspend fun takePage(pageNumber: Int): List<User>
    }
    
    class FakeUserApi: UserApi {
    	private val users = List(20) { User("User$it") }
    	private val pageSize: Int = 3
    
    	override suspend fun takePage(
    		pageNumber: Int
    	): List<User> {
    		delay(1000)
    		return users
    			.drop(pageSize * pageNumber)
    			.take(pageSize)
    	}
    }
    
    fun allUsersFlow(api: UserApi): Flow<User> = flow {
    	var page = 0
    	do {
    		println("Fetching page $page")
    		val users = api.takePage(page++)
    		emitAll(users.asFlow())
    	} while (!users.isNullOrEmty())
    }
    
    suspend fun main() {
    	val api = FakeUserApi()
    	val users = allUsersFlow(api)
    	val user = users.first {
    		println("Checking $it")
    		delay(1000)
    		it.name == "User3"
    	}
    	println(user)
    }
    
  • 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우
    • 채널과 Flow를 합친 형태 → channelFlow
  • channelFlow 함수는 플로우처럼 Flow 인터페이스를 구현하기 때문에 플로우가 가진 특징을 제공
  • channelFlow 빌더는 일반 함수이며 최종 연산으로 시작 됨
  • 한 번 시작되기만 하면 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성한다는 점이 채널과 비슷
    • 원소를 생성하려면 emit 대신 send를 사용
    fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
    	var page = 0
    	do {
    		println("Fetching page $page")
    		val users = api.takePage(page++)
    		users?.forEac { send(it) }	
    		} while (!users.isNullOrEmty())
    }
    
    suspend fun main() {
    	val api = FakeUserApi()
    	val users = allUsersFlow(api)
    	val user = users.first {
    		println("Checking $it")
    		delay(1000)
    		it.name == "User3"
    	}
    	println(user)
    }
    
  • 여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용
  • flow는 코루틴 빌더가 필요로 하는 스코프를 만들지 못하기 때문에 아래 코드를 실행할 수 없음
  • fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow { launch { collect { send(it) } } other.collect { send(it) } } fun <T> contextualFlow(): Flow<T> = channelFlow { launch(Dispatchers.IO) { send(computeIoValue()) } launch(Dispatchers.Default) { send(computeCpuValue()) } }

콜백플로우(callbackFlow)

  • channelFlow와 callbackFlow는 큰 차이가 없었음
    • 1.3.4에서 콜백을 사용할 때 에러에 덜 민감하도록 몇 가지 작은 변화가 있음
    • 가장 큰 차이점은 callbackFlow가 콜백 함수를 래핑하는 방식으로 변경
  • 콜백을 래핑하는 데 유용한 몇 가지 함수
    • awaitClose: 채널이 닫힐 때까지 중단되는 함수
      • 채널이 닫힌 다음에 인자로 들어온 함수가 실행
      • awaitClose는 callbackFlow에서 아주 중요
        • awaitClose가 없다면 콜백을 등록하고 나서 코루틴은 곧바로 끝나게 됨
        • awaitClose를 사용해 코루틴이 종료되는 것을 막을 수 있음
        • 채널이 닫힐 때까지 어떤 방식으로든 간에 원소를 감지
    • trySendBlocking(value): send와 비슷하지만 중단하는 대신 블로킹하여 중단 함수가 아닌 함수에서도 사용할 수 있음
    • close(): 채널을 닫습니다.
    • cancel(throwable): 채널을 종료하고 플로우에 예외를 던짐
  • callbackFlow가 사용되는 전형적인 방법
  • fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow { val callback = object: Callback { override fun onNextValue(value: T) { trySendBlocking(value) } override fun onApiError(cause: Throwable) { cancel(CancelationException("API Error", cause)) } override fun onCompleted() = cannel.close() } api.register(callback) awaitClose { api.unregister(callback) } }

요약

  • flow 함수는 가장 간단한 flow 빌더
    • 다음 값을 생성하기 위해 emit 함수를 사용
  • 채널의 특징 일부를 가지고 있는 플루은 channelFlow와 callbackFlow도 있음
728x90

'코틀린 스터디' 카테고리의 다른 글

플로우 생명 주기 함수  (0) 2024.02.02
플로우의 실제 구현  (0) 2024.02.02
플로우란 무엇인가?  (1) 2024.01.04
핫 데이터 소스와 콜드 데이터 소스  (0) 2024.01.04
셀렉트  (0) 2024.01.04