hojichar.utils.async_handlers
from __future__ import annotations

import asyncio
import itertools
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import AsyncGenerator, AsyncIterable, Iterable, TypeVar

T = TypeVar("T")


def handle_stream_as_async(
    source_stream: Iterable[T] | AsyncIterable[T],
    chunk_size: int = 1000,
    executor: ThreadPoolExecutor | None = None,
) -> AsyncGenerator[T, None]:
    """
    Convert a synchronous iterable to an asynchronous generator
    with a specified chunk size.

    Args:
        source_stream (Iterable[T]): The synchronous iterable to convert.
        chunk_size (int): The number of items to yield at a time.
    """
    if isinstance(source_stream, AsyncIterable):
        return source_stream  # type: ignore[return-value]
    stream = iter(source_stream)

    async def sync_to_async() -> AsyncGenerator[T, None]:
        loop = asyncio.get_running_loop()
        while True:
            chunk = await loop.run_in_executor(
                executor, lambda: list(itertools.islice(stream, chunk_size))
            )
            if not chunk:
                break
            for item in chunk:
                yield item

    return sync_to_async()


async def write_stream_to_file(
    stream: AsyncGenerator[str, None],
    output_path: Path | str,
    *,
    chunk_size: int = 1000,
    delimiter: str = "\n",
) -> None:
    """
    Write an asynchronous stream of strings to a file.
    To lessen overhead with file I/O, it writes in chunks.
    """
    loop = asyncio.get_running_loop()
    with open(output_path, "w", encoding="utf-8") as f:
        chunk = []
        async for line in stream:
            chunk.append(line)
            if len(chunk) >= chunk_size:
                await loop.run_in_executor(None, f.writelines, [s + delimiter for s in chunk])
                chunk = []
        if chunk:
            await loop.run_in_executor(None, f.writelines, [s + delimiter for s in chunk])
            chunk = []
        await loop.run_in_executor(None, f.flush)
def handle_stream_as_async(
    source_stream: Iterable[T] | AsyncIterable[T],
    chunk_size: int = 1000,
    executor: ThreadPoolExecutor | None = None,
) -> AsyncGenerator[T, None]:
Convert a synchronous iterable into an asynchronous generator. Items are pulled from the source in chunks of chunk_size on an executor thread, so blocking reads do not stall the event loop. An input that is already an AsyncIterable is returned unchanged.

Args:
    source_stream (Iterable[T] | AsyncIterable[T]): The iterable to convert; async iterables are passed through as-is.
    chunk_size (int): The number of items pulled from the source per executor call.
    executor (ThreadPoolExecutor | None): Executor used for the blocking reads; None uses the event loop's default executor.
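A minimal usage sketch (not part of the module): wrapping an ordinary blocking iterable and consuming it with async for. The input values and chunk size are arbitrary illustration choices.

import asyncio

from hojichar.utils.async_handlers import handle_stream_as_async


async def main() -> None:
    # Any ordinary iterable works; items are fetched in chunks of 2
    # on a worker thread so the event loop is not blocked.
    async for item in handle_stream_as_async(range(5), chunk_size=2):
        print(item)


asyncio.run(main())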
async def write_stream_to_file(
    stream: AsyncGenerator[str, None],
    output_path: Path | str,
    *,
    chunk_size: int = 1000,
    delimiter: str = "\n",
) -> None:
Write an asynchronous stream of strings to a file. To reduce file I/O overhead, lines are buffered and written in batches of chunk_size, each suffixed with delimiter; the blocking writes and the final flush run in the event loop's default executor.
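A minimal end-to-end sketch (not part of the module) that feeds a synchronous generator through handle_stream_as_async and writes the result with write_stream_to_file. The record contents and the output path "out.txt" are placeholders for this illustration.

import asyncio

from hojichar.utils.async_handlers import handle_stream_as_async, write_stream_to_file


async def main() -> None:
    # A plain generator of strings stands in for any blocking data source.
    records = (f"record-{i}" for i in range(10_000))
    stream = handle_stream_as_async(records, chunk_size=1000)
    # Lines are buffered and written 1000 at a time, newline-delimited.
    await write_stream_to_file(stream, "out.txt", chunk_size=1000, delimiter="\n")


asyncio.run(main())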