Back to Blog
JavaScript · Async · Iterators · Generators

JavaScript Async Iterators

Master async iterators in JavaScript. From for-await-of to async generators to streaming data.

B
Bootspring Team
Engineering
August 17, 2020
8 min read

Async iterators enable iteration over asynchronous data sources. Here's how to use them.

Basic Async Iteration#

/**
 * Logs the parsed JSON body of each response produced by `fetchAll(urls)`.
 *
 * NOTE(review): `fetchAll` is not defined in this snippet; it is assumed to
 * return an async iterable of fetch-style responses exposing `json()`.
 *
 * @param {*} urls - Passed through unchanged to `fetchAll`.
 * @returns {Promise<void>}
 */
async function processUrls(urls) {
  const responses = fetchAll(urls);
  for await (const response of responses) {
    const body = await response.json();
    console.log(body);
  }
}

/**
 * Hand-written async iterable that produces 0, 1 and 2.
 *
 * Each call to `[Symbol.asyncIterator]()` creates an iterator with its own
 * counter, so the object can be iterated more than once.
 *
 * NOTE(review): `delay` is not defined in this snippet; it is assumed to
 * return a promise that settles after the given number of milliseconds.
 */
const asyncIterable = {
  [Symbol.asyncIterator]() {
    let current = 0;

    const next = async () => {
      if (current >= 3) {
        return { done: true };
      }
      await delay(100);
      const value = current;
      current += 1;
      return { value, done: false };
    };

    return { next };
  },
};

/**
 * Consumes `asyncIterable` with for-await-of and logs every value.
 *
 * @returns {Promise<void>}
 */
async function main() {
  for await (const value of asyncIterable) {
    console.log(value); // 0, 1, 2
  }
}

Async Generators#

/**
 * Async generator that yields the integers 0 .. max - 1, awaiting
 * `delay(100)` before each one.
 *
 * NOTE(review): `delay` is not defined in this snippet; it is assumed to
 * return a promise that settles after the given number of milliseconds.
 *
 * @param {number} max - Exclusive upper bound.
 */
async function* asyncCounter(max) {
  for (let i = 0; i < max; i++) {
    await delay(100);
    yield i;
  }
}

/**
 * Usage example: logs the values produced by `asyncCounter(5)`.
 *
 * @returns {Promise<void>}
 */
async function main() {
  for await (const num of asyncCounter(5)) {
    console.log(num); // 0, 1, 2, 3, 4
  }
}

/**
 * Fetches a paginated endpoint one page at a time, yielding `data.items`
 * of each page until a page reports a falsy `hasNextPage`.
 *
 * @param {string} baseUrl - Endpoint URL; may already carry a query string.
 * @throws {Error} When a page request completes with a non-OK status.
 */
async function* fetchPages(baseUrl) {
  // Append to an existing query string instead of emitting a second "?".
  const separator = baseUrl.includes('?') ? '&' : '?';
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${baseUrl}${separator}page=${page}`);

    // fetch() only rejects on network failure; HTTP errors must be checked
    // explicitly, otherwise an error body would be parsed as a page.
    if (!response.ok) {
      throw new Error(
        `Request for page ${page} of ${baseUrl} failed with status ${response.status}`
      );
    }

    const data = await response.json();

    yield data.items;

    hasMore = Boolean(data.hasNextPage);
    page++;
  }
}

/**
 * Usage example: collects every item from every page of `/api/items` into
 * one array.
 *
 * @returns {Promise<Array>} All items, in page order.
 */
async function getAllItems() {
  const allItems = [];

  for await (const items of fetchPages('/api/items')) {
    // Push one by one: `push(...items)` passes every element as a call
    // argument and can throw a RangeError on very large pages.
    for (const item of items) {
      allItems.push(item);
    }
  }

  return allItems;
}

Streaming Data#

/**
 * Reads a byte stream and yields its text one line at a time, splitting on
 * "\n". A final line without a trailing newline is yielded as well.
 *
 * @param {ReadableStream<Uint8Array>} stream - Stream to read; it is locked
 *   for the duration of the iteration.
 */
async function* readLines(stream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let exhausted = false;

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        exhausted = true;
        // Flush the decoder: bytes of an incomplete multi-byte sequence held
        // back by `stream: true` would otherwise be dropped silently.
        buffer += decoder.decode();
        if (buffer) {
          yield buffer;
        }
        break;
      }

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      // The last piece is a partial line (or '' if the chunk ended on "\n").
      buffer = lines.pop() || '';

      for (const line of lines) {
        yield line;
      }
    }
  } finally {
    if (!exhausted) {
      // The consumer stopped early or reading failed: cancel so the source
      // stops producing data nobody will read. Cancellation is best-effort;
      // its failure must not mask the error that is already propagating.
      await reader.cancel().catch(() => {});
    }
    reader.releaseLock();
  }
}

/**
 * Usage example: downloads `/large-file.txt` and hands each line to
 * `processLine`.
 *
 * NOTE(review): `processLine` is not defined in this snippet.
 *
 * @returns {Promise<void>}
 * @throws {Error} When the request completes with a non-OK status.
 */
async function processFile() {
  const response = await fetch('/large-file.txt');
  if (!response.ok) {
    throw new Error(`Request for /large-file.txt failed with status ${response.status}`);
  }
  const stream = response.body;

  for await (const line of readLines(stream)) {
    processLine(line);
  }
}

/**
 * Parses a stream of newline-delimited JSON, yielding one parsed value per
 * non-blank line. A line that is not valid JSON makes `JSON.parse` throw.
 *
 * @param {ReadableStream<Uint8Array>} stream - Stream of JSON lines.
 */
async function* parseJSONStream(stream) {
  for await (const line of readLines(stream)) {
    if (line.trim()) {
      yield JSON.parse(line);
    }
  }
}

Event Streams#

/**
 * Exposes the events of an emitter as an endless async iterable.
 *
 * Every event payload is yielded as-is, in arrival order; payloads that
 * arrive while the consumer is busy are queued. The listener is attached
 * when iteration starts (first `next()`), so earlier events are not seen,
 * and it is removed when the consumer stops iterating.
 *
 * @param {{on: Function, off: Function}} emitter - Object with `on`/`off`.
 * @param {string} eventName - Event to listen for.
 */
async function* eventIterator(emitter, eventName) {
  const queue = [];
  let wake = null;

  const handler = (data) => {
    // Always go through the queue so waiting and non-waiting consumers get
    // the payload in the same (unwrapped) shape.
    queue.push(data);
    if (wake) {
      wake();
      wake = null;
    }
  };

  emitter.on(eventName, handler);

  try {
    while (true) {
      if (queue.length === 0) {
        await new Promise((r) => {
          wake = r;
        });
      }
      yield queue.shift();
    }
  } finally {
    emitter.off(eventName, handler);
  }
}

/**
 * Usage example: logs every 'message' event of `socket`.
 *
 * @param {{on: Function, off: Function}} socket - Event source.
 * @returns {Promise<void>}
 */
async function handleMessages(socket) {
  for await (const message of eventIterator(socket, 'message')) {
    console.log('Received:', message);
  }
}

/**
 * Opens a WebSocket and yields the `data` of each incoming message until
 * the socket closes. Messages that were received before the close event
 * are still delivered. The socket is closed when iteration ends for any
 * reason.
 *
 * @param {string} url - WebSocket endpoint.
 * @throws {Error} When the socket errors or closes before it has opened.
 */
async function* websocketIterator(url) {
  const ws = new WebSocket(url);
  const queue = [];
  let wake = null;
  let closed = false;
  let rejectOpen = null;

  const notify = () => {
    if (wake) {
      const resume = wake;
      wake = null;
      resume();
    }
  };

  ws.onmessage = (event) => {
    queue.push(event.data);
    notify();
  };

  ws.onclose = () => {
    closed = true;
    // No-op once the open promise has already settled.
    if (rejectOpen) {
      rejectOpen(new Error(`WebSocket closed before opening: ${url}`));
    }
    notify();
  };

  ws.onerror = () => {
    if (rejectOpen) {
      rejectOpen(new Error(`WebSocket connection failed: ${url}`));
    }
  };

  try {
    // Waiting for "open" alone would hang forever on a failed connection,
    // so error/close before open reject this promise.
    await new Promise((resolveOpen, reject) => {
      rejectOpen = reject;
      ws.onopen = resolveOpen;
    });
    rejectOpen = null;

    while (true) {
      if (queue.length > 0) {
        yield queue.shift();
      } else if (closed) {
        // Only finish once everything received before the close is drained.
        return;
      } else {
        await new Promise((r) => {
          wake = r;
        });
      }
    }
  } finally {
    ws.close();
  }
}

Transforming Async Iterables#

/**
 * Yields `fn(item)` for every item of an async iterable. Because an async
 * generator awaits yielded values, `fn` may be synchronous or asynchronous.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} fn - Mapping function.
 */
async function* asyncMap(iterable, fn) {
  for await (const item of iterable) {
    yield fn(item);
  }
}

/**
 * Yields only the items for which `predicate` returns (or resolves to) a
 * truthy value.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} predicate - Sync or async test function.
 */
async function* asyncFilter(iterable, predicate) {
  for await (const item of iterable) {
    // Await the verdict: a promise object is always truthy, so an async
    // predicate would otherwise let every item through.
    if (await predicate(item)) {
      yield item;
    }
  }
}

/**
 * Yields at most the first `n` items, then stops iterating the source.
 * Exactly `n` items are pulled from the source, never `n + 1`, so taking
 * from a slow or infinite source finishes as soon as the n-th item is out.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {number} n - Maximum number of items to yield.
 */
async function* asyncTake(iterable, n) {
  if (!(n > 0)) {
    return;
  }

  let count = 0;
  for await (const item of iterable) {
    yield item;
    count++;
    // Check after yielding so no extra item is requested from the source.
    if (count >= n) {
      break;
    }
  }
}

/**
 * Discards the first `n` items and yields the rest.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {number} n - Number of leading items to drop.
 */
async function* asyncSkip(iterable, n) {
  let count = 0;
  for await (const item of iterable) {
    if (count >= n) {
      yield item;
    }
    count++;
  }
}

/**
 * Groups items into arrays of `size` elements; the final array may be
 * shorter. Nothing is yielded for an empty source.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {number} size - Number of items per batch.
 */
async function* asyncBatch(iterable, size) {
  let batch = [];

  for await (const item of iterable) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }

  if (batch.length > 0) {
    yield batch;
  }
}

/**
 * Usage example: enriches users, keeps the active adults and emails the
 * first 100 of them, one at a time.
 *
 * NOTE(review): `fetchAllUsers`, `enrichUser` and `sendEmail` are not
 * defined in this snippet. The default `baseUrl` is a placeholder added
 * because the `fetchAllUsers(baseUrl)` shown later in the article needs a
 * URL -- confirm the real endpoint.
 *
 * @param {string} [baseUrl='/api/users'] - Endpoint passed to `fetchAllUsers`.
 * @returns {Promise<void>}
 */
async function processUsers(baseUrl = '/api/users') {
  const users = fetchAllUsers(baseUrl);

  const activeAdults = asyncFilter(
    asyncMap(users, enrichUser),
    (user) => user.active && user.age >= 18
  );

  for await (const user of asyncTake(activeAdults, 100)) {
    await sendEmail(user);
  }
}

Aggregation Functions#

/**
 * Collects every item of an async iterable into an array.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @returns {Promise<Array>} All items, in iteration order.
 */
async function asyncToArray(iterable) {
  const result = [];
  for await (const item of iterable) {
    result.push(item);
  }
  return result;
}

/**
 * Folds an async iterable into a single value.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} fn - Sync or async reducer `(accumulator, item)`.
 * @param {*} initial - Starting accumulator; returned as-is for an empty source.
 * @returns {Promise<*>} The final accumulator.
 */
async function asyncReduce(iterable, fn, initial) {
  let accumulator = initial;
  for await (const item of iterable) {
    // Await so an async reducer receives the previous value, not a promise.
    accumulator = await fn(accumulator, item);
  }
  return accumulator;
}

/**
 * Returns the first item for which `predicate` returns (or resolves to) a
 * truthy value, or `undefined` when none matches. Iteration stops at the
 * first match.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} predicate - Sync or async test function.
 * @returns {Promise<*>}
 */
async function asyncFind(iterable, predicate) {
  for await (const item of iterable) {
    if (await predicate(item)) {
      return item;
    }
  }
  return undefined;
}

/**
 * Reports whether at least one item satisfies `predicate`. Iteration stops
 * at the first match.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} predicate - Sync or async test function.
 * @returns {Promise<boolean>}
 */
async function asyncSome(iterable, predicate) {
  for await (const item of iterable) {
    if (await predicate(item)) {
      return true;
    }
  }
  return false;
}

/**
 * Reports whether every item satisfies `predicate` (true for an empty
 * source). Iteration stops at the first failure.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} predicate - Sync or async test function.
 * @returns {Promise<boolean>}
 */
async function asyncEvery(iterable, predicate) {
  for await (const item of iterable) {
    if (!(await predicate(item))) {
      return false;
    }
  }
  return true;
}

/**
 * Counts the items of an async iterable by consuming it completely.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @returns {Promise<number>}
 */
async function asyncCount(iterable) {
  let count = 0;
  for await (const _ of iterable) {
    count++;
  }
  return count;
}

Concurrent Processing#

/**
 * Maps an async iterable through `fn`, keeping up to `concurrency` calls
 * running at once, and yields the results in INPUT order as soon as the
 * oldest outstanding call has finished.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} fn - Sync or async mapper called as `fn(item, index)`.
 * @param {number} [concurrency=3] - Maximum number of outstanding calls.
 * @throws Whatever `fn` throws or rejects with, when that result's turn comes.
 */
async function* asyncMapConcurrent(iterable, fn, concurrency = 3) {
  // Outstanding calls, oldest first.
  const pending = [];
  let index = 0;

  const start = (item, position) => {
    // The async wrapper turns a synchronous throw into a rejection.
    const task = (async () => fn(item, position))();
    // Mark the promise as handled: a task may reject while an older one is
    // still being awaited. The rejection is still surfaced when this task
    // itself is awaited below.
    task.catch(() => {});
    return task;
  };

  for await (const item of iterable) {
    pending.push(start(item, index++));

    if (pending.length >= concurrency) {
      yield await pending.shift();
    }
  }

  while (pending.length > 0) {
    yield await pending.shift();
  }
}

/**
 * Runs `fn` over an async iterable with at most `poolSize` unconsumed
 * calls at a time and yields each result as calls finish (completion
 * order, not input order). Every result is yielded exactly once.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {Function} fn - Sync or async worker called as `fn(item)`.
 * @param {number} [poolSize=5] - Maximum number of outstanding calls.
 * @throws Whatever `fn` throws or rejects with.
 */
async function* asyncPool(iterable, fn, poolSize = 5) {
  const inFlight = new Set();

  const launch = (item) => {
    // Each task settles to a record naming itself, so the winner of a race
    // can be identified and removed only when its result is consumed.
    // Removing a task at settle time would lose every result that settles
    // while no race is pending.
    const task = (async () => fn(item))().then(
      (value) => ({ task, ok: true, value }),
      (error) => ({ task, ok: false, error })
    );
    inFlight.add(task);
  };

  const takeSettled = async () => {
    const settled = await Promise.race(inFlight);
    inFlight.delete(settled.task);
    if (!settled.ok) {
      throw settled.error;
    }
    return settled.value;
  };

  for await (const item of iterable) {
    launch(item);

    if (inFlight.size >= poolSize) {
      yield await takeSettled();
    }
  }

  while (inFlight.size > 0) {
    yield await takeSettled();
  }
}

Error Handling#

/**
 * Processes every item with `processItem` and never throws for a failed
 * item: a failure is reported in-band as `{ error, item }`.
 *
 * NOTE(review): `processItem` is not defined in this snippet.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 */
async function* safeGenerator(iterable) {
  for await (const item of iterable) {
    let outcome;
    try {
      outcome = await processItem(item);
    } catch (error) {
      outcome = { error, item };
    }
    // Yield outside the try block so an error injected by the consumer
    // (iterator.throw) is not mistaken for a processing failure.
    yield outcome;
  }
}

/**
 * Processes every item with `processItem`, retrying a failing item with
 * exponential backoff (1s, 2s, 4s, ... via `delay`) between attempts.
 *
 * NOTE(review): `processItem` and `delay` are not defined in this snippet;
 * `delay` is assumed to return a promise that settles after the given
 * number of milliseconds.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {number} [maxRetries=3] - Total attempts per item; values below 1
 *   are treated as a single attempt.
 * @throws The last error of an item whose attempts all failed.
 */
async function* withRetry(iterable, maxRetries = 3) {
  const attempts = Math.max(1, maxRetries);

  for await (const item of iterable) {
    let result;
    let lastError;
    let succeeded = false;

    for (let attempt = 0; attempt < attempts && !succeeded; attempt++) {
      try {
        result = await processItem(item);
        succeeded = true;
      } catch (error) {
        lastError = error;
        // No point in backing off after the final attempt.
        if (attempt < attempts - 1) {
          await delay(Math.pow(2, attempt) * 1000);
        }
      }
    }

    // Decide on the explicit success flag: an earlier failed attempt must
    // not turn a later success into a throw.
    if (!succeeded) {
      throw lastError;
    }

    yield result;
  }
}

/**
 * Processes every item with `processItem`, failing with
 * `new Error('Timeout')` when an item takes longer than `ms` milliseconds.
 * The timer of each item is cleared as soon as that item settles.
 *
 * NOTE(review): `processItem` is not defined in this snippet. A timed-out
 * call is not cancelled; only its result is abandoned.
 *
 * @param {AsyncIterable|Iterable} iterable - Source items.
 * @param {number} ms - Per-item time limit in milliseconds.
 * @throws {Error} 'Timeout' when an item exceeds the limit, or whatever
 *   `processItem` rejects with.
 */
async function* withTimeout(iterable, ms) {
  for await (const item of iterable) {
    const work = processItem(item);

    let timer;
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout')), ms);
    });

    let result;
    try {
      result = await Promise.race([work, timeout]);
    } finally {
      // Otherwise every item leaves a live timer behind for `ms`.
      clearTimeout(timer);
    }

    yield result;
  }
}

Practical Examples#

/**
 * Walks a cursor-paginated endpoint and yields every entry of each page's
 * `users` array, following `nextCursor` until it is falsy.
 *
 * @param {string} baseUrl - Endpoint URL; may already carry a query string.
 * @throws {Error} When a request completes with a non-OK status.
 */
async function* fetchAllUsers(baseUrl) {
  const separator = baseUrl.includes('?') ? '&' : '?';
  let cursor = null;

  do {
    // Cursors are opaque strings; encode them so characters such as "&",
    // "=" or spaces cannot corrupt the query string.
    const url = cursor
      ? `${baseUrl}${separator}cursor=${encodeURIComponent(cursor)}`
      : baseUrl;

    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Request to ${url} failed with status ${response.status}`);
    }
    const data = await response.json();

    yield* data.users;

    cursor = data.nextCursor;
  } while (cursor);
}

/**
 * Runs `query` repeatedly with an increasing offset and yields the rows
 * one by one. Stops after the first batch that is shorter than
 * `batchSize`.
 *
 * NOTE(review): `db` is not defined in this snippet; `db.query` is assumed
 * to resolve to an array of rows.
 *
 * @param {*} query - Passed through unchanged to `db.query`.
 * @param {number} [batchSize=100] - Rows requested per round trip.
 * @throws {RangeError} When `batchSize` is not a positive number.
 */
async function* queryInBatches(query, batchSize = 100) {
  // With a non-positive size, an empty result would equal the batch size
  // and the loop would never end.
  if (!(batchSize > 0)) {
    throw new RangeError(`batchSize must be a positive number, got ${batchSize}`);
  }

  let offset = 0;
  let hasMore = true;

  while (hasMore) {
    const results = await db.query(query, { limit: batchSize, offset });

    yield* results;

    hasMore = results.length === batchSize;
    offset += batchSize;
  }
}

/**
 * Polls `url` forever, yielding each update and passing the id of the last
 * yielded update as `since` on the next request. The loop only ends when
 * the consumer stops iterating or a request fails.
 *
 * NOTE(review): `delay` is not defined in this snippet; it is assumed to
 * return a promise that settles after the given number of milliseconds.
 *
 * @param {string} url - Endpoint returning a JSON array of updates.
 * @param {number} [interval=5000] - Pause between polls, in milliseconds.
 * @throws {Error} When a request completes with a non-OK status.
 */
async function* pollForUpdates(url, interval = 5000) {
  let lastId = null;

  while (true) {
    // Compare against null/undefined rather than truthiness: 0 and '' are
    // valid ids.
    const params = lastId != null ? `?since=${encodeURIComponent(lastId)}` : '';
    const response = await fetch(`${url}${params}`);
    if (!response.ok) {
      throw new Error(`Polling ${url} failed with status ${response.status}`);
    }
    const updates = await response.json();

    for (const update of updates) {
      yield update;
      lastId = update.id;
    }

    await delay(interval);
  }
}

Best Practices#

Design:
✓ Use async generators for streams
✓ Implement proper cleanup in finally
✓ Handle backpressure appropriately
✓ Consider memory usage

Error Handling:
✓ Wrap iteration in try-catch
✓ Implement retry logic when needed
✓ Add timeouts for safety
✓ Log errors appropriately

Performance:
✓ Use batching for efficiency
✓ Implement concurrency limits
✓ Avoid buffering entire streams
✓ Clean up resources promptly

Testing:
✓ Test with empty iterables
✓ Test error scenarios
✓ Test cancellation
✓ Mock async data sources

Conclusion#

Async iterators enable elegant handling of asynchronous data streams. Use async generators for producing values, for-await-of for consumption, and transformation functions for processing. Handle errors properly and implement cleanup in finally blocks.

Share this article

Help spread the word about Bootspring