AsyncSequence๋ฅผ ๊ณต๋ถํ๋ค ํ๋ฐ์ ๋์จ AsyncStream์ ์ดํดํ์ง ๋ชปํด ๋ฌธ์๋ฅผ ์ฝ์ด๋ณธ๋ค.
AsyncStream
์ด์ ๊ธ์์ AsyncSequence๋ฅผ Customํ๊ฒ ๋ง๋๋ ๋ฐฉ๋ฒ๊น์ง ์์๋ณด์๋ค. ํ์ง๋ง Apple์์๋ ๊ธฐ์กด ๋ก์ง์ convertingํ๋๋ฐ ์์ด ๋ค๋ฅธ ๋ฐฉ๋ฒ์ ์ ์ํ๋๋ฐ, ๊ทธ๋
์์ด AsyncStream
์ด๋ค.
public struct AsyncStream<Element> {
...
}
extension AsyncStream : AsyncSequence {
...
}
์ผ๋จ ์ฐ๋ฆฌ๊ฐ ์ด์ ์ ์์๋ณธ AsyncStream
์ ์ค์ํ๊ณ ์๋ค๋ ๊ฒ์ ํ์ธํ ์ ์๋ค. ๊ทธ๋ ๋ค๋ฉด, ์ด ์น๊ตฌ๋ AsyncSequence๋ฅผ ๋ง๋ค์ด์ฃผ๋ฉด์ ๋ค๋ฅธ ์ถ๊ฐ์ ์ธ method๊ฐ ๋ค์ด๊ฐ ๋
์์ด ์๋๊น? ๋ง๋ค.
Example
์ดํดํ๊ธฐ ์ํด Zedd๋์ ์์ ์ฝ๋๋ฅผ ๊ฐ์ ธ์์ต๋๋ค.
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
Task {
for await digit in digits {
print(digit) // 1, 2... 10
}
}
์ด๋ ๊ฒ ํ๋ฉด AsyncStream์ ๋ง๋ค ์ ์๋ค. ์ด์ ์ ๋นํด ์๋์ ์ผ๋ก ์ฝ๊ฒ AsyncStream์ ๋ง๋ค ์ ์๋ค. ๊ทธ๋ฐ๋ฐ ๋ด๋ถ์ ์ผ๋ก ๋์ํ๋ ๋ฐฉ์์ ๋ชจ๋ฅด๊ฒ ๋ค.
AsyncStream.init
์ผ๋จ AsyncStream์ ์์ฑ์๊ฐ ์ด์ํ๋ค. ์์ฑ์์ closure๋ฅผ ๋ฐ๊ณ ์๋ค.
init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: (AsyncStream<Element>.Continuation) -> Void
)
์ค์ ๋ก ๋ณด๋ฉด, ๊ธฐ๋ณธ ์ธ์๋ฅผ ๋๊ฐ ๋ฐ๊ณ , ๋ง์ง๋ง์ build๋ผ๋ ๋ณ์๋ก closure๋ฅผ ๋ฐ๊ณ ์๋ค.
elementType
- AsyncStream์ด ์์ฐํ๋ Type์ ์ ์ํ๋ค.
bufferingPolicy
- concurrentํ๊ฒ ์์ฐํ ์์ด๋ค์ ๋ชจ์๋ ๋ฒํผ์ ํฌ๊ธฐ๋ฅผ ์ ํ๋ค.
build
- ์ด๋ค ๋ฐฉ์์ผ๋ก ์ํ(yield)ํ ์ง์ ๋ํ ๋ฐฉ๋ฒ์ ์ ์ํ๋ค. ํด๋น ํด๋ก์ ๋ continuation ์ธ์คํด์ค๋ฅผ ๋ฐ๋๋ฐ, ์ด๊ฑธ๋ก ์คํธ๋ฆผ์ ์ฃผ์ ํ๊ณ ์ข ๋ฃํ ์ ์๋ค.
buffer? ์ด๋ป๊ฒ ๋ฒํผ๋ก ๋์ํ๋์ง ๊ถ๊ธํ์ฌ ์๋์ ์ฝ๋๋ฅผ ๊ตฌ์ฑํ๋ค.
let stream = AsyncStream<Int>(Int.self,
bufferingPolicy: .bufferingNewest(5)) { continuation in
for i in 0..<10 {
continuation.yield(i)
}
continuation.finish()
}
Task {
// Call point:
for await random in stream {
print ("\(random)") // 10๊ฐ์ค ํ์ฌ์์ ์ผ๋ก ๋ถํฐ ๊ฐ์ฅ ๊ทผ๋ฐฉ์ ๋ฐ์ํ 5๊ฐ์ ์์๋ง ๊ฐ์ ธ์จ๋ค. (5, 6, 7, 8, 9)
}
}
ํด๋น ๋์ ๋ฐฉ์์ ํตํด ์ด๋์ ๋ ์ด๋ป๊ฒ ๋์ํ๋์ง ์ ์ถํด๋ณผ ์ ์์ ๊ฒ ๊ฐ๋ค. build๋ก ๋ฃ์ ํด๋ก์ ๋ ๋ค๋ฅธ thread์์ ๋์ํ๊ฒ ํ๊ณ , ๊ฒฐ๊ณผ๋ฅผ ํน์ buffer์ ๋ฐ์ ๋ค,
AsyncStream.Continuation
A mechanism to interface between synchronous code and an asynchronous stream.
- Producing Element
- yield method๋ฅผ ์ฌ์ฉํด์ ์์ฑํ type์ instance๋ฅผ stream์ ์ ๊ณตํ๋ผ๊ณ ๋ฌธ์์ ๋์์๋ค.
- Finishing the Stream
- stream ์ฃผ์ ์ด ๋ชจ๋ ๋๋๋ฉด, finish method๋ฅผ ์ฌ์ฉํ์ฌ stream์ ์ข ๋ฃํ๋ผ๊ณ ํ๋ค.
AsyncThrowingStream
AsyncStream์ด AsyncSequence๋ฅผ ์ฝ๊ฒ ๋ง๋ค์ด์ฃผ๋ ์น๊ตฌ๋ผ๋ฉด, ์ด์ ๊ธ์์ ๋ณด์๋ฏ throw๋ฅผ ์ฒ๋ฆฌํ๋ ๋ ์๋ ์ฝ๊ฒ ๋ง๋ค ์ ์์ด์ผ ํ๋ค. ์ด๋ฅผ ์ฝ๊ฒ ๋ง๋ค์ด์ฃผ๋ ์น๊ตฌ๊ฐ ์๋ ์์ด๋ค.
enum CustomError: Error {
case fiveError
}
let digits = AsyncThrowingStream<Int, Error> { continuation in // AsyncThrowingStream.Continuation โ
AsyncStream.Continuation โ
for _ in 1...10 {
let digit = Int.random(in: 1...10)
if digit == 5 {
continuation.finish(throwing: CustomError.fiveError)
}
continuation.yield(digit)
}
continuation.finish()
}
Task {
do {
for try await digit in digits {
print ("\(digit)")
}
} catch {
print(error)
}
}
// 10
// 7
// 9
// 7
// fiveError
onTermination
๋ ํ์
๋ชจ๋, stream์ด termination ๋๋ ์์ ์ continuation์ onTermination
์ฝ๋ฐฑ์ ์ง์ ํ ์ ์๋ค. (AsyncStream.Continuation.Termination
, AsyncThrowingStream.Continuation.Termination
)
let digits = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("producing finished")
case .cancelled:
print("producing cancelled")
@unknown default:
fatalError()
}
}
for _ in 1...10 {
let digit = Int.random(in: 1...10)
if digit == 5 {
continuation.onTermination?(.cancelled)
}
print("produced: ", digit)
continuation.yield(digit)
}
print("before producing finish")
continuation.finish()
print("after producing finish")
}
Task {
do {
for try await digit in digits {
print ("stream: \(digit)")
}
} catch {
print(error)
}
}
produced: 8
producing cancelled
producing finished
produced: 5
produced: 9
produced: 5
produced: 3
produced: 7
produced: 9
produced: 6
produced: 7
produced: 4
before producing finish
after producing finish
stream: 8
์์ ๋์์ ํ์ ๋, onTermination
์ ํธ์ถํ๋ฉด ๋์์ด ๋ฉ์ถ๋ ๊ฒ์ผ๋ก ์ดํดํ๋๋ฐ ๊ทธ๋ฌ์ง๋ ์๋ ๋ฏ ํ๋ค.
This means that you can perform needed cleanup in the cancellation handler.
๊ณต์ ๋ฌธ์์ ๋์จ ํํ์ผ๋ก ๋ฏธ๋ฃจ์ด ์ง์ํด๋ณด์์ ๋, ํด๋น ์คํธ๋ฆผ์ด finish ๋๊ฑฐ๋ cancel๋ ๋ ๋ฐ์ํ๋ ๋์์ ๊ท์ ํ๋ ๋ฏ ํ๋ค. ๋ง์ฝ digit์ด 5์ธ ๊ฒฝ์ฐ ๋์ ์ํ์ ์ข
๋ฃํ๊ณ ์ถ๋ค๋ฉด, break
๋ฌธ๋ฑ์ ์ถ๊ฐํ์ฌ ๋ง๋๋ ๊ฒ์ด ๋ง๋ ๋ฏํ๋ค.
์ถํ์ ๊ณต๋ถํ๋ฉด์ ์๊ฒ ๋ ๊ฒ์ธ๋ฐ, ์ฌ๊ธฐ์ onTerminate์ cancel์ structured concurrency์ ๊ด๋ จ์ด ์๋ ๋ฏ ํ๋ค. ์ถํ ๊ธ์์ ์์๋ณด๋๋ก ํ์.