코틀린 스터디

플로우란 무엇인가?

막이86 2024. 1. 4. 11:00
728x90

코틀린 코루틴을 요약한 내용입니다.

  • 플로우는 비동기적으로 계산해야 할 값의 스트림
  • 플로우 인터페이스 자체는 떠다니는 원소들을 모으는 역활
  • 플로우 끝에 도달할 때까지 각 값을 처리하는 것을 의미
    • 플로우의 collect는 컬렉션의 forEach와 비슷
    interface Flow<out T> {
    	suspend fun collect(collector: FlowCollector<T>)
    }
    
  • 플로우의 유일한 멤버 함수는 collect
  • 다른 함수들은 확장 함수로 정의 되어 있음
interface Iterable<out T> {
	operator fun iterator(): Iterator<T>
}

interface Sequence<out T> {
	operator fun iterator(): Iterator<T>
}

플로우와 값들을 나타내는 다른 방법들의 비교

  • 원소들이 채워질 때 까지 모든 값이 생성되기를 기다려야 함
fun getList(): List<Int> = List(3) {
	Thread.sleep(1000)
	"User$it"
}

fun main() {
	val list = getList()
	println("Function started")
	list.forEach { println(it) }
}
  • 원소를 하나씩 계산할 때는, 원소가 나오자마자 바로 얻을 수 있는 것이 좋음
fun getSequence(): Sequence<String> = sequence {
	repeat(3) {
		Thread.sleep(1000)
		yield("User$it")
	}
}

fun main() {
	val list = getSequence()
	println("Function started")
	list.forEach { println(it) }
}
  • 시퀀스는 CPU 집약적인 연산 또는 블로킹 연산일 때 필요할 때마다 값을 계산하는 플로우를 나타내기에 적절함
    • 시퀀스는 최종 연산은 중단 함수가 아님
    • 시퀀스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드가 블로킹 됨
    • 시퀀스 빌더의 스코프에서는 SequenceScope의 리시버에서 호출되는 함수 외에 다른 중단 함수를 사용할 수 없음
fun getSequence(): Sequence<String> = sequence {
	repeat(3) {
		delay(1000)            // 컴파일 에러
		yield("User$it")
	}
}
  • HTTP의 엔드포인드로부터 시퀀스를 사용해 페이징 기법으로 빈페이지를 받을 때까지 모든 사용자의 리스트를 얻는 경우
// 이렇게 구현하면 안됨, 시퀀스 대시 플로우를 사용해야 함
fun allUsersSequence(
	api: UserApi
): Sequence<User> = sequence {
		val page = 0
		do {
			val users = api.takePage(page++)
			yieldAll(users)
		} while (!users.isNullOrEmpty())
}
  • 원소가 무거운 경우, 원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 시퀀스가 좋음
val fibonacci: Sequence<BigInteger> = sequence {
	var first = 0.toBigInteger()
	var second = 1.toBigInteger()

	while (true) {
		yield(first)
		val temp = first
		first += second
		second = temp
	}
}

fun countCharactersInFile(path: String): Int = File(path).useLines { lines ->
	lines.sumBy { it.length }
}
  • 시퀀스를 사용했기 때문에 foreEach가 블로킹 연산이 됨
    • 같은 스레드에서 launch로 시작된 코루틴이 대기하게 되며, 하나의 코루틴이 다른 코루틴을 블로킹하게 됨
    fun getSequence(): Sequence<String> = sequence {
    	repeat(3) {
    		delay(1000)            // 컴파일 에러
    		yield("User$it")
    	}
    }
    
    suspend fun main() {
    	withContext(newSingleThreadContext("main")) {
    		launch {
    			repeat(3) {
    				delay(100)
    				println("Processing on coroutine")
    			}
    		}
    
    		val list = getSequence()
    		list.forEach { println(it) }
    	}
    }
    
  • 플로우의 빌더와 연산은 중단 함수이며 구조화된 동시성과 적절한 예외 처리를 지원
fun getFlow(): Flow<String = flow {
	repeat(3) {
		delay(1000)
		emit("User$it")
	}
}

suspend fun main() {
	withContext(newSingleThreadContext("main")) {
		launch {
			repeat(3) {
				delay(100)
				println("Processing on coroutine")
			}
		}
		
		val list = getFlow()
		list.collect { println(it) }
	}
}
  • 플로우는 코루틴을 사용해야 하는 데이터 스트림으로 사용되어야 함
fun allUsersFlow(
	api: UserApi
): Flow<User> = flow {
	var page = 0
	do {
		val users = api.takePage(page++)
		emitAll(users)
	} while(!users.isNullOrEmpty())
}

플로우의 특징

  • 플로우의 최종 연산은 스레드를 블로킹하는 대신 코루틴을 중단
  • 코루틴 컨텍스트를 활용하고 예외를 처리하는 등의 코루틴 기능 제공
  • 플로우 처리는 취소 가능하며, 구조화된 동시성을 기본적으로 갖추고 있음
  • flow는 빌더는 중단 함수가 아니며 어떠한 스코프도 필요하지 않음
  • 플로우의 최종 연산은 중단 가능, 연산이 실행될 때 부모 코루틴의 관계가 정립
  • CoroutineName 컨텍스트가 collect에서 flow 빌더의 람다 표현식으로 전달되는 것을 보여줌
fun usersFlow(): Flow<String> = flow {
	repeat(3) {
		delay(1000)
		val ctx = currentCoroutineContext()
		val name = ctx[CoroutineName]?.name
		emit("User$it in $name")
	}
}

suspend fun main() {
	val users = usersFlow()
	widthContext(CoroutineName("Name")) {
		val job = launch {
			users.collect { println(it) }
		}
		
		launch {
			delay(2100)
			println("I got enough")
			job.cancel()
		}
	}
}

플로우 명명법

  • 모든 플로우는 몇 가지 요소로 구성됨
  • 플로우는 어딘가에서 시작되어야 함
    • 플로우 빌더, 다른 객체에서의 변환, 또는 헬퍼 함수로부터 시작
  • 플로우의 마지막 연산은 최종 연산이라 불림
    • 최종 연산은 중단 가능하거나 스코프를 필요로 하는 유일한 연산
    • 최종 연산은 주로 람다 표현식을 가진 또는 가지지 않는 collect가 됨
  • 시작 연산과 최종 연산 사이에 플로우를 변경하는 중간 연산을 가질 수 있다

실제 사용 예

  • 채널보다 플로우가 필요한 경우가 더 많음
  • 플로우가 사용되는 전형적인 예
    • 웹소켓이나 RSocket 알림과 같이 서버가 보낸 이벤트를 통해 전달된 메세지를 받는 경우
    • 텍스트 입력 또는 클릭과 같은 사용자 액션이 감지된 경우
    • 센서 또는 위치나 지도와 같은 기기의 정보를 변경을 받는 경우
    • 데이터베이스의 변경을 감지하는 경우
  • 플로우는 동시성 처리를 위해 유용하게 사용될 수 있음
  • 컬렉션 처리 내부에서 async를 사용하면 동시 처리를 할 수 있다.
  • suspend fun getOffers( sellers: List<Seller> ): List<Offer> = coroutineScope { sellers .map { seller -> async { api.requestOffers(seller.id) } } .flatMap { it.await() } }
  • 많은 요청을 한번에 보내면 우리 서비스뿐 아니라 요청을 받을 서버 둘 모두에게 좋지 않음
  • 동시성 호출의 수를 20으로 제한하기 위해 동시성 속성이 20으로 제한됨 flatMapMerege를 사용할 수 있음
  • suspend fun getOffers( sellers: List<Seller> ): List<Offer> = sellers .asFlow() .flatMapMerge(concurrency = 20) { seller -> suspend { api.requestOffers(seller.id) }.asFlow() } .toList()
  • 컬렉션 대신 플로우로 처리하면 동시 처리, 컨텍스트, 예외를 비롯한 많은 것을 조절할 수 있음

요약

  • 플로우는 코루틴을 지원하며 비동기적으로 계산되는 값을 나타냄
728x90

'코틀린 스터디' 카테고리의 다른 글

플로우 만들기  (1) 2024.02.02
플로우의 실제 구현  (0) 2024.02.02
핫 데이터 소스와 콜드 데이터 소스  (0) 2024.01.04
셀렉트  (0) 2024.01.04
채널  (0) 2023.12.04