Asynchone

Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.

Requirements

  • iOS 14.0+
  • macOS 12.0+

Installation

Swift Package Manager

In Xcode:

  1. Click Project.
  2. Click Package Dependencies.
  3. Click +.
  4. Enter package URL: https://github.com/reddavis/Asynchrone.
  5. Add Asynchone to your app target.

Documentation

Documentation can be found here.

Overview

AsyncSequence

Extensions

First

let sequence = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

print(await sequence.first())

// Prints:
// 1

Collect

let sequence = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

print(await sequence.collect())

// Prints:
// [1, 2, 3]

Sink

let sequence = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

sequence.sink { print($0) }

// Prints:
// 1
// 2
// 3

Sink with completion

let sequence = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish(throwing: TestError())
}

sequence.sink(
    receiveValue: { print("Value: \($0)") },
    receiveCompletion: { print("Complete: \($0)") }
)

// Prints:
// Value: 1
// Value: 2
// Value: 3
// Complete: failure(TestError())

AnyAsyncSequenceable

let sequence = Just(1)
    .map(String.init)
    .eraseToAnyAsyncSequenceable()

AnyThrowingAsyncSequenceable

let stream = Fail<Int, TestError>(error: TestError.a)
    .eraseToAnyThrowingAsyncSequenceable()

CombineLatestAsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.combineLatest(streamB) {
    print(value)
}

// Prints:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)
// (4, 9)

CombineLatest3AsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(10)
    continuation.yield(11)
    continuation.finish()
}

for await value in streamA.combineLatest(streamB, streamC) {
    print(value)
}

// Prints:
// (1, 5, 10)
// (2, 6, 11)
// (3, 7, 11)
// (4, 8, 11)
// (4, 9, 11)

CurrentElementAsyncSequence

let sequence = CurrentElementAsyncSequence(0)
print(await sequence.element)

await stream.yield(1)
print(await sequence.element)

await stream.yield(2)
await stream.yield(3)
await stream.yield(4)
print(await sequence.element)

// Prints:
// 0
// 1
// 4

DebounceAsyncSequence

let stream = AsyncStream<Int> { continuation in
    continuation.yield(0)
    try? await Task.sleep(nanoseconds: 200_000_000)
    continuation.yield(1)
    try? await Task.sleep(nanoseconds: 200_000_000)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.yield(5)
    continuation.finish()
}

for element in try await self.stream.debounce(for: 0.1) {
    print(element)
}

// Prints:
// 0
// 1
// 5

Fail

let stream = Fail<Int, TestError>(error: TestError())

do {
    for try await value in stream {
        print(value)
    }
} catch {
    print("Error!")
}

// Prints:
// Error!

Just

let stream = Just(1)

for await value in stream {
    print(value)
}

// Prints:
// 1

MergeAsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.merge(with: streamB) {
    print(value)
}

// Prints:
// 1
// 5
// 2
// 6
// 3
// 7
// 4
// 8
// 9

Merge3AsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(2)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(3)
    continuation.finish()
}

for await value in self.streamA.merge(with: self.streamB, self.streamC) {
    print(value)
}

// Prints:
// 1
// 2
// 3
// 4

PassthroughAsyncSequence

let sequence = PassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish()

for await value in sequence {
    print(value)
}

// Prints:
// 0
// 1
// 2

RemoveDuplicatesAsyncSequence

let stream = .init { continuation in
    continuation.yield(1)
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

for await value in stream.removeDuplicates() {
    print(value)
}

// Prints:
// 1
// 2
// 3

ReplaceErrorAsyncSequence

let sequence = Fail<Int, TestError>(
    error: TestError()
)
.replaceError(with: 0)

for await value in stream {
    print(value)
}

// Prints:
// 0

SharedAsyncSequence

let values = [
    "a",
    "ab",
    "abc",
    "abcd"
]

let stream = AsyncStream { continuation in
    for value in values {
        continuation.yield(value)
    }
    continuation.finish()
}
.shared()

Task {
    let values = try await self.stream.collect()
    // ...
}

Task.detached {
    let values = try await self.stream.collect()
    // ...
}

let values = try await self.stream.collect()
// ...

ThrottleAsyncSequence

let stream = AsyncStream<Int> { continuation in
    continuation.yield(0)
    try? await Task.sleep(nanoseconds: 100_000_000)
    continuation.yield(1)
    try? await Task.sleep(nanoseconds: 100_000_000)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.yield(5)
    continuation.finish()
}

for element in try await self.stream.throttle(for: 0.05, latest: true) {
    print(element)
}

// Prints:
// 0
// 1
// 2
// 5

ThrowingPassthroughAsyncSequence

let sequence = ThrowingPassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish(throwing: TestError())

do {
    for try await value in sequence {
      print(value)
    }
} catch {
    print("Error!")
}

// Prints:
// 0
// 1
// 2
// Error!

ZipAsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.finish()
}

for await value in streamA.zip(streamB) {
    print(value)
}

// Prints:
// (1, 5)
// (2, 6)

Zip3AsyncSequence

let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.zip(streamB, streamC) {
    print(value)
}

// Prints:
// (1, 5, 8)
// (2, 6, 9)

AsyncStream

AsyncStream.Continuation

AsyncThrowingStream

AsyncThrowingStream.Continuation

License

Whatevs.