AsyncSequence

extension AsyncSequence
extension AsyncSequence where Element: Equatable
  • Assigns each element from an async sequence to a property on an object.

    class MyClass {
        var value: Int = 0 {
            didSet { print("Set to \(self.value)") }
        }
    }
    
    
    let sequence = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
    
    let object = MyClass()
    sequence.assign(to: \.value, on: object)
    
    // Prints:
    // Set to 1
    // Set to 2
    // Set to 3
    

    Declaration

    Swift

    @discardableResult
    public func assign<Root>(
        to keyPath: ReferenceWritableKeyPath<Root, Element>,
        on object: Root
    ) rethrows -> Task<Void, Error>

    Parameters

    keyPath

    A key path that identifies the property to assign.

    object

    The object that contains the property.

    Return Value

    A Task<Void, Error>. You do not need to keep a reference to the task, but holding one lets you cancel the assignment by calling cancel().

  • first() Asynchronous

    The first element of the sequence, if there is one.

    Declaration

    Swift

    public func first() async rethrows -> Element?
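
    As a minimal sketch of the behaviour described above, the helper below returns the first element of any async sequence using only the standard library. The name firstElement(of:) is hypothetical and this is not the implementation behind first().

```swift
// Hypothetical helper for illustration; not the library's implementation.
// Returns the first element of an async sequence, or nil if it finishes empty.
func firstElement<S: AsyncSequence>(of sequence: S) async throws -> S.Element? {
    for try await element in sequence {
        // Returning here stops iteration after a single element.
        return element
    }
    return nil
}

let numbers = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}
let empty = AsyncStream<Int> { continuation in
    continuation.finish()
}

let firstNumber = try await firstElement(of: numbers)
let firstOfEmpty = try await firstElement(of: empty)
print(firstNumber ?? -1)    // 1
print(firstOfEmpty == nil)  // true
```

    The sketch is marked throws rather than rethrows to keep it simple, so calls need try even for non-throwing sequences.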
  • collect(_:) Asynchronous

    Collect elements from a sequence.

    // Collect all elements.
    var values = await self.sequence.collect()
    print(values)
    
    // Prints:
    // [1, 2, 3]
    
    // Collect only 2 elements.
    values = await self.sequence.collect(2)
    print(values)
    
    // Prints:
    // [1, 2]
    

    Declaration

    Swift

    public func collect(_ numberOfElements: Int? = .none) async rethrows -> [Element]

    Parameters

    numberOfElements

    The number of elements to collect. By default this is nil which indicates all elements will be collected. If the number of elements in the sequence is less than the number of elements requested, then all the elements will be collected.

    Return Value

    An array of the collected elements.
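
    The limit handling described above can be sketched with the standard library alone. collectElements(from:limit:) and makeStream() are hypothetical names, and the treatment of a zero or negative limit is an assumption rather than documented behaviour.

```swift
// Hypothetical helper for illustration; not the library's implementation.
// Gathers up to `limit` elements, or every element when `limit` is nil.
func collectElements<S: AsyncSequence>(
    from sequence: S,
    limit: Int? = nil
) async throws -> [S.Element] {
    var collected: [S.Element] = []
    // Assumption: a limit of zero or less yields an empty array.
    if let limit = limit, limit <= 0 { return collected }
    for try await element in sequence {
        collected.append(element)
        // Stop as soon as the requested number of elements has been gathered.
        if let limit = limit, collected.count >= limit { break }
    }
    return collected
}

// Builds a fresh stream for each call so every collection starts from 1.
func makeStream() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
}

let allValues = try await collectElements(from: makeStream())
let firstTwo = try await collectElements(from: makeStream(), limit: 2)
let overAsked = try await collectElements(from: makeStream(), limit: 10)
print(allValues)  // [1, 2, 3]
print(firstTwo)   // [1, 2]
print(overAsked)  // [1, 2, 3]
```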

  • Consume the async sequence and pass the elements to a closure.

    let sequence = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
    
    sequence.sink { print($0) }
    
    // Prints:
    // 1
    // 2
    // 3
    

    Declaration

    Swift

    @discardableResult
    public func sink(_ receiveValue: @escaping (Element) async -> Void) -> Task<Void, Error>

    Parameters

    receiveValue

    The closure to execute on receipt of a value.

    Return Value

    A task instance.

  • Consume the async sequence and pass its elements and its completion state to two closures.

    let sequence = AsyncThrowingStream<Int, Error> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish(throwing: TestError())
    }
    
    sequence.sink(
        receiveValue: { print("Value: \($0)") },
        receiveCompletion: { print("Complete: \($0)") }
    )
    
    // Prints:
    // Value: 1
    // Value: 2
    // Value: 3
    // Complete: failure(TestError())
    

    Declaration

    Swift

    @discardableResult
    public func sink(
        receiveValue: @escaping (Element) async -> Void,
        receiveCompletion: @escaping (AsyncSequenceCompletion<Error>) async -> Void
    ) -> Task<Void, Never>

    Parameters

    receiveValue

    The closure to execute on receipt of a value.

    receiveCompletion

    The closure to execute on completion.

    Return Value

    A task instance.

Erasure

Chain

  • An asynchronous sequence that chains two async sequences.

    The combined sequence first emits all the values from the first sequence and then emits all the values from the second.

    let sequenceA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
    
    let sequenceB = AsyncStream<Int> { continuation in
        continuation.yield(4)
        continuation.yield(5)
        continuation.yield(6)
        continuation.finish()
    }
    
    let sequenceC = AsyncStream<Int> { continuation in
        continuation.yield(7)
        continuation.yield(8)
        continuation.yield(9)
        continuation.finish()
    }
    
    for await value in sequenceA.chain(with: sequenceB).chain(with: sequenceC) {
        print(value)
    }
    
    // Prints:
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // 7
    // 8
    // 9
    

    Declaration

    Swift

    public func chain<P>(with sequence: P) -> ChainAsyncSequence<Self, P> where P : AsyncSequence, Self.Element == P.Element

    Parameters

    sequence

    The async sequence to iterate through after this sequence finishes.

    Return Value

    An async sequence that chains the two sequences.
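
    The ordering guarantee described above can be sketched with the standard library alone. chainedValues(_:then:) is a hypothetical helper that gathers the result into an array; it does not produce a lazy sequence the way chain(with:) does.

```swift
// Hypothetical helper for illustration; not the library's implementation.
// Drains the first sequence completely, then drains the second.
func chainedValues<A: AsyncSequence, B: AsyncSequence>(
    _ first: A,
    then second: B
) async throws -> [A.Element] where A.Element == B.Element {
    var values: [A.Element] = []
    for try await value in first { values.append(value) }
    // The second sequence is only iterated once the first has finished.
    for try await value in second { values.append(value) }
    return values
}

let headStream = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}
let tailStream = AsyncStream<Int> { continuation in
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let chainedResult = try await chainedValues(headStream, then: tailStream)
print(chainedResult)  // [1, 2, 3, 4]
```

    The same-element-type constraint mirrors the one in the declaration above (Self.Element == P.Element).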

Combine latest

  • Combine three async sequences.

    The combined sequence emits a tuple of the most-recent elements from each sequence when any of them emit a value.

    If one sequence never emits a value, this sequence will finish.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.yield(4)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(5)
        continuation.yield(6)
        continuation.yield(7)
        continuation.yield(8)
        continuation.yield(9)
        continuation.finish()
    }
    
    let streamC = AsyncStream<Int> { continuation in
        continuation.yield(10)
        continuation.yield(11)
        continuation.finish()
    }
    
    for await value in streamA.combineLatest(streamB, streamC) {
        print(value)
    }
    
    // Prints:
    // (1, 5, 10)
    // (2, 6, 11)
    // (3, 7, 11)
    // (4, 8, 11)
    // (4, 9, 11)
    

    Declaration

    Swift

    public func combineLatest<Q, R>(
        _ q: Q,
        _ r: R
    ) -> CombineLatest3AsyncSequence<Self, Q, R> where Q: AsyncSequence, R: AsyncSequence

    Parameters

    q

    Another async sequence to combine with.

    r

    Another async sequence to combine with.

    Return Value

    An async sequence that combines elements from all sequences.

  • Combine with an additional async sequence to produce a CombineLatestAsyncSequence.

    The combined sequence emits a tuple of the most-recent elements from each sequence when any of them emit a value.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.yield(4)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(5)
        continuation.yield(6)
        continuation.yield(7)
        continuation.yield(8)
        continuation.yield(9)
        continuation.finish()
    }
    
    for await value in streamA.combineLatest(streamB) {
        print(value)
    }
    
    // Prints:
    // (1, 5)
    // (2, 6)
    // (3, 7)
    // (4, 8)
    // (4, 9)
    

    Declaration

    Swift

    public func combineLatest<Q>(
        _ other: Q
    ) -> CombineLatestAsyncSequence<Self, Q> where Q: AsyncSequence

    Parameters

    other

    Another async sequence to combine with.

    Return Value

    An async sequence that combines elements from this and another async sequence.

Debounce

  • Emits elements only after a specified time interval elapses between emissions.

    Use the debounce operator to control the number of values and time between delivery of values from the base async sequence. This operator is useful to process bursty or high-volume async sequences where you need to reduce the number of elements emitted to a rate you specify.

    let stream = AsyncStream<Int> { continuation in
        continuation.yield(0)
        try? await Task.sleep(nanoseconds: 200_000_000)
        continuation.yield(1)
        try? await Task.sleep(nanoseconds: 200_000_000)
        continuation.yield(2)
        continuation.yield(3)
        continuation.yield(4)
        continuation.yield(5)
        continuation.finish()
    }
    
    for try await element in stream.debounce(for: 0.1) {
        print(element)
    }
    
    // Prints:
    // 0
    // 1
    // 5
    

    Declaration

    Swift

    public func debounce(for dueTime: TimeInterval) -> DebounceAsyncSequence<Self>

    Parameters

    base

    The async sequence from which this sequence receives its elements.

    dueTime

    The amount of time the async sequence should wait before emitting an element.

    Return Value

    A DebounceAsyncSequence instance.

Delay

  • Delays emission of all elements by the provided interval.

    let stream = AsyncStream<Int> { continuation in
        continuation.yield(0)
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
    
    let start = Date.now
    for try await element in stream.delay(for: 0.5) {
        print("\(element) - \(Date.now.timeIntervalSince(start))")
    }
    
    // Prints:
    // 0 - 0.5
    // 1 - 1.0
    // 2 - 1.5
    

    Declaration

    Swift

    public func delay(for interval: TimeInterval) -> DelayAsyncSequence<Self>

    Parameters

    base

    The async sequence from which this sequence receives its elements.

    interval

    The amount of time the async sequence should wait before emitting an element.

    Return Value

    A DelayAsyncSequence instance.

Merge

  • An asynchronous sequence that merges three async sequences.

    The sequences are iterated through in parallel.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(4)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(2)
        continuation.finish()
    }
    
    let streamC = AsyncStream<Int> { continuation in
        continuation.yield(3)
        continuation.finish()
    }
    
    for await value in streamA.merge(with: streamB, streamC) {
        print(value)
    }
    
    // Prints:
    // 1
    // 2
    // 3
    // 4
    

    Declaration

    Swift

    public func merge(
        with q: Self,
        _ r: Self
    ) -> Merge3AsyncSequence<Self>

    Parameters

    q

    An async sequence.

    r

    An async sequence.

    Return Value

    An async sequence that merges elements from this and two other async sequences.

  • An asynchronous sequence that merges two async sequences.

    The sequences are iterated through in parallel.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.yield(4)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(5)
        continuation.yield(6)
        continuation.yield(7)
        continuation.yield(8)
        continuation.yield(9)
        continuation.finish()
    }
    
    for await value in streamA.merge(with: streamB) {
        print(value)
    }
    
    // Prints:
    // 1
    // 5
    // 2
    // 6
    // 3
    // 7
    // 4
    // 8
    // 9
    

    Declaration

    Swift

    public func merge(
        with other: Self
    ) -> MergeAsyncSequence<Self>

    Parameters

    other

    Another async sequence to merge with.

    Return Value

    An async sequence that merges elements from this and another async sequence.

Replace error

  • Replaces any errors in the async sequence with the provided element.

    let sequence = Fail<Int, TestError>(
        error: TestError()
    )
    .replaceError(with: 0)
    
    for await value in sequence {
        print(value)
    }
    
    // Prints:
    // 0
    

    Declaration

    Swift

    public func replaceError(with output: Element) -> ReplaceErrorAsyncSequence<Self>

    Parameters

    output

    The element with which to replace errors from the base async sequence.

    Return Value

    A ReplaceErrorAsyncSequence instance.
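
    To make the error-replacement behaviour concrete, here is a minimal standard-library sketch. valuesReplacingError(in:with:) is a hypothetical helper, and the assumption that elements emitted before the failure are kept and followed by the replacement is drawn from the description above, not from the library source.

```swift
struct TestError: Error {}

// Hypothetical helper for illustration; not the library's implementation.
// Gathers elements and, if the sequence throws, appends `fallback` and stops.
func valuesReplacingError<S: AsyncSequence>(
    in sequence: S,
    with fallback: S.Element
) async -> [S.Element] {
    var output: [S.Element] = []
    do {
        for try await element in sequence {
            output.append(element)
        }
    } catch {
        // The error is swallowed and replaced by the fallback element.
        output.append(fallback)
    }
    return output
}

let failing = AsyncThrowingStream<Int, Error> { continuation in
    continuation.yield(1)
    continuation.finish(throwing: TestError())
}

let replaced = await valuesReplacingError(in: failing, with: 0)
print(replaced)  // [1, 0]
```

    Because the error is handled inside the helper, the caller needs no try, which is the main convenience an error-replacing operator offers.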

Shared

  • Creates a shareable async sequence that can be used across multiple tasks.

    Declaration

    Swift

    public func shared() -> SharedAsyncSequence<Self>

Throttle

  • Emits either the most-recent or first element emitted by the base async sequence in the specified time interval.

    ThrottleAsyncSequence selectively emits elements from a base async sequence during an interval you specify. Other elements received within the throttling interval aren’t emitted.

    let stream = AsyncStream<Int> { continuation in
        continuation.yield(0)
        try? await Task.sleep(nanoseconds: 100_000_000)
        continuation.yield(1)
        try? await Task.sleep(nanoseconds: 100_000_000)
        continuation.yield(2)
        continuation.yield(3)
        continuation.yield(4)
        continuation.yield(5)
        continuation.finish()
    }
    
    for try await element in stream.throttle(for: 0.05, latest: true) {
        print(element)
    }
    
    // Prints:
    // 0
    // 1
    // 2
    // 5
    

    Declaration

    Swift

    public func throttle(for interval: TimeInterval, latest: Bool) -> ThrottleAsyncSequence<Self>

    Parameters

    interval

    The interval in which to emit the most recent element.

    latest

    A Boolean value indicating whether to emit the most recent element.

    Return Value

    A ThrottleAsyncSequence instance.

Zip

  • Create an asynchronous sequence that applies a zip function to the three async sequences.

    Combines the latest elements from three async sequences and emits a tuple.

    The async sequence waits until all three async sequences have emitted an element, then emits the three elements as a tuple.

    If one sequence never emits a value or raises an error, then the zipped sequence will finish.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(5)
        continuation.yield(6)
        continuation.yield(7)
        continuation.finish()
    }
    
    let streamC = AsyncStream<Int> { continuation in
        continuation.yield(8)
        continuation.yield(9)
        continuation.finish()
    }
    
    for await value in streamA.zip(streamB, streamC) {
        print(value)
    }
    
    // Prints:
    // (1, 5, 8)
    // (2, 6, 9)
    

    Declaration

    Swift

    public func zip<Q, R>(
        _ q: Q,
        _ r: R
    ) -> Zip3AsyncSequence<Self, Q, R> where Q: AsyncSequence, R: AsyncSequence

    Parameters

    q

    Another async sequence.

    r

    Another async sequence.

    Return Value

    An async sequence that zips elements from this and two other async sequences.

  • Create an asynchronous sequence that applies a zip function to the two async sequences.

    Combines the latest elements from two async sequences and emits a tuple.

    The async sequence waits until both provided async sequences have emitted an element, then emits both elements as a tuple.

    If one sequence never emits a value or raises an error, then the zipped sequence will finish.

    let streamA = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
    
    let streamB = AsyncStream<Int> { continuation in
        continuation.yield(5)
        continuation.yield(6)
        continuation.yield(7)
        continuation.finish()
    }
    
    for await value in streamA.zip(streamB) {
        print(value)
    }
    
    // Prints:
    // (1, 5)
    // (2, 6)
    

    Declaration

    Swift

    public func zip<Q>(
        _ other: Q
    ) -> ZipAsyncSequence<Self, Q> where Q: AsyncSequence

    Parameters

    other

    Another async sequence to zip with.

    Return Value

    An async sequence that zips elements from this and another async sequence.
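
    The pairing rule described above (wait for one element from each side, emit the pair, stop when either side is exhausted) can be sketched with the standard library alone. zippedPairs(_:_:) is a hypothetical helper that gathers the pairs into an array; it does not produce a lazy sequence the way zip(_:) does.

```swift
// Hypothetical helper for illustration; not the library's implementation.
// Pulls one element from each sequence per step and pairs them up.
func zippedPairs<A: AsyncSequence, B: AsyncSequence>(
    _ first: A,
    _ second: B
) async throws -> [(A.Element, B.Element)] {
    var iteratorA = first.makeAsyncIterator()
    var iteratorB = second.makeAsyncIterator()
    var pairs: [(A.Element, B.Element)] = []
    // The loop ends as soon as either iterator returns nil.
    while let a = try await iteratorA.next(),
          let b = try await iteratorB.next() {
        pairs.append((a, b))
    }
    return pairs
}

let shortStream = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}
let longStream = AsyncStream<Int> { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.finish()
}

let pairs = try await zippedPairs(shortStream, longStream)
print(pairs)  // [(1, 5), (2, 6)]
```

    The unmatched 7 from the longer stream is dropped, matching the output shown in the example above.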

Available where Element: Equatable

  • Emits only elements that don’t match the previous element.

    Declaration

    Swift

    public func removeDuplicates() -> RemoveDuplicatesAsyncSequence<Self>

    Return Value

    A RemoveDuplicatesAsyncSequence instance.

  • Omits any element that the predicate determines is equal to the previous element.

    Declaration

    Swift

    public func removeDuplicates(
        by predicate: @escaping RemoveDuplicatesAsyncSequence<Self>.Predicate
    ) -> RemoveDuplicatesAsyncSequence<Self>

    Parameters

    predicate

    A closure to evaluate whether two elements are equivalent. Return true from this closure to indicate that the second element is a duplicate of the first.

    Return Value

    A RemoveDuplicatesAsyncSequence instance.
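
    Neither removeDuplicates variant comes with an example, so here is a minimal standard-library sketch of the filtering rule. removingDuplicates(from:by:) is a hypothetical helper that gathers the result into an array and compares each element with the last one kept; it is not the implementation behind removeDuplicates().

```swift
// Hypothetical helper for illustration; not the library's implementation.
// Drops an element when the predicate reports it as a duplicate of the
// previously kept element.
func removingDuplicates<S: AsyncSequence>(
    from sequence: S,
    by isDuplicate: @Sendable (S.Element, S.Element) -> Bool
) async throws -> [S.Element] {
    var output: [S.Element] = []
    for try await element in sequence {
        if let previous = output.last, isDuplicate(previous, element) {
            continue
        }
        output.append(element)
    }
    return output
}

let repeated = AsyncStream<Int> { continuation in
    for value in [1, 1, 2, 2, 3, 1] { continuation.yield(value) }
    continuation.finish()
}
let signed = AsyncStream<Int> { continuation in
    for value in [1, -1, 2, -2, 2, 3] { continuation.yield(value) }
    continuation.finish()
}

// Equality check, corresponding to the Equatable variant.
let distinct = try await removingDuplicates(from: repeated) { $0 == $1 }
// Custom predicate: values with the same magnitude count as duplicates.
let distinctByMagnitude = try await removingDuplicates(from: signed) { abs($0) == abs($1) }
print(distinct)             // [1, 2, 3, 1]
print(distinctByMagnitude)  // [1, 2, 3]
```

    Only consecutive repeats are removed, which is why the trailing 1 in the first stream survives.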