728x90
코틀린 코루틴을 요약한 내용입니다.
원시값을 가지는 플로우
- 플로우를 만드는 가장 간단한 방법은 플로우가 어떤 값을 가져야하는지 정의하는 flowOf함수를 사용하는 것
- suspend fun main() { flowOf(1, 2, 3, 4, 5).collect { print(it) } }
- 값이 없는 경우 emptyFlow() 함수 사용
- suspend fun main() { emptyFlow<Int>().collect { print(it) } }
컨버터
- asFlow 함수를 사용해서 iterable, Iterator, Sequence를 Flow로 바꿀 수 있음
- suspend fun main() { listOf(1, 2, 3, 4, 5) .asFlow() .collect { print(it) } }
함수를 플로우로 바꾸기
- 플로우는 시간상 지연되는 하나의 값을 나타낼 때 자주 사용
- 중단 함수를 플로우로 변환하는 것 또한 가능
- 중단 함수를 플로우로 바꾸고 싶다면, (suspend() → T와 () → T) 함수의 확장 함수인 asFlow를 사용 가능
suspend fun main() { val function = suspend { delay(1000) "UserName" } function.asFlow().collect { println(it) } }
- 일반 함수를 변경하려면 함수 참조값이 필요
- suspend fun getUserName(): String { delay(1000) return "UserName" } suspend fun main() { ::getUserName.asFlow().collect { println(it } }
플로우와 리액티브 스트림
- 리액티브 스트림을 활용하고 있다면 코드를 별로 바꾸지 않고 플로우를 적용 가능
- kotlinx-coroutines-reactive 라이브러리의 asFlow 함수 사용
suspend fun main() = coroutineScope { Flux.range(1, 5).asFlow().collect { print(it) } Flowable.range(1, 5).asFlow().collect { print(it) } Observable.range(1, 5).asFlow().collect { print(it) } }
- 역으로 변환하려면 좀더 복잡한 라이브러리르 사용해야 함
- kotlinx-coroutines-reactor 라이브러리를 사용하면 Flow를 Flux로 변환 가능
suspend fun main(): Unit = coroutineScope { val flow = flowOf(1, 2, 3, 4, 5) flow.asFlux() .doOnNext { print(it) } .subscribe() flow.asFlowable() .subscribe { print(it) } flow.asObservable() .subscribe { print(it) } }
플로우 빌더
- flow 빌더는 시퀀스를 만드는 sequence 빌더나 채널을 만드는 produce 빌더와 비슷하게 동작
- 빌더는 flow 함수를 먼저 호출하고, 람다식 내부에서 emit 함수를 사용해 다음 값을 방출
- Channel 이나 Flow에서 모든 값을 방출하려면 emitAll을 사용 가능
- fun makeFlow(): Flow<Int> = flow { repeat(3) { num -> delay(1000) emit(num) } } suspend fun main() { makeFlow() .collect { println(it) } }
- flow 빌더는 네트워크 API에서 페이지별로 요청되어야 하는 사용자 스트림을 만드는 목적으로 사용
- fun allUsersFlow( api: UserApi ): Flow<User> = flow { var page = 0 do { val users = api.takePage(page++) emitAll(users) } while(!users.isNullOrEmpty()) }
플로우 빌더 이해하기
- 플로우 빌더는 플로우를 만드는 가장 기본적인 방법
- public fun <T> flowOf(varage elements: T): Flow<T> = flow { for (element in elements) { emit(element) } }
- 플로우 빌더는 내부적으로 아주 간단
- collect 메서드 내부에서 block 함수를 호출하는 Flow 인터페이스를 구현
fun <T> flow( block: suspend FlowCollector<T>.() -> Unit ): Flow<T> = object : Flow<T>() { override suspend fun collect(collector: FlowCollector<T>) { collector.block() } } interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } fun interface FlowCollector<in T> { suspend fun emit(value: T) } fun main() = runBlocking { flow { // 1 emit("A") emit("B") emit("C") }.collect { value -> // 2 println(value) } }
- 플로우 빌더를 호출하면 단지 객체를 만들 뿐
- collect를 호출하면 collector 인터페이스의 block 함수를 호출
- block 함수는 예제 코드의 1에서 정의된 람다식
- 리시버는 2에서 정의된 람다식(collect)
- collect를 호출하면 1에서 정의된 람다식을 실행 시작
- emit를 호출했을때 2에서 정의된 람다식 호출
- collect를 호출하면 collector 인터페이스의 block 함수를 호출
채널플로우(channelFlow)
- Flow는 콜드 데이터 스트림이므로 필요할 때만 값을 생성
- Flow 빌더를 사용해 다음 원소를 생성
- 필요할 때만 다음 페이지를 요청
data class User(val name: String) interface UserApi { suspend fun takePage(pageNumber: Int): List<User> } class FakeUserApi: UserApi { private val users = List(20) { User("User$it") } private val pageSize: Int = 3 override suspend fun takePage( pageNumber: Int ): List<User> { delay(1000) return users .drop(pageSize * pageNumber) .take(pageSize) } } fun allUsersFlow(api: UserApi): Flow<User> = flow { var page = 0 do { println("Fetching page $page") val users = api.takePage(page++) emitAll(users.asFlow()) } while (!users.isNullOrEmty()) } suspend fun main() { val api = FakeUserApi() val users = allUsersFlow(api) val user = users.first { println("Checking $it") delay(1000) it.name == "User3" } println(user) }
- 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우
- 채널과 Flow를 합친 형태 → channelFlow
- channelFlow 함수는 플로우처럼 Flow 인터페이스를 구현하기 때문에 플로우가 가진 특징을 제공
- channelFlow 빌더는 일반 함수이며 최종 연산으로 시작 됨
- 한 번 시작되기만 하면 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성한다는 점이 채널과 비슷
- 원소를 생성하려면 emit 대신 send를 사용
fun allUsersFlow(api: UserApi): Flow<User> = channelFlow { var page = 0 do { println("Fetching page $page") val users = api.takePage(page++) users?.forEac { send(it) } } while (!users.isNullOrEmty()) } suspend fun main() { val api = FakeUserApi() val users = allUsersFlow(api) val user = users.first { println("Checking $it") delay(1000) it.name == "User3" } println(user) }
- 여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용
- flow는 코루틴 빌더가 필요로 하는 스코프를 만들지 못하기 때문에 아래 코드를 실행할 수 없음
- fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow { launch { collect { send(it) } } other.collect { send(it) } } fun <T> contextualFlow(): Flow<T> = channelFlow { launch(Dispatchers.IO) { send(computeIoValue()) } launch(Dispatchers.Default) { send(computeCpuValue()) } }
콜백플로우(callbackFlow)
- channelFlow와 callbackFlow는 큰 차이가 없었음
- 1.3.4에서 콜백을 사용할 때 에러에 덜 민감하도록 몇 가지 작은 변화가 있음
- 가장 큰 차이점은 callbackFlow가 콜백 함수를 래핑하는 방식으로 변경
- 콜백을 래핑하는 데 유용한 몇 가지 함수
- awaitClose: 채널이 닫힐 때까지 중단되는 함수
- 채널이 닫힌 다음에 인자로 들어온 함수가 실행
- awaitClose는 callbackFlow에서 아주 중요
- awaitClose가 없다면 콜백을 등록하고 나서 코루틴은 곧바로 끝나게 됨
- awaitClose를 사용해 코루틴이 종료되는 것을 막을 수 있음
- 채널이 닫힐 때까지 어떤 방식으로든 간에 원소를 감지
- trySendBlocking(value): send와 비슷하지만 중단하는 대신 블로킹하여 중단 함수가 아닌 함수에서도 사용할 수 있음
- close(): 채널을 닫습니다.
- cancel(throwable): 채널을 종료하고 플로우에 예외를 던짐
- awaitClose: 채널이 닫힐 때까지 중단되는 함수
- callbackFlow가 사용되는 전형적인 방법
- fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow { val callback = object: Callback { override fun onNextValue(value: T) { trySendBlocking(value) } override fun onApiError(cause: Throwable) { cancel(CancelationException("API Error", cause)) } override fun onCompleted() = cannel.close() } api.register(callback) awaitClose { api.unregister(callback) } }
요약
- flow 함수는 가장 간단한 flow 빌더
- 다음 값을 생성하기 위해 emit 함수를 사용
- 채널의 특징 일부를 가지고 있는 플루은 channelFlow와 callbackFlow도 있음
728x90
'코틀린 스터디' 카테고리의 다른 글
플로우 생명 주기 함수 (0) | 2024.02.02 |
---|---|
플로우의 실제 구현 (0) | 2024.02.02 |
플로우란 무엇인가? (1) | 2024.01.04 |
핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.01.04 |
셀렉트 (0) | 2024.01.04 |