Source code for acodex.types.turn
from __future__ import annotations
from collections.abc import AsyncIterator, Callable, Iterator
from dataclasses import dataclass, field
from typing import Generic, TypeAlias, TypeVar
from acodex.types.events import ThreadEvent, Usage
from acodex.types.items import ThreadItem
EventsT = TypeVar("EventsT")
T = TypeVar("T")
[docs]
@dataclass(frozen=True, slots=True)
class Turn(Generic[T]):
"""Completed turn.
This is the result returned by `run()`.
"""
items: list[ThreadItem]
final_response: str
usage: Usage | None
structured_response_factory: Callable[[], T] = field(repr=False, compare=False)
@property
def structured_response(self) -> T:
"""Return the structured response."""
return self.structured_response_factory()
@dataclass(frozen=True, slots=True)
class StreamedTurn(Generic[EventsT]):
"""The result of the `run_streamed` method."""
events: EventsT
RunResult: TypeAlias = Turn[T]
[docs]
@dataclass(frozen=True, slots=True)
class RunStreamedResult(StreamedTurn[Iterator[ThreadEvent]], Generic[T]):
"""The synchronous result of `run_streamed`."""
result_factory: Callable[[], RunResult[T]] = field(repr=False, compare=False)
@property
def result(self) -> RunResult[T]:
"""Return the reduced turn after full stream exhaustion."""
return self.result_factory()
[docs]
@dataclass(frozen=True, slots=True)
class AsyncRunStreamedResult(StreamedTurn[AsyncIterator[ThreadEvent]], Generic[T]):
"""The asynchronous result of `run_streamed`."""
result_factory: Callable[[], RunResult[T]] = field(repr=False, compare=False)
@property
def result(self) -> RunResult[T]:
"""Return the reduced turn after full stream exhaustion."""
return self.result_factory()