728x90
코틀린 코루틴을 요약한 내용입니다.
- 플로우는 비동기적으로 계산해야 할 값의 스트림
- 플로우 인터페이스 자체는 떠다니는 원소들을 모으는 역활
- 플로우 끝에 도달할 때까지 각 값을 처리하는 것을 의미
- 플로우의 collect는 컬렉션의 forEach와 비슷
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) }
- 플로우의 유일한 멤버 함수는 collect
- 다른 함수들은 확장 함수로 정의 되어 있음
interface Iterable<out T> {
operator fun iterator(): Iterator<T>
}
interface Sequence<out T> {
operator fun iterator(): Iterator<T>
}
플로우와 값들을 나타내는 다른 방법들의 비교
- 원소들이 채워질 때 까지 모든 값이 생성되기를 기다려야 함
fun getList(): List<Int> = List(3) {
Thread.sleep(1000)
"User$it"
}
fun main() {
val list = getList()
println("Function started")
list.forEach { println(it) }
}
- 원소를 하나씩 계산할 때는, 원소가 나오자마자 바로 얻을 수 있는 것이 좋음
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
Thread.sleep(1000)
yield("User$it")
}
}
fun main() {
val list = getSequence()
println("Function started")
list.forEach { println(it) }
}
- 시퀀스는 CPU 집약적인 연산 또는 블로킹 연산일 때 필요할 때마다 값을 계산하는 플로우를 나타내기에 적절함
- 시퀀스는 최종 연산은 중단 함수가 아님
- 시퀀스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드가 블로킹 됨
- 시퀀스 빌더의 스코프에서는 SequenceScope의 리시버에서 호출되는 함수 외에 다른 중단 함수를 사용할 수 없음
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
delay(1000) // 컴파일 에러
yield("User$it")
}
}
- HTTP의 엔드포인드로부터 시퀀스를 사용해 페이징 기법으로 빈페이지를 받을 때까지 모든 사용자의 리스트를 얻는 경우
// 이렇게 구현하면 안됨, 시퀀스 대시 플로우를 사용해야 함
fun allUsersSequence(
api: UserApi
): Sequence<User> = sequence {
val page = 0
do {
val users = api.takePage(page++)
yieldAll(users)
} while (!users.isNullOrEmpty())
}
- 원소가 무거운 경우, 원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 시퀀스가 좋음
val fibonacci: Sequence<BigInteger> = sequence {
var first = 0.toBigInteger()
var second = 1.toBigInteger()
while (true) {
yield(first)
val temp = first
first += second
second = temp
}
}
fun countCharactersInFile(path: String): Int = File(path).useLines { lines ->
lines.sumBy { it.length }
}
- 시퀀스를 사용했기 때문에 foreEach가 블로킹 연산이 됨
- 같은 스레드에서 launch로 시작된 코루틴이 대기하게 되며, 하나의 코루틴이 다른 코루틴을 블로킹하게 됨
fun getSequence(): Sequence<String> = sequence { repeat(3) { delay(1000) // 컴파일 에러 yield("User$it") } } suspend fun main() { withContext(newSingleThreadContext("main")) { launch { repeat(3) { delay(100) println("Processing on coroutine") } } val list = getSequence() list.forEach { println(it) } } }
- 플로우의 빌더와 연산은 중단 함수이며 구조화된 동시성과 적절한 예외 처리를 지원
fun getFlow(): Flow<String = flow {
repeat(3) {
delay(1000)
emit("User$it")
}
}
suspend fun main() {
withContext(newSingleThreadContext("main")) {
launch {
repeat(3) {
delay(100)
println("Processing on coroutine")
}
}
val list = getFlow()
list.collect { println(it) }
}
}
- 플로우는 코루틴을 사용해야 하는 데이터 스트림으로 사용되어야 함
fun allUsersFlow(
api: UserApi
): Flow<User> = flow {
var page = 0
do {
val users = api.takePage(page++)
emitAll(users)
} while(!users.isNullOrEmpty())
}
플로우의 특징
- 플로우의 최종 연산은 스레드를 블로킹하는 대신 코루틴을 중단
- 코루틴 컨텍스트를 활용하고 예외를 처리하는 등의 코루틴 기능 제공
- 플로우 처리는 취소 가능하며, 구조화된 동시성을 기본적으로 갖추고 있음
- flow는 빌더는 중단 함수가 아니며 어떠한 스코프도 필요하지 않음
- 플로우의 최종 연산은 중단 가능, 연산이 실행될 때 부모 코루틴의 관계가 정립
- CoroutineName 컨텍스트가 collect에서 flow 빌더의 람다 표현식으로 전달되는 것을 보여줌
fun usersFlow(): Flow<String> = flow {
repeat(3) {
delay(1000)
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
emit("User$it in $name")
}
}
suspend fun main() {
val users = usersFlow()
widthContext(CoroutineName("Name")) {
val job = launch {
users.collect { println(it) }
}
launch {
delay(2100)
println("I got enough")
job.cancel()
}
}
}
플로우 명명법
- 모든 플로우는 몇 가지 요소로 구성됨
- 플로우는 어딘가에서 시작되어야 함
- 플로우 빌더, 다른 객체에서의 변환, 또는 헬퍼 함수로부터 시작
- 플로우의 마지막 연산은 최종 연산이라 불림
- 최종 연산은 중단 가능하거나 스코프를 필요로 하는 유일한 연산
- 최종 연산은 주로 람다 표현식을 가진 또는 가지지 않는 collect가 됨
- 시작 연산과 최종 연산 사이에 플로우를 변경하는 중간 연산을 가질 수 있다
실제 사용 예
- 채널보다 플로우가 필요한 경우가 더 많음
- 플로우가 사용되는 전형적인 예
- 웹소켓이나 RSocket 알림과 같이 서버가 보낸 이벤트를 통해 전달된 메세지를 받는 경우
- 텍스트 입력 또는 클릭과 같은 사용자 액션이 감지된 경우
- 센서 또는 위치나 지도와 같은 기기의 정보를 변경을 받는 경우
- 데이터베이스의 변경을 감지하는 경우
- 플로우는 동시성 처리를 위해 유용하게 사용될 수 있음
- 컬렉션 처리 내부에서 async를 사용하면 동시 처리를 할 수 있다.
- suspend fun getOffers( sellers: List<Seller> ): List<Offer> = coroutineScope { sellers .map { seller -> async { api.requestOffers(seller.id) } } .flatMap { it.await() } }
- 많은 요청을 한번에 보내면 우리 서비스뿐 아니라 요청을 받을 서버 둘 모두에게 좋지 않음
- 동시성 호출의 수를 20으로 제한하기 위해 동시성 속성이 20으로 제한됨 flatMapMerege를 사용할 수 있음
- suspend fun getOffers( sellers: List<Seller> ): List<Offer> = sellers .asFlow() .flatMapMerge(concurrency = 20) { seller -> suspend { api.requestOffers(seller.id) }.asFlow() } .toList()
- 컬렉션 대신 플로우로 처리하면 동시 처리, 컨텍스트, 예외를 비롯한 많은 것을 조절할 수 있음
요약
- 플로우는 코루틴을 지원하며 비동기적으로 계산되는 값을 나타냄
728x90
'코틀린 스터디' 카테고리의 다른 글
플로우 만들기 (1) | 2024.02.02 |
---|---|
플로우의 실제 구현 (0) | 2024.02.02 |
핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.01.04 |
셀렉트 (0) | 2024.01.04 |
채널 (0) | 2023.12.04 |