728x90
코틀린 코루틴을 요약한 내용입니다.
- 플로우는 어떤 연산을 실행할지 정의
- 중단 가능한 람다식에 몇 가지 요소를 추가
Flow 이해하기
- 람다식은 한 번만 정의되고 여러 번 호출 할 수 있음
- fun main() { val f: () => Unit => { print("A") print("B") print("C") } f() f() }
- 람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 후출할 수는 없음
- fun main() { val f: suspend () => Unit => { print("A") delay(1000) print("B") delay(1000) print("C") } f() f() }
- 람다식은 함수를 나타내는 파라미터를 가질 수 있음
- 람다식 f를 호출할 때 emit으로 사용될 또 다른 람다식을 명시
- emit는 중단 함수가 되야함
suspend fun main() { val f: suspend ((String) -> Unit) -> Unit = { emit -> emit("A") emit("B") emit("C") }) f { print(it) } f { print(it) } }
- FlowCollector 함수형 인터페이스를 정의
- 함수형 인터페이스는 람다식으로 정의할 수 있기 때문에 f 호출을 바꿀 필요 없음
import kotlin.* fun interface FlowCollector { suspend fun emit(value: String) } suspend fun main() { val f: suspend (FlowCollector) -> Unit = { it.emit("A") it.emit("B") it.emit("C") } f { print(it) } f { print(it) } }
- it에서 emit를 호출하는 것은 불편함으로, FlowCollector를 리시버로 변경
- this.emit 또는 emit만 호출 가능
fun interface FlowCollector { suspend fun emit(value: String) } suspend fun main() { val f: suspend FlowCollector.() -> Unit = { emit("A") emit("B") emit("C") } f { print(it) } f { print(it) } }
- 람다식을 전달하는 대신에, 인터페이스를 구현한 객체를 만드는 편이 좋음
- 인터페이스의 정의는 객체 표현식으로 래핑
import kotlin.* fun interface FlowCollector { suspend fun emit(value: String) } interface Flow { suspend fun collect(collector: FlowCollector) } suspend fun main() { val builder: suspend FlowCollector.() -> Unit = { emit("A") emit("B") emit("C") } val flow: Flow = object : Flow { override suspend fun collect(collector: FlowCollector) { collector.builder() } } flow.collect { print(it) } flow.collect { print(it) } }
- flow 빌더를 정의
- import kotlin.* fun interface FlowCollector { suspend fun emit(value: String) } interface Flow { suspend fun collect(collector: FlowCollector) } fun flow( builder: suspend FlowCollector.() -> Unit ) = object : Flow { override suspend fun collect(collector: FlowCollector) { collector.builder() } } suspend fun main() { val f: Flow = flow { emit("A") emit("B") emit("C") } f.collect { print(it) } f.collect { print(it) } }
- String을 제네릭 타입으로 변경
- import kotlin.* fun interface FlowCollector<T> { suspend fun emit(value: T) } interface Flow<T> { suspend fun collect(collector: FlowCollector<T>) } fun <T> flow( builder: suspend FlowCollector<T>.() -> Unit ) = object : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { collector.builder() } } suspend fun main() { val f: Flow<String> = flow { emit("A") emit("B") emit("C") } f.collect { print(it) } f.collect { print(it) } }
- 플로우 구현한 방식은 Flow, FlowCollector, flow가 실제 구현된 방식과 거의 동일
Flow 처리 방식
- Flow는 리시버가 있는 중단 람다식에 비해 훨씬 복잡하다고 여겨짐
- Flow의 강력한 점은 Flow를 생성하고, 처리하고, 감지하기 위해 정의한 함수에서 찾을 수 있음
- Flow의 각 원소를 변환하는 map 함수 정의
- 원소를 받을 때마다, map은 원소를 변환하고 새로운 플로우로 방출
fun <T, R> Flow<T>.map( transformation: suspend (T) -> R ): Flow<R> = flow { collect { emit(transformation(it)) } } suspend fun main() { flowOf("A", "B", "C") .map { delay(1000) it.lowercase() } .collect { println(it) } }
- 대부분의 메서드들의 작동하는 방식이 비슷
- fun <T> Flow<T>.filter( predicate: suspend (T) -> Boolean ): Flow<T> = flow { collect { if (predicate(it) { emit(it) } } } fun <T> Flow<T>.onEach( action: suspend (T) -> Unit ): Flow<T> = flow { collect { action(it) emit(it) } } fun <T> Flow<T>.onStart( action: suspend (T) -> Unit ): Flow<T> = flow { action(it) collect { emit(it) } }
동기로 작동하는 Flow
- 플로우 또한 중단 함수처럼 동기로 작동하기 때문에, 플로우가 완료될 때까지 collect 호출이 중단됨
- 플로우는 새로운 코루틴을 시작하지 않음
- 플루우의 각 단계에서 코루틴을 시작할 수 있지만 중단 함수의 기본 동작은 아님
- onEach 내부에 delay가 있으면 모든 원소가 처리되기 전이 아닌 각 원소 사이에 지연이 생김
- suspend fun main() { flowOf("A", "B", "C") .onEach { delay(1000) } .collect { println(it) } }
플로우와 공유 상태
- 플로우 처리를 통해 좀더 복잡한 알고리즘을 구현할 때는 언제 변수에 대한 접근을 동기화 해야하는지 알아보자
- 플로우의 각 단계가 동기로 작동하기 때문에 동기화 없이도 플로우 내부에 변경 가능한 상태를 정의할 수 있음
- fun <T, K> Flow<T>.distinctBy( keySelector: (T) -> K ) = flow { val sentKeys = mutableSetOf<K>() collect { value -> val key = keySelector(value) if (key !in sentKeys) { sentKeys.add(key) emit(value) } } }
- 플루우 단계에서 사용되는 예제
- 일정한 결과 값을 생성
- 카운터 변수가 항상 1000으로 증가
fun Flow<*>.counter() = flow<Int> { var counter = 0 collect { counter++ List(100) { Random.nextLong() }.shuffled().sorted() emit(counter) } } suspend fun main(): Unit = coroutinScope { val f1 = List(1000) { "$it" }.asFlow() val f2 = List(1000) { "$it" }.asFlow() .counter() launch { println(f1.counter().last()) } // 1000 launch { println(f1.counter().last()) } // 1000 launch { println(f2.last()) } // 1000 launch { println(f2.last()) } // 1000 }
- 플로우 단계 외부의 변수를 추출해서 함수에서 사용하는 것이 흔히 저지르는 실수 중 하나
- 외부 변수는 같은 플로우가 모으는 모든 코루틴이 공유
fun Flow<*>.counter(): Flow<Int> { var counter = 0 return this.map { counter++ List(100) { Random.nextLong() }.shuffled().sorted() counter } } suspend fun main(): Unit = coroutinScope { val f1 = List(1000) { "$it" }.asFlow() val f2 = List(1000) { "$it" }.asFlow() .counter() launch { println(f1.counter().last()) } // 1000 launch { println(f1.counter().last()) } // 1000 launch { println(f2.last()) } // 2000 보다 작은 값이 출력 launch { println(f2.last()) } // 2000 보다 작은 값이 출력 }
- 같은 변수를 사용하는 중단 함수들에서 동기화가 필요한 것처럼 플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 레벨에서 정의되어 있으면 동기화가 필요
- var counter = 0 fun Flow<*>.counter(): Flow<Int> = this.map { counter++ List(100) { Random.nextLong() }.shuffled().sorted() counter } suspend fun main(): Unit = coroutinScope { val f1 = List(1000) { "$it" }.asFlow() val f2 = List(1000) { "$it" }.asFlow() .counter() launch { println(f1.counter().last()) } // 4000 보다 작은 값이 출력 launch { println(f1.counter().last()) } // 4000 보다 작은 값이 출력 launch { println(f2.last()) } // 4000 보다 작은 값이 출력 launch { println(f2.last()) } // 4000 보다 작은 값이 출력 }
<aside> ❓ 흠… 잘 이해가?? ㅠㅠ
</aside>
요약
- Flow는 리시버를 가진 중단 람다식보다 좀더 복잡하다고 볼 수 있음
- 플로우의 처리 함수들은 플로우를 새로운 연산으로 데코레이트 함
- 플로우와 플로우의 메서드가 정의된 방식은 간단하고 직관적이기 때문에 가능
728x90
'코틀린 스터디' 카테고리의 다른 글
플로우 생명 주기 함수 (0) | 2024.02.02 |
---|---|
플로우 만들기 (1) | 2024.02.02 |
플로우란 무엇인가? (1) | 2024.01.04 |
핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.01.04 |
셀렉트 (0) | 2024.01.04 |