코틀린 스터디

플로우의 실제 구현

막이86 2024. 2. 2. 11:15
728x90

코틀린 코루틴을 요약한 내용입니다.

  • 플로우는 어떤 연산을 실행할지 정의
  • 중단 가능한 람다식에 몇 가지 요소를 추가

Flow 이해하기

  • 람다식은 한 번만 정의되고 여러 번 호출 할 수 있음
  • fun main() { val f: () => Unit => { print("A") print("B") print("C") } f() f() }
  • 람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 후출할 수는 없음
  • fun main() { val f: suspend () => Unit => { print("A") delay(1000) print("B") delay(1000) print("C") } f() f() }
  • 람다식은 함수를 나타내는 파라미터를 가질 수 있음
    • 람다식 f를 호출할 때 emit으로 사용될 또 다른 람다식을 명시
    • emit는 중단 함수가 되야함
    suspend fun main() {
    	val f: suspend ((String) -> Unit) -> Unit = { emit ->
    		emit("A")
    		emit("B")
    		emit("C")
    	})
    
    	f { print(it) }
    	f { print(it) }
    }
    
  • FlowCollector 함수형 인터페이스를 정의
    • 함수형 인터페이스는 람다식으로 정의할 수 있기 때문에 f 호출을 바꿀 필요 없음
    import kotlin.*
    
    fun interface FlowCollector {
    	suspend fun emit(value: String)
    }
    
    suspend fun main() {
    	val f: suspend (FlowCollector) -> Unit = {
    		it.emit("A")
    		it.emit("B")
    		it.emit("C")
    	}
    	f { print(it) }
    	f { print(it) }
    }
    
  • it에서 emit를 호출하는 것은 불편함으로, FlowCollector를 리시버로 변경
    • this.emit 또는 emit만 호출 가능
    fun interface FlowCollector {
    	suspend fun emit(value: String)
    }
    
    suspend fun main() {
    	val f: suspend FlowCollector.() -> Unit = {
    		emit("A")
    		emit("B")
    		emit("C")
    	}
    	f { print(it) }
    	f { print(it) }
    }
    
  • 람다식을 전달하는 대신에, 인터페이스를 구현한 객체를 만드는 편이 좋음
    • 인터페이스의 정의는 객체 표현식으로 래핑
    import kotlin.*
    
    fun interface FlowCollector {
    	suspend fun emit(value: String)
    }
    
    interface Flow {
    	suspend fun collect(collector: FlowCollector)
    }
    
    suspend fun main() {
    	val builder: suspend FlowCollector.() -> Unit = {
    		emit("A")
    		emit("B")
    		emit("C")
    	}
    
    	val flow: Flow = object : Flow {
    		override suspend fun collect(collector: FlowCollector) {
    			collector.builder()
    		}
    	}
    	
    	flow.collect { print(it) }
    	flow.collect { print(it) }
    }
    
  • flow 빌더를 정의
  • import kotlin.* fun interface FlowCollector { suspend fun emit(value: String) } interface Flow { suspend fun collect(collector: FlowCollector) } fun flow( builder: suspend FlowCollector.() -> Unit ) = object : Flow { override suspend fun collect(collector: FlowCollector) { collector.builder() } } suspend fun main() { val f: Flow = flow { emit("A") emit("B") emit("C") } f.collect { print(it) } f.collect { print(it) } }
  • String을 제네릭 타입으로 변경
  • import kotlin.* fun interface FlowCollector<T> { suspend fun emit(value: T) } interface Flow<T> { suspend fun collect(collector: FlowCollector<T>) } fun <T> flow( builder: suspend FlowCollector<T>.() -> Unit ) = object : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { collector.builder() } } suspend fun main() { val f: Flow<String> = flow { emit("A") emit("B") emit("C") } f.collect { print(it) } f.collect { print(it) } }
  • 플로우 구현한 방식은 Flow, FlowCollector, flow가 실제 구현된 방식과 거의 동일

Flow 처리 방식

  • Flow는 리시버가 있는 중단 람다식에 비해 훨씬 복잡하다고 여겨짐
  • Flow의 강력한 점은 Flow를 생성하고, 처리하고, 감지하기 위해 정의한 함수에서 찾을 수 있음
  • Flow의 각 원소를 변환하는 map 함수 정의
    • 원소를 받을 때마다, map은 원소를 변환하고 새로운 플로우로 방출
    fun <T, R> Flow<T>.map(
    	transformation: suspend (T) -> R
    ): Flow<R> = flow {
    	collect {
    		emit(transformation(it))
    	}
    }
    
    suspend fun main() {
    	flowOf("A", "B", "C")
    		.map {
    			delay(1000)
    			it.lowercase()
    		}
    		.collect { println(it) }
    }
    
  • 대부분의 메서드들의 작동하는 방식이 비슷
  • fun <T> Flow<T>.filter( predicate: suspend (T) -> Boolean ): Flow<T> = flow { collect { if (predicate(it) { emit(it) } } } fun <T> Flow<T>.onEach( action: suspend (T) -> Unit ): Flow<T> = flow { collect { action(it) emit(it) } } fun <T> Flow<T>.onStart( action: suspend (T) -> Unit ): Flow<T> = flow { action(it) collect { emit(it) } }

동기로 작동하는 Flow

  • 플로우 또한 중단 함수처럼 동기로 작동하기 때문에, 플로우가 완료될 때까지 collect 호출이 중단됨
  • 플로우는 새로운 코루틴을 시작하지 않음
  • 플루우의 각 단계에서 코루틴을 시작할 수 있지만 중단 함수의 기본 동작은 아님
  • onEach 내부에 delay가 있으면 모든 원소가 처리되기 전이 아닌 각 원소 사이에 지연이 생김
  • suspend fun main() { flowOf("A", "B", "C") .onEach { delay(1000) } .collect { println(it) } }

플로우와 공유 상태

  • 플로우 처리를 통해 좀더 복잡한 알고리즘을 구현할 때는 언제 변수에 대한 접근을 동기화 해야하는지 알아보자
  • 플로우의 각 단계가 동기로 작동하기 때문에 동기화 없이도 플로우 내부에 변경 가능한 상태를 정의할 수 있음
  • fun <T, K> Flow<T>.distinctBy( keySelector: (T) -> K ) = flow { val sentKeys = mutableSetOf<K>() collect { value -> val key = keySelector(value) if (key !in sentKeys) { sentKeys.add(key) emit(value) } } }
  • 플루우 단계에서 사용되는 예제
    • 일정한 결과 값을 생성
    • 카운터 변수가 항상 1000으로 증가
    fun Flow<*>.counter() = flow<Int> {
    	var counter = 0
    	collect {
    		counter++
    		List(100) { Random.nextLong() }.shuffled().sorted()
    		emit(counter)
    	}
    }
    
    suspend fun main(): Unit = coroutinScope {
    	val f1 = List(1000) { "$it" }.asFlow()
    	val f2 = List(1000) { "$it" }.asFlow()
    		.counter()
    	
    	launch { println(f1.counter().last()) } // 1000
    	launch { println(f1.counter().last()) } // 1000
    	launch { println(f2.last()) } // 1000
    	launch { println(f2.last()) } // 1000
    }
    
  • 플로우 단계 외부의 변수를 추출해서 함수에서 사용하는 것이 흔히 저지르는 실수 중 하나
    • 외부 변수는 같은 플로우가 모으는 모든 코루틴이 공유
    fun Flow<*>.counter(): Flow<Int> {
    	var counter = 0
    	return this.map {
    		counter++
    		List(100) { Random.nextLong() }.shuffled().sorted()
    		counter
    	}
    }
    
    suspend fun main(): Unit = coroutinScope {
    	val f1 = List(1000) { "$it" }.asFlow()
    	val f2 = List(1000) { "$it" }.asFlow()
    		.counter()
    	
    	launch { println(f1.counter().last()) } // 1000
    	launch { println(f1.counter().last()) } // 1000
    	launch { println(f2.last()) } // 2000 보다 작은 값이 출력 
    	launch { println(f2.last()) } // 2000 보다 작은 값이 출력 
    }
    
  • 같은 변수를 사용하는 중단 함수들에서 동기화가 필요한 것처럼 플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 레벨에서 정의되어 있으면 동기화가 필요
  • var counter = 0 fun Flow<*>.counter(): Flow<Int> = this.map { counter++ List(100) { Random.nextLong() }.shuffled().sorted() counter } suspend fun main(): Unit = coroutinScope { val f1 = List(1000) { "$it" }.asFlow() val f2 = List(1000) { "$it" }.asFlow() .counter() launch { println(f1.counter().last()) } // 4000 보다 작은 값이 출력 launch { println(f1.counter().last()) } // 4000 보다 작은 값이 출력 launch { println(f2.last()) } // 4000 보다 작은 값이 출력 launch { println(f2.last()) } // 4000 보다 작은 값이 출력 }

<aside> ❓ 흠… 잘 이해가?? ㅠㅠ

</aside>

요약

  • Flow는 리시버를 가진 중단 람다식보다 좀더 복잡하다고 볼 수 있음
  • 플로우의 처리 함수들은 플로우를 새로운 연산으로 데코레이트 함
  • 플로우와 플로우의 메서드가 정의된 방식은 간단하고 직관적이기 때문에 가능
728x90

'코틀린 스터디' 카테고리의 다른 글

플로우 생명 주기 함수  (0) 2024.02.02
플로우 만들기  (1) 2024.02.02
플로우란 무엇인가?  (1) 2024.01.04
핫 데이터 소스와 콜드 데이터 소스  (0) 2024.01.04
셀렉트  (0) 2024.01.04