AsyncSequence๋ฅผ ๊ณต๋ถ€ํ•˜๋‹ค ํ›„๋ฐ˜์— ๋‚˜์˜จ AsyncStream์„ ์ดํ•ดํ•˜์ง€ ๋ชปํ•ด ๋ฌธ์„œ๋ฅผ ์ฝ์–ด๋ณธ๋‹ค.

AsyncStream

์ด์ „ ๊ธ€์—์„œ AsyncSequence๋ฅผ Customํ•˜๊ฒŒ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•๊นŒ์ง€ ์•Œ์•„๋ณด์•˜๋‹ค. ํ•˜์ง€๋งŒ Apple์—์„œ๋Š” ๊ธฐ์กด ๋กœ์ง์„ convertingํ•˜๋Š”๋ฐ ์žˆ์–ด ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์„ ์ œ์•ˆํ•˜๋Š”๋ฐ, ๊ทธ๋…€์„์ด AsyncStream์ด๋‹ค.

public struct AsyncStream<Element> {
    ...
}
extension AsyncStream : AsyncSequence {
    ...
}

์ผ๋‹จ ์šฐ๋ฆฌ๊ฐ€ ์ด์ „์— ์•Œ์•„๋ณธ AsyncStream์„ ์ค€์ˆ˜ํ•˜๊ณ  ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด, ์ด ์นœ๊ตฌ๋Š” AsyncSequence๋ฅผ ๋งŒ๋“ค์–ด์ฃผ๋ฉด์„œ ๋‹ค๋ฅธ ์ถ”๊ฐ€์ ์ธ method๊ฐ€ ๋“ค์–ด๊ฐ„ ๋…€์„์ด ์•„๋‹๊นŒ? ๋งž๋‹ค.

Example

์ดํ•ดํ•˜๊ธฐ ์œ„ํ•ด Zedd๋‹˜์˜ ์˜ˆ์‹œ ์ฝ”๋“œ๋ฅผ ๊ฐ€์ ธ์™”์Šต๋‹ˆ๋‹ค.

let digits = AsyncStream<Int> { continuation in
    for digit in 1...10 {
        continuation.yield(digit)
    }
    continuation.finish()
}
Task {
    for await digit in digits {
        print(digit) // 1, 2... 10
    }
}

์ด๋ ‡๊ฒŒ ํ•˜๋ฉด AsyncStream์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. ์ด์ „์— ๋น„ํ•ด ์ƒ๋Œ€์ ์œผ๋กœ ์‰ฝ๊ฒŒ AsyncStream์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ ๋‚ด๋ถ€์ ์œผ๋กœ ๋™์ž‘ํ•˜๋Š” ๋ฐฉ์‹์„ ๋ชจ๋ฅด๊ฒ ๋‹ค.

AsyncStream.init

์ผ๋‹จ AsyncStream์˜ ์ƒ์„ฑ์ž๊ฐ€ ์ด์ƒํ•˜๋‹ค. ์ƒ์„ฑ์‹œ์— closure๋ฅผ ๋ฐ›๊ณ  ์žˆ๋‹ค.

init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncStream<Element>.Continuation) -> Void
)

์‹ค์ œ๋กœ ๋ณด๋ฉด, ๊ธฐ๋ณธ ์ธ์ž๋ฅผ ๋‘๊ฐœ ๋ฐ›๊ณ , ๋งˆ์ง€๋ง‰์— build๋ผ๋Š” ๋ณ€์ˆ˜๋กœ closure๋ฅผ ๋ฐ›๊ณ  ์žˆ๋‹ค.

  1. elementType
    • AsyncStream์ด ์ƒ์‚ฐํ•˜๋Š” Type์„ ์ •์˜ํ•œ๋‹ค.
  2. bufferingPolicy
    • concurrentํ•˜๊ฒŒ ์ƒ์‚ฐํ•œ ์•„์ด๋“ค์„ ๋ชจ์•„๋‘˜ ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋ฅผ ์ •ํ•œ๋‹ค.
  3. build
    • ์–ด๋–ค ๋ฐฉ์‹์œผ๋กœ ์ˆ˜ํ™•(yield)ํ• ์ง€์— ๋Œ€ํ•œ ๋ฐฉ๋ฒ•์„ ์ •์˜ํ•œ๋‹ค. ํ•ด๋‹น ํด๋กœ์ €๋Š” continuation ์ธ์Šคํ„ด์Šค๋ฅผ ๋ฐ›๋Š”๋ฐ, ์ด๊ฑธ๋กœ ์ŠคํŠธ๋ฆผ์— ์ฃผ์ž…ํ•˜๊ณ  ์ข…๋ฃŒํ•  ์ˆ˜ ์žˆ๋‹ค.

buffer? ์–ด๋–ป๊ฒŒ ๋ฒ„ํผ๋กœ ๋™์ž‘ํ•˜๋Š”์ง€ ๊ถ๊ธˆํ•˜์—ฌ ์•„๋ž˜์˜ ์ฝ”๋“œ๋ฅผ ๊ตฌ์„ฑํ–ˆ๋‹ค.

let stream = AsyncStream<Int>(Int.self,
                              bufferingPolicy: .bufferingNewest(5)) { continuation in
   for i in 0..<10 {
        continuation.yield(i)
    }
    continuation.finish()
}
 
Task {
    // Call point:
    for await random in stream {
        print ("\(random)") // 10๊ฐœ์ค‘ ํ˜„์žฌ์‹œ์ ์œผ๋กœ ๋ถ€ํ„ฐ ๊ฐ€์žฅ ๊ทผ๋ฐฉ์— ๋ฐœ์ƒํ•œ 5๊ฐœ์˜ ์›์†Œ๋งŒ ๊ฐ€์ ธ์˜จ๋‹ค. (5, 6, 7, 8, 9)
    }
}

ํ•ด๋‹น ๋™์ž‘ ๋ฐฉ์‹์„ ํ†ตํ•ด ์–ด๋Š์ •๋„ ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•˜๋Š”์ง€ ์œ ์ถ”ํ•ด๋ณผ ์ˆ˜ ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค. build๋กœ ๋„ฃ์€ ํด๋กœ์ €๋Š” ๋‹ค๋ฅธ thread์—์„œ ๋™์ž‘ํ•˜๊ฒŒ ํ•˜๊ณ , ๊ฒฐ๊ณผ๋ฅผ ํŠน์ • buffer์— ๋ฐ›์€ ๋’ค,

AsyncStream.Continuation

A mechanism to interface between synchronous code and an asynchronous stream.

  • Producing Element
    • yield method๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ƒ์„ฑํ•œ type์˜ instance๋ฅผ stream์— ์ œ๊ณตํ•˜๋ผ๊ณ  ๋ฌธ์„œ์— ๋‚˜์™€์žˆ๋‹ค.
  • Finishing the Stream
    • stream ์ฃผ์ž…์ด ๋ชจ๋‘ ๋๋‚˜๋ฉด, finish method๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ stream์„ ์ข…๋ฃŒํ•˜๋ผ๊ณ  ํ•œ๋‹ค.

AsyncThrowingStream

AsyncStream์ด AsyncSequence๋ฅผ ์‰ฝ๊ฒŒ ๋งŒ๋“ค์–ด์ฃผ๋Š” ์นœ๊ตฌ๋ผ๋ฉด, ์ด์ „ ๊ธ€์—์„œ ๋ณด์•˜๋“ฏ throw๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋…€์„๋„ ์‰ฝ๊ฒŒ ๋งŒ๋“ค ์ˆ˜ ์žˆ์–ด์•ผ ํ•œ๋‹ค. ์ด๋ฅผ ์‰ฝ๊ฒŒ ๋งŒ๋“ค์–ด์ฃผ๋Š” ์นœ๊ตฌ๊ฐ€ ์š”๋…€์„์ด๋‹ค.

enum CustomError: Error {
    case fiveError
}
 
let digits = AsyncThrowingStream<Int, Error> { continuation in // AsyncThrowingStream.Continuation โœ… AsyncStream.Continuation โŽ
    for _ in 1...10 {
        let digit = Int.random(in: 1...10)
        if digit == 5 {
            continuation.finish(throwing: CustomError.fiveError)
        }
        continuation.yield(digit)
    }
    continuation.finish()
}
 
Task {
    do {
        for try await digit in digits {
            print ("\(digit)")
        }
    } catch {
        print(error)
    }
}
 
// 10
// 7
// 9
// 7
// fiveError

onTermination

๋‘ ํƒ€์ž… ๋ชจ๋‘, stream์ด termination ๋˜๋Š” ์‹œ์ ์— continuation์— onTermination ์ฝœ๋ฐฑ์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. (AsyncStream.Continuation.Termination, AsyncThrowingStream.Continuation.Termination)

let digits = AsyncStream(Int.self) { continuation in
    continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("producing finished")
        case .cancelled:
            print("producing cancelled")
        @unknown default:
            fatalError()
        }
    }
 
    for _ in 1...10 {
        let digit = Int.random(in: 1...10)
        if digit == 5 {
            continuation.onTermination?(.cancelled)
        }
        print("produced: ", digit)
        continuation.yield(digit)
    }
    print("before producing finish")
    continuation.finish()
    print("after producing finish")
}
 
Task {
    do {
        for try await digit in digits {
            print ("stream: \(digit)")
        }
    } catch {
        print(error)
    }
}
 
produced:  8
producing cancelled
producing finished
produced:  5
produced:  9
produced:  5
produced:  3
produced:  7
produced:  9
produced:  6
produced:  7
produced:  4
before producing finish
after producing finish
stream: 8

์œ„์˜ ๋™์ž‘์„ ํ–ˆ์„ ๋•Œ, onTermination์„ ํ˜ธ์ถœํ•˜๋ฉด ๋™์ž‘์ด ๋ฉˆ์ถ”๋Š” ๊ฒƒ์œผ๋กœ ์ดํ•ดํ–ˆ๋Š”๋ฐ ๊ทธ๋Ÿฌ์ง€๋Š” ์•Š๋Š” ๋“ฏ ํ•˜๋‹ค.

This means that you can perform needed cleanup in the cancellation handler.

๊ณต์‹ ๋ฌธ์„œ์— ๋‚˜์˜จ ํ‘œํ˜„์œผ๋กœ ๋ฏธ๋ฃจ์–ด ์ง์ž‘ํ•ด๋ณด์•˜์„ ๋•Œ, ํ•ด๋‹น ์ŠคํŠธ๋ฆผ์ด finish ๋˜๊ฑฐ๋‚˜ cancel๋  ๋•Œ ๋ฐœ์ƒํ•˜๋Š” ๋™์ž‘์„ ๊ทœ์ •ํ•˜๋Š” ๋“ฏ ํ•˜๋‹ค. ๋งŒ์•ฝ digit์ด 5์ธ ๊ฒฝ์šฐ ๋™์ž‘ ์ˆ˜ํ–‰์„ ์ข…๋ฃŒํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด, break ๋ฌธ๋“ฑ์„ ์ถ”๊ฐ€ํ•˜์—ฌ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด ๋งž๋Š” ๋“ฏํ•˜๋‹ค.

์ถ”ํ›„์— ๊ณต๋ถ€ํ•˜๋ฉด์„œ ์•Œ๊ฒŒ ๋œ ๊ฒƒ์ธ๋ฐ, ์—ฌ๊ธฐ์„œ onTerminate์˜ cancel์€ structured concurrency์™€ ๊ด€๋ จ์ด ์žˆ๋Š” ๋“ฏ ํ•˜๋‹ค. ์ถ”ํ›„ ๊ธ€์—์„œ ์•Œ์•„๋ณด๋„๋ก ํ•˜์ž.

Reference