Async iterators enable iteration over asynchronous data sources. Here's how to use them effectively.
Basic Async Iterator
/**
 * Hand-written async iterable that produces 0, 1 and 2.
 *
 * Every call to `[Symbol.asyncIterator]()` starts a fresh counter, and each
 * `next()` waits 100 ms before settling.
 */
const asyncIterable = {
  [Symbol.asyncIterator]() {
    let counter = 0;

    const next = async () => {
      await new Promise((wake) => setTimeout(wake, 100));

      const exhausted = counter >= 3;
      if (exhausted) {
        return { value: undefined, done: true };
      }

      const current = counter;
      counter++;
      return { value: current, done: false };
    };

    return { next };
  },
};
17
/**
 * Consumes `asyncIterable` with `for await...of`, logging every value it
 * produces.
 */
async function consume() {
  for await (const current of asyncIterable) {
    console.log(current); // 0, 1, 2 (with delays)
  }
}

// --- Async Generator Functions ---
/**
 * Async generator (`async function*`) yielding the integers from `start`
 * through `end` inclusive, waiting `delay` milliseconds before each value.
 *
 * @param {number} start - First value to yield.
 * @param {number} end - Last value to yield (inclusive).
 * @param {number} [delay=100] - Pause before each value, in milliseconds.
 */
async function* asyncRange(start, end, delay = 100) {
  let current = start;

  while (current <= end) {
    await new Promise((wake) => setTimeout(wake, delay));
    yield current;
    current++;
  }
}
8
// Usage
/** Logs every number produced by `asyncRange(1, 5)`. */
async function main() {
  const numbers = asyncRange(1, 5);

  for await (const current of numbers) {
    console.log(current); // 1, 2, 3, 4, 5 (with delays)
  }
}
15
/**
 * Async generator over a page-numbered endpoint; yields one array of items
 * per page.
 *
 * Requests pages 1..maxPages by adding a `page` query parameter to `baseUrl`
 * and stops early at the first page whose `items` is empty or missing.
 *
 * @param {string} baseUrl - Endpoint URL; may already contain a query string.
 * @param {number} [maxPages=10] - Upper bound on the number of requests.
 * @yields {Array} The `items` array of each non-empty page.
 * @throws {Error} When a response reports a non-OK HTTP status.
 */
async function* fetchPages(baseUrl, maxPages = 10) {
  // Append with "&" when the URL already carries a query string, so the
  // result stays a well-formed URL.
  const separator = String(baseUrl).includes('?') ? '&' : '?';

  for (let page = 1; page <= maxPages; page++) {
    const response = await fetch(`${baseUrl}${separator}page=${page}`);

    // fetch() only rejects on network failure; HTTP errors must be checked
    // explicitly or an error body would be parsed as if it were a page.
    if (!response.ok) {
      throw new Error(`Request for page ${page} failed with status ${response.status}`);
    }

    const data = await response.json();
    const items = data?.items ?? [];

    if (items.length === 0) {
      return; // No more pages
    }

    yield items;
  }
}
32
// Consume pages: every iteration receives the item array of one page.
for await (const pageItems of fetchPages('/api/products')) {
  console.log('Got items:', pageItems.length);
}

// --- Fetching Paginated Data ---
/**
 * Async generator over a link-paginated API; yields individual result items.
 *
 * Starting from `url`, each response body is read as JSON. Every element of
 * its `results` array is yielded, then the generator follows `next` until it
 * is falsy.
 *
 * @param {string} url - URL of the first page.
 * @param {object} [options={}] - Passed unchanged to every `fetch` call.
 * @yields {*} Each element of every page's `results` array.
 * @throws {Error} When a response reports a non-OK HTTP status.
 */
async function* paginatedFetch(url, options = {}) {
  let nextUrl = url;

  while (nextUrl) {
    const response = await fetch(nextUrl, options);

    // fetch() resolves for HTTP error statuses too, so check explicitly
    // instead of parsing an error body as a page.
    if (!response.ok) {
      throw new Error(`Request to ${nextUrl} failed with status ${response.status}`);
    }

    const data = await response.json();

    // A page without `results` contributes no items instead of throwing.
    yield* (data.results ?? []); // Yield each item

    nextUrl = data.next; // Link to next page
  }
}
14
// Usage
/**
 * Collects every user yielded by `paginatedFetch('/api/users')` into an array.
 *
 * @returns {Promise<Array>} All users, in the order they were yielded.
 */
async function getAllUsers() {
  const collected = [];
  const source = paginatedFetch('/api/users');

  for await (const entry of source) {
    collected.push(entry);
  }

  return collected;
}
25
/**
 * Async generator that fetches numbered pages with a pause between requests.
 *
 * Yields each page's parsed JSON body. It continues while the body's
 * `hasNextPage` is truthy, waiting `delayMs` before requesting the next page.
 *
 * @param {string} url - Endpoint URL; may already contain a query string.
 * @param {number} [delayMs=1000] - Pause between consecutive requests.
 * @yields {*} The parsed JSON body of each page.
 * @throws {Error} When a response reports a non-OK HTTP status.
 */
async function* rateLimitedFetch(url, delayMs = 1000) {
  // Append with "&" when the URL already carries a query string.
  const separator = String(url).includes('?') ? '&' : '?';
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${url}${separator}page=${page}`);

    // fetch() resolves for HTTP error statuses too; without this check an
    // error body lacking `hasNextPage` would silently end the iteration.
    if (!response.ok) {
      throw new Error(`Request for page ${page} failed with status ${response.status}`);
    }

    const data = await response.json();

    yield data;

    hasMore = Boolean(data?.hasNextPage);
    page++;

    // Only wait when another request will actually follow.
    if (hasMore) {
      await new Promise((r) => setTimeout(r, delayMs));
    }
  }
}

// --- Processing Streams ---
/**
 * Async generator that decodes a byte `ReadableStream` as text and yields it
 * line by line, split on "\n".
 *
 * The text after the last newline is yielded as a final line when the stream
 * ends, if it is non-empty. The reader lock is released when the generator
 * finishes, fails, or is closed early by the consumer.
 *
 * @param {ReadableStream} readableStream - Stream of byte chunks.
 * @yields {string} One line at a time, without the trailing "\n".
 */
async function* streamToLines(readableStream) {
  const reader = readableStream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        // Flush the decoder: bytes of an unfinished multi-byte sequence held
        // back by `stream: true` would otherwise be silently dropped.
        buffer += decoder.decode();
        if (buffer) yield buffer;
        break;
      }

      // `stream: true` keeps a partial multi-byte character for the next chunk.
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // Keep incomplete line

      for (const line of lines) {
        yield line;
      }
    }
  } finally {
    reader.releaseLock();
  }
}
28
// Usage: request a streaming endpoint and log its body one line at a time.
const response = await fetch('/api/stream');

for await (const textLine of streamToLines(response.body)) {
  console.log('Line:', textLine);
}

// --- Event Stream Processing ---
/**
 * Exposes the `eventName` events of `element` as an async iterator.
 *
 * Events that fire while no `next()` call is waiting are buffered and handed
 * out in order. Calling `return()` — which `for await...of` does on `break` —
 * ends the iteration, removes the listener, and settles any waiting `next()`.
 *
 * @param {EventTarget} element - Target to listen on.
 * @param {string} eventName - Event type to listen for.
 * @returns {object} An object that is both async iterable and async iterator.
 */
function eventIterator(element, eventName) {
  const events = [];
  // Resolvers of next() calls waiting for an event. Kept as a queue so
  // overlapping next() calls do not overwrite one another.
  const waiters = [];
  let done = false;

  const listener = (event) => {
    const waiter = waiters.shift();
    if (waiter) {
      waiter({ value: event, done: false });
    } else {
      events.push(event);
    }
  };

  element.addEventListener(eventName, listener);

  return {
    [Symbol.asyncIterator]() {
      return this;
    },

    async next() {
      if (done) {
        return { value: undefined, done: true };
      }

      if (events.length > 0) {
        return { value: events.shift(), done: false };
      }

      return new Promise((r) => {
        waiters.push(r);
      });
    },

    return() {
      done = true;

      // Stop listening; otherwise the listener (and everything it closes
      // over) stays attached to the element after iteration ends.
      element.removeEventListener(eventName, listener);
      events.length = 0;

      // Settle any next() still waiting so its caller is not left hanging.
      while (waiters.length > 0) {
        waiters.shift()({ value: undefined, done: true });
      }

      return { value: undefined, done: true };
    }
  };
}
41
// Usage: log click coordinates until `shouldStop` becomes truthy.
const clicks = eventIterator(button, 'click');

for await (const clickEvent of clicks) {
  console.log('Clicked at:', clickEvent.clientX, clickEvent.clientY);

  if (shouldStop) {
    break; // Calls return(), stops listening
  }
}

// --- WebSocket Messages ---
/**
 * Opens a WebSocket to `url` and exposes its messages as an async iterator.
 *
 * Messages that arrive while no `next()` call is waiting are buffered.
 * A socket error rejects the waiting `next()`; if none is waiting, the error
 * is remembered and rejects the next call once buffered messages are drained.
 * After the socket closes and the buffer is empty, the iteration reports done.
 * `return()` closes the socket.
 *
 * @param {string} url - WebSocket endpoint.
 * @returns {object} An object that is both async iterable and async iterator;
 *   values are the `data` of each message event.
 */
function websocketIterator(url) {
  const ws = new WebSocket(url);
  const messages = [];
  let resolve = null;
  let reject = null;
  let failure = null; // Error seen while no next() call was waiting
  let closed = false;

  // Forget the settled promise's callbacks so a later event is not routed
  // into a promise that can no longer change state (and thereby lost).
  const clearWaiter = () => {
    resolve = null;
    reject = null;
  };

  ws.onmessage = (event) => {
    if (resolve) {
      const deliver = resolve;
      clearWaiter();
      deliver({ value: event.data, done: false });
    } else {
      messages.push(event.data);
    }
  };

  ws.onerror = (error) => {
    if (reject) {
      const fail = reject;
      clearWaiter();
      fail(error);
    } else {
      // Nobody is waiting right now: keep the error for the next next().
      failure = error;
    }
  };

  ws.onclose = () => {
    closed = true;
    if (resolve) {
      const finish = resolve;
      clearWaiter();
      finish({ value: undefined, done: true });
    }
  };

  return {
    [Symbol.asyncIterator]() {
      return this;
    },

    async next() {
      // Buffered messages first, so nothing received before an error or a
      // close is lost.
      if (messages.length > 0) {
        return { value: messages.shift(), done: false };
      }

      if (failure) {
        const error = failure;
        failure = null;
        throw error;
      }

      if (closed) {
        return { value: undefined, done: true };
      }

      return new Promise((res, rej) => {
        resolve = res;
        reject = rej;
      });
    },

    return() {
      ws.close();
      return { value: undefined, done: true };
    }
  };
}
55
// Usage: parse and log every message received from the socket.
for await (const rawMessage of websocketIterator('wss://api.example.com')) {
  const data = JSON.parse(rawMessage);
  console.log('Received:', data);
}

// --- Transforming Async Iterables ---
/**
 * Map over an async iterable: yields `fn(item)` for every item of `iterable`.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {Function} fn - Transformation applied to each item.
 */
async function* asyncMap(iterable, fn) {
  for await (const entry of iterable) {
    const mapped = fn(entry);
    yield mapped;
  }
}
7
/**
 * Filter an async iterable: yields only the items for which
 * `predicate(item)` returns a truthy value.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {Function} predicate - Called with each item.
 */
async function* asyncFilter(iterable, predicate) {
  for await (const candidate of iterable) {
    const keep = predicate(candidate);
    if (!keep) {
      continue;
    }
    yield candidate;
  }
}
16
/**
 * Take the first `n` items of an async iterable.
 *
 * Stops as soon as the n-th item has been handed out, without pulling another
 * item from the source first. For a source where every pull is a network
 * request, this avoids one wasted request. When `n <= 0` the source is not
 * touched at all.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {number} n - Maximum number of items to yield.
 */
async function* asyncTake(iterable, n) {
  if (n <= 0) return;

  let count = 0;
  for await (const item of iterable) {
    yield item;
    count++;
    // Check after yielding so no (n+1)-th item is requested from the source.
    if (count >= n) break;
  }
}
26
/**
 * Batch items: groups the items of `iterable` into arrays of `size` elements.
 * A final, shorter array is yielded when items remain after the source ends.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {number} size - Number of items per batch.
 */
async function* asyncBatch(iterable, size) {
  let pendingItems = [];

  for await (const entry of iterable) {
    pendingItems.push(entry);

    const isFull = pendingItems.length >= size;
    if (isFull) {
      const ready = pendingItems;
      pendingItems = [];
      yield ready;
    }
  }

  // Hand out whatever did not fill a complete batch.
  if (pendingItems.length !== 0) {
    yield pendingItems;
  }
}
44
// Pipeline
// fetchPages() yields one *array* of items per page, so the pages are
// flattened into individual items first. Without this step the map and filter
// callbacks would receive whole pages instead of items.
async function* flattenPages(pages) {
  for await (const page of pages) {
    yield* page;
  }
}

// Mark each item as processed, keep the active ones, stop after 100 items.
const result = asyncTake(
  asyncFilter(
    asyncMap(
      flattenPages(fetchPages('/api/items')),
      item => ({ ...item, processed: true })
    ),
    item => item.active
  ),
  100
);
56
// Drain the pipeline; work happens lazily as each item is requested.
for await (const processedItem of result) {
  console.log(processedItem);
}

// --- Parallel Processing ---
/**
 * Process with limited concurrency: applies `fn` to every item of `iterable`,
 * running at most `concurrency` calls at once, and yields the results in
 * completion order (not source order).
 *
 * Accepts both async and sync iterables (e.g. a plain array), and `fn` may
 * return a value or a promise. If `fn` throws or rejects, the error is thrown
 * from the generator when that outcome is reached. If the consumer stops
 * early, the source iterator's `return()` is called when it has one.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {Function} fn - Worker applied to each item.
 * @param {number} [concurrency=3] - Maximum number of in-flight `fn` calls;
 *   values below 1 are treated as 1.
 */
async function* parallelMap(iterable, fn, concurrency = 3) {
  const iterator =
    typeof iterable[Symbol.asyncIterator] === 'function'
      ? iterable[Symbol.asyncIterator]()
      : iterable[Symbol.iterator]();
  // A limit below 1 (or NaN) could never start any work and would spin forever.
  const limit = concurrency >= 1 ? concurrency : 1;
  const pending = new Set();
  const settled = []; // Finished outcomes waiting to be handed to the consumer
  let done = false;

  // Pull one item from the source and start its task.
  async function processNext() {
    const { value, done: isDone } = await iterator.next();
    if (isDone) {
      done = true;
      return;
    }

    // The task itself never rejects: failures are recorded as outcomes, so an
    // error that occurs while the generator is suspended is not unhandled.
    const task = Promise.resolve()
      .then(() => fn(value))
      .then(
        (result) => ({ ok: true, result }),
        (error) => ({ ok: false, error }),
      )
      .then((outcome) => {
        pending.delete(task);
        settled.push(outcome);
      });

    pending.add(task);
  }

  try {
    // `settled.length` is part of the condition: tasks can finish while the
    // generator is suspended at `yield`, leaving outcomes that still have to
    // be delivered after the source is exhausted and nothing is pending.
    while (!done || pending.size > 0 || settled.length > 0) {
      while (!done && pending.size < limit) {
        await processNext();
      }

      if (settled.length > 0) {
        const outcome = settled.shift();
        if (!outcome.ok) throw outcome.error;
        yield outcome.result;
      } else if (pending.size > 0) {
        await Promise.race(pending);
      }
    }
  } finally {
    // Early exit (consumer break or error): let the source clean up.
    if (!done) {
      await iterator.return?.();
    }
  }
}
39
// Usage: fetch the URLs with up to five `fetchData` calls in flight.
const urls = ['url1', 'url2', 'url3', /* ... */];

for await (const fetched of parallelMap(urls, fetchData, 5)) {
  console.log('Fetched:', fetched);
}

// --- Error Handling ---
/**
 * Errors in async iterators: yields 1 and 2, then fails with
 * "Connection lost". The statement after the `throw` is unreachable and is
 * kept only to illustrate that iteration stops at the error.
 */
async function* unreliableSource() {
  for (const value of [1, 2]) {
    yield value;
  }

  throw new Error('Connection lost');
  yield 3; // Never reached
}
8
/**
 * Handle errors: an error thrown by the source surfaces inside the
 * `for await...of` loop, so one surrounding try/catch covers the whole
 * iteration. Values are logged until the failure, then the message is reported.
 */
async function consumeWithErrorHandling() {
  try {
    for await (const current of unreliableSource()) {
      console.log(current);
    }
  } catch (failure) {
    console.error('Error:', failure.message);
  }
}
19
/**
 * Retry pattern: iterates the iterator returned by `createIterator()` and,
 * when `next()` fails, starts over with a fresh iterator, up to `maxRetries`
 * consecutive times.
 *
 * A restart begins from the start of the new iterator, so values yielded
 * before the failure are yielded again. The retry counter is reset only when
 * an attempt gets further than every earlier attempt. Resetting on any
 * successful value would retry a source that always fails at the same point
 * forever.
 *
 * @param {Function} createIterator - Factory returning a fresh async iterator.
 * @param {number} [maxRetries=3] - Consecutive failures tolerated before the
 *   error is rethrown.
 * @throws The source's error once the retry budget is exhausted.
 */
async function* withRetry(createIterator, maxRetries = 3) {
  let retries = 0;
  let position = 0; // Values received from the current attempt
  let furthest = 0; // Best position reached by any attempt so far
  let iterator = createIterator();

  while (true) {
    let step;
    try {
      step = await iterator.next();
    } catch (error) {
      if (retries >= maxRetries) throw error;
      retries++;
      console.log(`Retry ${retries}/${maxRetries}`);
      iterator = createIterator(); // Restart
      position = 0;
      continue;
    }

    if (step.done) break;

    position++;
    if (position > furthest) {
      // Real progress beyond any earlier attempt: the failure was transient.
      furthest = position;
      retries = 0;
    }

    yield step.value;
  }
}

// --- Collecting Results ---
/**
 * Collect all values of an async iterable into an array.
 * The whole sequence is held in memory, so use it only for bounded sources.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @returns {Promise<Array>} The items in iteration order.
 */
async function asyncToArray(iterable) {
  const collected = [];

  for await (const entry of iterable) {
    collected.push(entry);
  }

  return collected;
}
9
/**
 * Reduce: folds an async iterable into a single value, starting from
 * `initial`.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {Function} fn - Called as `fn(accumulator, item)`; its return value
 *   becomes the new accumulator.
 * @param {*} initial - Starting accumulator, returned as-is for an empty source.
 * @returns {Promise<*>} The final accumulator.
 */
async function asyncReduce(iterable, fn, initial) {
  let running = initial;

  for await (const entry of iterable) {
    const folded = fn(running, entry);
    running = folded;
  }

  return running;
}
18
/**
 * Find first: resolves with the first item for which `predicate(item)` is
 * truthy, or `undefined` when no item matches. Iteration stops at the first
 * match.
 *
 * @param {AsyncIterable|Iterable} iterable - Source of items.
 * @param {Function} predicate - Called with each item.
 * @returns {Promise<*>} The matching item or `undefined`.
 */
async function asyncFind(iterable, predicate) {
  for await (const candidate of iterable) {
    const matches = predicate(candidate);
    if (!matches) {
      continue;
    }
    return candidate;
  }

  return undefined;
}
26
// Usage
// fetchPages() yields one array per page, so `items` is an array of pages.
const items = await asyncToArray(fetchPages('/api/items'));
const sum = await asyncReduce(asyncRange(1, 100), (a, b) => a + b, 0);
// `users` was never defined at this scope; search the paginated user stream
// directly so iteration stops at the first active user.
const firstActive = await asyncFind(paginatedFetch('/api/users'), u => u.active);

// --- Best Practices ---
Use Cases:
✓ Paginated API fetching
✓ Stream processing
✓ Real-time data (WebSockets)
✓ Large dataset iteration
Patterns:
✓ Use async generators for simplicity
✓ Implement cleanup in return()
✓ Handle errors gracefully
✓ Consider backpressure
Performance:
✓ Process items as they arrive
✓ Limit concurrency for parallel ops
✓ Use batching for efficiency
✓ Clean up resources
Avoid:
✗ Loading everything into memory
✗ Ignoring error handling
✗ Forgetting to close resources
✗ Blocking the event loop
Conclusion
Async iterators provide an elegant way to handle asynchronous data streams. Use async function* to create generators that yield values asynchronously, and for await...of to consume them. They're ideal for paginated APIs, real-time data, and processing large datasets. Combine with transformation utilities for powerful data pipelines. Always handle errors and clean up resources properly.