Async iterators enable iteration over asynchronous data sources. Here's how to use them.
Basic Async Iteration#
// Consume an async source of responses with for-await-of.
async function processUrls(urls) {
  const responses = fetchAll(urls);
  for await (const response of responses) {
    const body = await response.json();
    console.log(body);
  }
}

8// Async iterable object
9const asyncIterable = {
10 [Symbol.asyncIterator]() {
11 let i = 0;
12 return {
13 async next() {
14 if (i < 3) {
15 await delay(100);
16 return { value: i++, done: false };
17 }
18 return { done: true };
19 },
20 };
21 },
22};
23
24// Using the async iterable
25async function main() {
26 for await (const num of asyncIterable) {
27 console.log(num); // 0, 1, 2
28 }
}
Async Generators#
// Async generator that emits 0..max-1, pausing 100 ms before each value.
async function* asyncCounter(max) {
  let current = 0;
  while (current < max) {
    await delay(100);
    yield current;
    current += 1;
  }
}

// Usage: consume the generator with for-await-of.
async function main() {
  for await (const value of asyncCounter(5)) {
    console.log(value); // 0, 1, 2, 3, 4
  }
}

// Fetch pages of data lazily; each iteration yields one page of items.
async function* fetchPages(baseUrl) {
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${baseUrl}?page=${page}`);

    // Fail fast on HTTP errors instead of parsing an error body as a page.
    if (!response.ok) {
      throw new Error(`Request failed: ${response.status} ${baseUrl}?page=${page}`);
    }

    const data = await response.json();

    yield data.items;

    hasMore = data.hasNextPage;
    page++;
  }
}

32// Usage
33async function getAllItems() {
34 const allItems = [];
35
36 for await (const items of fetchPages('/api/items')) {
37 allItems.push(...items);
38 }
39
40 return allItems;
}
Streaming Data#
// Stream a ReadableStream of bytes line by line.
// Decodes incrementally so multi-byte characters split across chunks are
// handled correctly, and flushes the decoder at end of stream so trailing
// bytes are not silently dropped.
async function* readLines(stream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        // Flush any bytes the decoder is still holding (e.g. a stream
        // that ended in the middle of a multi-byte character).
        buffer += decoder.decode();
        if (buffer) yield buffer;
        break;
      }

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      // The last piece may be an incomplete line; keep it for later.
      buffer = lines.pop() || '';

      for (const line of lines) {
        yield line;
      }
    }
  } finally {
    // Always release the lock so the stream can be used/cancelled elsewhere.
    reader.releaseLock();
  }
}

// Usage: stream a large file and handle each line as it arrives.
async function processFile() {
  const response = await fetch('/large-file.txt');

  for await (const line of readLines(response.body)) {
    processLine(line);
  }
}

39// Stream JSON objects
40async function* parseJSONStream(stream) {
41 for await (const line of readLines(stream)) {
42 if (line.trim()) {
43 yield JSON.parse(line);
44 }
45 }
}
Event Streams#
// Convert an EventEmitter-style source into an async iterable.
// Events that arrive while the consumer is busy are buffered in a queue;
// otherwise the iterator parks on a promise until the next event arrives.
async function* eventIterator(emitter, eventName) {
  const queue = [];
  let resolve = null;

  const handler = (data) => {
    if (resolve) {
      // BUG FIX: resolve with the raw event payload. The original resolved
      // with an iterator-result wrapper ({ value, done }), so consumers saw
      // wrapped objects on the promise path but raw data on the queue path.
      resolve(data);
      resolve = null;
    } else {
      queue.push(data);
    }
  };

  emitter.on(eventName, handler);

  try {
    while (true) {
      if (queue.length > 0) {
        yield queue.shift();
      } else {
        yield await new Promise((r) => (resolve = r));
      }
    }
  } finally {
    // Runs on break/return/throw: always detach the listener.
    emitter.off(eventName, handler);
  }
}

// Usage: log every 'message' event from a socket as it arrives.
async function handleMessages(socket) {
  const messages = eventIterator(socket, 'message');
  for await (const message of messages) {
    console.log('Received:', message);
  }
}

37// WebSocket async iterator
38async function* websocketIterator(url) {
39 const ws = new WebSocket(url);
40 const queue = [];
41 let resolve = null;
42 let done = false;
43
44 ws.onmessage = (event) => {
45 if (resolve) {
46 resolve(event.data);
47 resolve = null;
48 } else {
49 queue.push(event.data);
50 }
51 };
52
53 ws.onclose = () => {
54 done = true;
55 if (resolve) resolve(null);
56 };
57
58 await new Promise((r) => (ws.onopen = r));
59
60 try {
61 while (!done) {
62 const data = queue.length > 0
63 ? queue.shift()
64 : await new Promise((r) => (resolve = r));
65
66 if (data !== null) {
67 yield data;
68 }
69 }
70 } finally {
71 ws.close();
72 }
}
Transforming Async Iterables#
// Lazily apply fn to every value of an async iterable.
async function* asyncMap(iterable, fn) {
  for await (const element of iterable) {
    yield fn(element);
  }
}

// Lazily keep only the values for which predicate returns truthy.
async function* asyncFilter(iterable, predicate) {
  for await (const element of iterable) {
    if (!predicate(element)) continue;
    yield element;
  }
}

// Yield at most n items from an async iterable.
// Fixed: the original checked the limit BEFORE yielding, so it always
// pulled one extra item from the source after the limit was reached, and
// pulled one item even when n <= 0.
async function* asyncTake(iterable, n) {
  if (n <= 0) return; // nothing to take: don't touch the source at all
  let count = 0;
  for await (const item of iterable) {
    yield item;
    count++;
    if (count >= n) break;
  }
}

// Drop the first n items of an async iterable, then yield the rest.
async function* asyncSkip(iterable, n) {
  let seen = 0;
  for await (const item of iterable) {
    seen++;
    if (seen > n) {
      yield item;
    }
  }
}

// Group items into arrays of `size`; a final partial batch is yielded too.
async function* asyncBatch(iterable, size) {
  let current = [];

  for await (const item of iterable) {
    current.push(item);
    if (current.length >= size) {
      yield current;
      current = [];
    }
  }

  // Flush the remainder, if any.
  if (current.length > 0) {
    yield current;
  }
}

55// Usage
56async function processUsers() {
57 const users = fetchAllUsers();
58
59 const activeAdults = asyncFilter(
60 asyncMap(users, enrichUser),
61 user => user.active && user.age >= 18
62 );
63
64 for await (const user of asyncTake(activeAdults, 100)) {
65 await sendEmail(user);
66 }
}
Aggregation Functions#
// Drain an async iterable into an ordinary array.
async function asyncToArray(iterable) {
  const collected = [];
  for await (const value of iterable) {
    collected.push(value);
  }
  return collected;
}

// Fold an async iterable into a single value, like Array#reduce.
async function asyncReduce(iterable, fn, initial) {
  let acc = initial;
  for await (const value of iterable) {
    acc = fn(acc, value);
  }
  return acc;
}

// Return the first value matching predicate, or undefined if none does.
// Returning early closes the underlying iterator via for-await-of.
async function asyncFind(iterable, predicate) {
  for await (const value of iterable) {
    if (predicate(value)) return value;
  }
  return undefined;
}

// True as soon as any value matches predicate; false if none does.
async function asyncSome(iterable, predicate) {
  for await (const value of iterable) {
    if (predicate(value)) return true;
  }
  return false;
}

// False as soon as any value fails predicate; true otherwise
// (vacuously true for an empty iterable).
async function asyncEvery(iterable, predicate) {
  for await (const value of iterable) {
    if (!predicate(value)) return false;
  }
  return true;
}

49// Count items
50async function asyncCount(iterable) {
51 let count = 0;
52 for await (const _ of iterable) {
53 count++;
54 }
55 return count;
}
Concurrent Processing#
// Map over an async iterable with a bounded number of in-flight fn calls,
// yielding results in input order as they become ready.
//
// Fixed two defects in the original:
//  - it tried to remove the settled promise from its queue with
//    queue.indexOf(<resolved value>), which never matches a promise, so
//    splice(-1, 1) evicted an arbitrary still-pending task instead;
//  - it buffered every result and only yielded after the input was
//    exhausted, defeating streaming use and bounding nothing.
async function* asyncMapConcurrent(iterable, fn, concurrency = 3) {
  // Sliding window of pending tasks, oldest first (input order).
  const window = [];
  let index = 0;

  for await (const item of iterable) {
    const task = Promise.resolve(fn(item, index++));
    // Attach a no-op handler so a task failing while an earlier one is
    // still being awaited does not trigger an unhandled-rejection crash;
    // the error still surfaces when this task's turn comes below.
    task.catch(() => {});
    window.push(task);

    if (window.length >= concurrency) {
      // Window full: wait for (and yield) the oldest task before
      // starting another, keeping at most `concurrency` in flight.
      yield await window.shift();
    }
  }

  // Drain whatever is still in flight.
  while (window.length > 0) {
    yield await window.shift();
  }
}

24// Pool of workers
25async function* asyncPool(iterable, fn, poolSize = 5) {
26 const executing = new Set();
27
28 for await (const item of iterable) {
29 const promise = fn(item).then((result) => {
30 executing.delete(promise);
31 return result;
32 });
33
34 executing.add(promise);
35
36 if (executing.size >= poolSize) {
37 yield await Promise.race(executing);
38 }
39 }
40
41 while (executing.size > 0) {
42 yield await Promise.race(executing);
43 }
}
Error Handling#
// Process items, yielding { error, item } records instead of throwing,
// so one bad item does not terminate the whole stream.
async function* safeGenerator(iterable) {
  for await (const item of iterable) {
    try {
      const processed = await processItem(item);
      yield processed;
    } catch (error) {
      yield { error, item };
    }
  }
}

// Process each item with up to maxRetries attempts and exponential backoff.
//
// Fixed: the original never cleared lastError after a successful retry, so
// an item that failed once and then succeeded still threw afterwards. It
// also slept pointlessly after the final failed attempt, and retried on
// exceptions thrown INTO the generator at the yield point (yield is now
// outside the try).
async function* withRetry(iterable, maxRetries = 3) {
  for await (const item of iterable) {
    let lastError = null;
    let succeeded = false;

    for (let attempt = 0; attempt < maxRetries && !succeeded; attempt++) {
      let result;
      try {
        result = await processItem(item);
        succeeded = true;
      } catch (error) {
        lastError = error;
        // Back off only if another attempt is coming.
        if (attempt < maxRetries - 1) {
          await delay(Math.pow(2, attempt) * 1000);
        }
      }
      if (succeeded) {
        yield result;
      }
    }

    if (!succeeded) {
      throw lastError;
    }
  }
}

33// Timeout for each item
34async function* withTimeout(iterable, ms) {
35 for await (const item of iterable) {
36 yield await Promise.race([
37 processItem(item),
38 new Promise((_, reject) =>
39 setTimeout(() => reject(new Error('Timeout')), ms)
40 ),
41 ]);
42 }
}
Practical Examples#
// Walk a cursor-paginated API, yielding each user as it arrives.
//
// Improvements: the cursor is URL-encoded (it is server-supplied data
// going back into a query string) and HTTP errors fail fast instead of
// yielding whatever an error body happens to parse as.
async function* fetchAllUsers(baseUrl) {
  let cursor = null;

  do {
    const url = cursor
      ? `${baseUrl}?cursor=${encodeURIComponent(cursor)}`
      : baseUrl;

    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Request failed: ${response.status} ${url}`);
    }
    const data = await response.json();

    yield* data.users;

    cursor = data.nextCursor;
  } while (cursor);
}

// Page through a database query in fixed-size batches, yielding rows.
async function* queryInBatches(query, batchSize = 100) {
  let offset = 0;

  while (true) {
    const rows = await db.query(query, { limit: batchSize, offset });

    yield* rows;

    // A short page means we've reached the end of the result set.
    if (rows.length !== batchSize) break;
    offset += batchSize;
  }
}

34// Real-time updates
35async function* pollForUpdates(url, interval = 5000) {
36 let lastId = null;
37
38 while (true) {
39 const params = lastId ? `?since=${lastId}` : '';
40 const response = await fetch(`${url}${params}`);
41 const updates = await response.json();
42
43 for (const update of updates) {
44 yield update;
45 lastId = update.id;
46 }
47
48 await delay(interval);
49 }
}
Best Practices#
Design:
✓ Use async generators for streams
✓ Implement proper cleanup in finally
✓ Handle backpressure appropriately
✓ Consider memory usage
Error Handling:
✓ Wrap iteration in try-catch
✓ Implement retry logic when needed
✓ Add timeouts for safety
✓ Log errors appropriately
Performance:
✓ Use batching for efficiency
✓ Implement concurrency limits
✓ Avoid buffering entire streams
✓ Clean up resources promptly
Testing:
✓ Test with empty iterables
✓ Test error scenarios
✓ Test cancellation
✓ Mock async data sources
Conclusion#
Async iterators enable elegant handling of asynchronous data streams. Use async generators for producing values, for-await-of for consumption, and transformation functions for processing. Handle errors properly and implement cleanup in finally blocks.