JavaScript · Async Iterators · Streams

JavaScript Async Iterators Guide

Master async iterators and for-await-of loops for handling asynchronous data streams.

Bootspring Team
Engineering
October 7, 2018
8 min read

Async iterators enable iteration over asynchronous data sources. Here's how to use them effectively.
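Under the hood, `for await...of` looks up the object's `Symbol.asyncIterator` method and awaits each promise returned by `next()`. A minimal sketch of driving that protocol by hand:

```javascript
// Drive the async iteration protocol manually (what for-await-of does)
async function drain(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  const values = [];

  while (true) {
    const { value, done } = await iterator.next(); // Each result is a promise
    if (done) break;
    values.push(value);
  }

  return values;
}

// Any async generator works as a source
async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

drain(numbers()).then(values => console.log(values)); // [1, 2, 3]
```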

Basic Async Iterator

```javascript
// Async iterator object
const asyncIterable = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      async next() {
        await new Promise(r => setTimeout(r, 100));

        if (i < 3) {
          return { value: i++, done: false };
        }
        return { value: undefined, done: true };
      }
    };
  }
};

// Consume with for-await-of
async function consume() {
  for await (const value of asyncIterable) {
    console.log(value); // 0, 1, 2 (with delays)
  }
}
```

Async Generator Functions

```javascript
// async function* creates async generators
async function* asyncRange(start, end, delay = 100) {
  for (let i = start; i <= end; i++) {
    await new Promise(r => setTimeout(r, delay));
    yield i;
  }
}

// Usage
async function main() {
  for await (const num of asyncRange(1, 5)) {
    console.log(num); // 1, 2, 3, 4, 5 (with delays)
  }
}

// With async data fetching
async function* fetchPages(baseUrl, maxPages = 10) {
  let page = 1;

  while (page <= maxPages) {
    const response = await fetch(`${baseUrl}?page=${page}`);
    const data = await response.json();

    if (data.items.length === 0) {
      return; // No more pages
    }

    yield data.items;
    page++;
  }
}

// Consume pages (top-level await requires a module context)
for await (const items of fetchPages('/api/products')) {
  console.log('Got items:', items.length);
}
```

Fetching Paginated Data

```javascript
// Paginated API iterator
async function* paginatedFetch(url, options = {}) {
  let nextUrl = url;

  while (nextUrl) {
    const response = await fetch(nextUrl, options);
    const data = await response.json();

    yield* data.results; // Yield each item

    nextUrl = data.next; // Link to next page
  }
}

// Usage
async function getAllUsers() {
  const users = [];

  for await (const user of paginatedFetch('/api/users')) {
    users.push(user);
  }

  return users;
}

// With rate limiting
async function* rateLimitedFetch(url, delayMs = 1000) {
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${url}?page=${page}`);
    const data = await response.json();

    yield data;

    hasMore = data.hasNextPage;
    page++;

    if (hasMore) {
      await new Promise(r => setTimeout(r, delayMs));
    }
  }
}
```

Processing Streams

```javascript
// Process a readable stream line by line
async function* streamToLines(readableStream) {
  const reader = readableStream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        if (buffer) yield buffer;
        break;
      }

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // Keep incomplete line

      for (const line of lines) {
        yield line;
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// Usage
const response = await fetch('/api/stream');

for await (const line of streamToLines(response.body)) {
  console.log('Line:', line);
}
```

Event Stream Processing

```javascript
// Convert DOM events to an async iterator
function eventIterator(element, eventName) {
  const events = [];
  let resolve = null;
  let done = false;

  const handler = (event) => {
    if (resolve) {
      resolve({ value: event, done: false });
      resolve = null;
    } else {
      events.push(event);
    }
  };

  element.addEventListener(eventName, handler);

  return {
    [Symbol.asyncIterator]() {
      return this;
    },

    async next() {
      if (done) {
        return { value: undefined, done: true };
      }

      if (events.length > 0) {
        return { value: events.shift(), done: false };
      }

      return new Promise((r) => {
        resolve = r;
      });
    },

    return() {
      done = true;
      element.removeEventListener(eventName, handler); // Stop listening
      if (resolve) {
        resolve({ value: undefined, done: true });
        resolve = null;
      }
      return { value: undefined, done: true };
    }
  };
}

// Usage
const clicks = eventIterator(button, 'click');

for await (const event of clicks) {
  console.log('Clicked at:', event.clientX, event.clientY);

  if (shouldStop) {
    break; // Calls return(), which removes the listener
  }
}
```

WebSocket Messages

```javascript
// Async iterator for WebSocket messages
function websocketIterator(url) {
  const ws = new WebSocket(url);
  const messages = [];
  let resolve = null;
  let reject = null;
  let closed = false;

  ws.onmessage = (event) => {
    if (resolve) {
      resolve({ value: event.data, done: false });
      resolve = null;
      reject = null;
    } else {
      messages.push(event.data);
    }
  };

  ws.onerror = () => {
    if (reject) {
      reject(new Error('WebSocket error'));
      resolve = null;
      reject = null;
    }
  };

  ws.onclose = () => {
    closed = true;
    if (resolve) {
      resolve({ value: undefined, done: true });
      resolve = null;
      reject = null;
    }
  };

  return {
    [Symbol.asyncIterator]() {
      return this;
    },

    async next() {
      if (closed && messages.length === 0) {
        return { value: undefined, done: true };
      }

      if (messages.length > 0) {
        return { value: messages.shift(), done: false };
      }

      return new Promise((res, rej) => {
        resolve = res;
        reject = rej;
      });
    },

    return() {
      ws.close();
      return { value: undefined, done: true };
    }
  };
}

// Usage
for await (const message of websocketIterator('wss://api.example.com')) {
  const data = JSON.parse(message);
  console.log('Received:', data);
}
```

Transforming Async Iterables

```javascript
// Map async iterable
async function* asyncMap(iterable, fn) {
  for await (const item of iterable) {
    yield fn(item);
  }
}

// Filter async iterable
async function* asyncFilter(iterable, predicate) {
  for await (const item of iterable) {
    if (predicate(item)) {
      yield item;
    }
  }
}

// Take first n items
async function* asyncTake(iterable, n) {
  let count = 0;
  for await (const item of iterable) {
    if (count >= n) break;
    yield item;
    count++;
  }
}

// Batch items
async function* asyncBatch(iterable, size) {
  let batch = [];

  for await (const item of iterable) {
    batch.push(item);

    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }

  if (batch.length > 0) {
    yield batch;
  }
}

// Pipeline (paginatedFetch yields individual items, so each
// transformation below operates on a single item)
const result = asyncTake(
  asyncFilter(
    asyncMap(
      paginatedFetch('/api/items'),
      item => ({ ...item, processed: true })
    ),
    item => item.active
  ),
  100
);

for await (const item of result) {
  console.log(item);
}
```

Parallel Processing

```javascript
// Process with limited concurrency
async function* parallelMap(iterable, fn, concurrency = 3) {
  // Fall back to the sync iterator for arrays and other sync iterables
  const iterator =
    iterable[Symbol.asyncIterator]?.() ?? iterable[Symbol.iterator]();
  const pending = new Set();
  const results = [];
  let done = false;

  async function processNext() {
    const { value, done: isDone } = await iterator.next();
    if (isDone) {
      done = true;
      return;
    }

    const promise = fn(value).then(result => {
      pending.delete(promise);
      results.push(result);
    });

    pending.add(promise);

    if (pending.size >= concurrency) {
      await Promise.race(pending);
    }
  }

  while (!done || pending.size > 0) {
    while (!done && pending.size < concurrency) {
      await processNext();
    }

    if (results.length > 0) {
      yield results.shift(); // Yields in completion order, not input order
    } else if (pending.size > 0) {
      await Promise.race(pending);
    }
  }
}

// Usage (fetchData is a placeholder for any async function)
const urls = ['url1', 'url2', 'url3', /* ... */];

for await (const data of parallelMap(urls, fetchData, 5)) {
  console.log('Fetched:', data);
}
```

Error Handling

```javascript
// Errors in async iterators
async function* unreliableSource() {
  yield 1;
  yield 2;
  throw new Error('Connection lost');
  yield 3; // Never reached
}

// Handle errors
async function consumeWithErrorHandling() {
  try {
    for await (const value of unreliableSource()) {
      console.log(value);
    }
  } catch (error) {
    console.error('Error:', error.message);
  }
}

// Retry pattern
// Note: restarting begins the new iterator from scratch, so this suits
// sources that resume where they left off (e.g. cursor-based APIs)
async function* withRetry(createIterator, maxRetries = 3) {
  let retries = 0;
  let iterator = createIterator();

  while (true) {
    try {
      const { value, done } = await iterator.next();
      if (done) break;
      yield value;
      retries = 0; // Reset on success
    } catch (error) {
      if (retries >= maxRetries) throw error;
      retries++;
      console.log(`Retry ${retries}/${maxRetries}`);
      iterator = createIterator(); // Restart
    }
  }
}
```

Collecting Results

```javascript
// Collect all values
async function asyncToArray(iterable) {
  const results = [];
  for await (const item of iterable) {
    results.push(item);
  }
  return results;
}

// Reduce
async function asyncReduce(iterable, fn, initial) {
  let accumulator = initial;
  for await (const item of iterable) {
    accumulator = fn(accumulator, item);
  }
  return accumulator;
}

// Find first match
async function asyncFind(iterable, predicate) {
  for await (const item of iterable) {
    if (predicate(item)) return item;
  }
  return undefined;
}

// Usage
const items = await asyncToArray(fetchPages('/api/items'));
const sum = await asyncReduce(asyncRange(1, 100), (a, b) => a + b, 0);
const firstActive = await asyncFind(users, u => u.active);
```

Best Practices

Use Cases:
✓ Paginated API fetching
✓ Stream processing
✓ Real-time data (WebSockets)
✓ Large dataset iteration

Patterns:
✓ Use async generators for simplicity
✓ Implement cleanup in return()
✓ Handle errors gracefully
✓ Consider backpressure

Performance:
✓ Process items as they arrive
✓ Limit concurrency for parallel ops
✓ Use batching for efficiency
✓ Clean up resources

Avoid:
✗ Loading everything into memory
✗ Ignoring error handling
✗ Forgetting to close resources
✗ Blocking the event loop
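The "implement cleanup in return()" point deserves a concrete sketch: breaking out of a `for await...of` loop calls the generator's `return()`, which runs any pending `finally` block, making it a reliable place to release resources. Here `openConnection` is a hypothetical stand-in for a socket or file handle:

```javascript
// Hypothetical resource; stands in for a socket, file handle, etc.
function openConnection() {
  return {
    closed: false,
    async read(i) { return `chunk-${i}`; },
    close() { this.closed = true; }
  };
}

// Cleanup belongs in finally: breaking out of for-await-of resumes the
// generator with a return, which runs this finally block
async function* readChunks(conn) {
  try {
    let i = 0;
    while (true) {
      yield await conn.read(i++);
    }
  } finally {
    conn.close();
  }
}

async function main() {
  const conn = openConnection();

  for await (const chunk of readChunks(conn)) {
    if (chunk === 'chunk-2') break; // Triggers the finally block
  }

  console.log(conn.closed); // true
}
main();
```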

Conclusion

Async iterators provide an elegant way to handle asynchronous data streams. Use async function* to create generators that yield values asynchronously, and for await...of to consume them. They're ideal for paginated APIs, real-time data, and processing large datasets. Combine with transformation utilities for powerful data pipelines. Always handle errors and clean up resources properly.
