hojichar.utils.async_handlers

 1from __future__ import annotations
 2
 3import asyncio
 4import itertools
 5from concurrent.futures import ThreadPoolExecutor
 6from pathlib import Path
 7from typing import AsyncGenerator, AsyncIterable, Iterable, TypeVar
 8
 9T = TypeVar("T")
10
11
def handle_stream_as_async(
    source_stream: Iterable[T] | AsyncIterable[T],
    chunk_size: int = 1000,
    executor: ThreadPoolExecutor | None = None,
) -> AsyncGenerator[T, None]:
    """
    Convert an iterable to an asynchronous generator.

    Items are pulled from a synchronous source in chunks of ``chunk_size``
    inside an executor thread, so a slow or I/O-bound iterator does not
    block the running event loop.

    Args:
        source_stream: The iterable to adapt. An ``AsyncIterable`` is
            re-yielded item by item (no chunking is applied to it).
        chunk_size: Number of items fetched per executor round-trip.
        executor: Optional thread pool for the chunk fetches; ``None``
            uses the event loop's default executor.

    Returns:
        An async generator yielding the items of ``source_stream``.
    """
    if isinstance(source_stream, AsyncIterable):
        # Wrap instead of returning the iterable directly so the result is
        # always a genuine AsyncGenerator (removes the previous
        # `type: ignore` and guarantees aclose()/asend() exist for callers).
        async def passthrough() -> AsyncGenerator[T, None]:
            async for item in source_stream:
                yield item

        return passthrough()

    stream = iter(source_stream)

    def take_chunk() -> list[T]:
        # Runs in the executor thread; consumes up to chunk_size items.
        return list(itertools.islice(stream, chunk_size))

    async def sync_to_async() -> AsyncGenerator[T, None]:
        loop = asyncio.get_running_loop()
        # An empty chunk means the source iterator is exhausted.
        while chunk := await loop.run_in_executor(executor, take_chunk):
            for item in chunk:
                yield item

    return sync_to_async()
41
42
async def write_stream_to_file(
    stream: AsyncGenerator[str, None],
    output_path: Path | str,
    *,
    chunk_size: int = 1000,
    delimiter: str = "\n",
) -> None:
    """
    Write an asynchronous stream of strings to a file.

    To lessen file-I/O overhead, lines are buffered and written
    ``chunk_size`` at a time; each write runs in the event loop's default
    executor so the loop is not blocked.

    Args:
        stream: Async generator producing the lines to write.
        output_path: Destination file; opened in text mode ("w", UTF-8),
            truncating any existing content.
        chunk_size: Number of lines buffered between writes.
        delimiter: String appended after every line (including the last).
    """
    loop = asyncio.get_running_loop()
    with open(output_path, "w", encoding="utf-8") as f:

        async def flush_buffer(lines: list[str]) -> None:
            # Single writelines call per chunk, off the event loop thread.
            await loop.run_in_executor(None, f.writelines, [s + delimiter for s in lines])

        buffer: list[str] = []
        async for line in stream:
            buffer.append(line)
            if len(buffer) >= chunk_size:
                await flush_buffer(buffer)
                buffer = []
        # Flush the final partial chunk (the dead reset after it is removed).
        if buffer:
            await flush_buffer(buffer)
        # Push Python's I/O buffer to the OS before the file is closed.
        await loop.run_in_executor(None, f.flush)
def handle_stream_as_async(source_stream: Iterable[T] | AsyncIterable[T], chunk_size: int = 1000, executor: ThreadPoolExecutor | None = None) -> AsyncGenerator[T, None]:
13def handle_stream_as_async(
14    source_stream: Iterable[T] | AsyncIterable[T],
15    chunk_size: int = 1000,
16    executor: ThreadPoolExecutor | None = None,
17) -> AsyncGenerator[T, None]:
18    """
19    Convert a synchronous iterable to an asynchronous generator
20    with a specified chunk size.
21
22    Args:
23        source_stream (Iterable[T]): The synchronous iterable to convert.
24        chunk_size (int): The number of items to yield at a time.
25    """
26    if isinstance(source_stream, AsyncIterable):
27        return source_stream  # type: ignore[return-value]
28    stream = iter(source_stream)
29
30    async def sync_to_async() -> AsyncGenerator[T, None]:
31        loop = asyncio.get_running_loop()
32        while True:
33            chunk = await loop.run_in_executor(
34                executor, lambda: list(itertools.islice(stream, chunk_size))
35            )
36            if not chunk:
37                break
38            for item in chunk:
39                yield item
40
41    return sync_to_async()

Convert a synchronous iterable to an asynchronous generator with a specified chunk size.

Args: source_stream (Iterable[T]): The synchronous iterable to convert. chunk_size (int): The number of items to yield at a time.

async def write_stream_to_file(stream: AsyncGenerator[str, None], output_path: Path | str, *, chunk_size: int = 1000, delimiter: str = "\n") -> None:
44async def write_stream_to_file(
45    stream: AsyncGenerator[str, None],
46    output_path: Path | str,
47    *,
48    chunk_size: int = 1000,
49    delimiter: str = "\n",
50) -> None:
51    """
52    Write an asynchronous stream of strings to a file.
53    To lessen overhead with file I/O, it writes in chunks.
54    """
55    loop = asyncio.get_running_loop()
56    with open(output_path, "w", encoding="utf-8") as f:
57        chunk = []
58        async for line in stream:
59            chunk.append(line)
60            if len(chunk) >= chunk_size:
61                await loop.run_in_executor(None, f.writelines, [s + delimiter for s in chunk])
62                chunk = []
63        if chunk:
64            await loop.run_in_executor(None, f.writelines, [s + delimiter for s in chunk])
65            chunk = []
66        await loop.run_in_executor(None, f.flush)

Write an asynchronous stream of strings to a file. To lessen overhead with file I/O, it writes in chunks.