25
loading...
This website collects cookies to deliver better user experience
from httpx import Client
class WeatherClient:
    """Weather API client that constructs its own HTTP client internally."""

    def __init__(self) -> None:
        # The HTTP client is created here, so callers have no way to supply
        # a different one.
        self.client = Client()

    def get_temp(self, lat: float, long: float) -> float:
        """Issue a request through ``self.client``.

        The request arguments and the response handling are elided
        placeholders (``...``) in this example.
        """
        self.client.get(...)
        ...
import httpx
def handler(request: httpx.Request) -> httpx.Response:
    """Reply to any request with a fixed 200 response carrying a JSON temperature."""
    payload = {"temp": 43.21}
    return httpx.Response(200, json=payload)
client = httpx.Client(transport=httpx.MockTransport(handler))
client
using a mock transport into our WeatherClient
?WeatherClient
accept an instance of `httpx.Client`:
from typing import Optional
from httpx import Client
class WeatherClient:
    """Weather API client whose HTTP client can be injected by the caller.

    Args:
        client: Client instance to use, for example one built with a mock
            transport. When omitted or ``None``, a default ``Client()`` is
            created.
    """

    def __init__(self, client: Optional[Client] = None) -> None:
        # Compare against None explicitly: `client or Client()` would silently
        # replace any injected object that happens to evaluate as falsy.
        self.client = client if client is not None else Client()
__init__
method.None
default so that we can detect when the caller did not include a Client
and so we can make a default one.httpx.Client
itself uses dependency injection for the transport
parameter? Neat huh!httpx
.di
is one of the latter frameworks, offering all of these features and more.WeatherClient
.import os
from dataclasses import dataclass, field
from xyz import App
def url_from_env() -> str:
    """Return the value of the ``WEATHER_URL`` environment variable.

    Raises:
        KeyError: If ``WEATHER_URL`` is not set.
    """
    environment = os.environ
    return environment["WEATHER_URL"]
@dataclass
class AppConfig:
    """Application settings whose ``weather_url`` defaults to ``url_from_env()``."""

    # default_factory defers the lookup until an AppConfig is created without
    # an explicit weather_url, rather than running it at class definition.
    weather_url: str = field(default_factory=url_from_env)
class MyApp:
    """Application object that is handed its ``WeatherClient`` by the caller."""

    def __init__(self, weather_client: WeatherClient) -> None:
        # Store the injected dependency as-is; nothing is constructed here.
        self.weather_client = weather_client
# Manual wiring: every dependency is constructed explicitly and passed down
# to its consumer. The `...` arguments are elided placeholders.
app = App(
weather_client=WeatherClient(
client=httpx.Client(
...
)
),
config=AppConfig(
...
)
)
di
:from di import Container, Dependant
# Create a container, solve the dependency graph rooted at App, and execute
# the solved graph synchronously; the result is bound to `app`.
container = Container()
app = container.execute_sync(
container.solve(Dependant(App))
)
Client
?client = Client(
transport=MockTransport(handler=handler)
)
# Bind a provider that returns the pre-built `client` in place of Client,
# then solve and execute App exactly as before.
container.bind(Dependant(lambda: client), Client)
app = container.execute_sync(
container.solve(Dependant(App))
)
Client
could be nested 10 layers of dependencies deep and the above code would not change at all.di
support a lot more functionality.from di import Container, Depends, Dependant
def func() -> object:
    """Return a new, distinct ``object`` instance on every call."""
    instance = object()
    return instance
def dependant(
    one: object = Depends(func),
    two: object = Depends(func),
    three: object = Depends(func, share=False)
) -> None:
    """Assert that ``one`` and ``two`` are the same object and ``three`` is a different one."""
    same_instance = one is two
    separate_instance = two is not three
    assert same_instance and separate_instance
# Solve `dependant` in a fresh container and execute it synchronously; the
# assertion inside `dependant` runs as part of the execution.
container = Container()
container.execute_sync(container.solve(Dependant(dependant)))
`func` is called twice (by default in two separate threads): once for `one` and `two`, which share the same value, and once for `three`.
from typing import Generator
from di import Container, Depends, Dependant
def func() -> Generator[int, None, None]:
    """Yield ``1`` exactly once, printing a message before and after the yield."""
    # Code before the yield runs when the generator is first advanced.
    print("func startup")
    yield 1
    # Code after the yield runs only when the generator is resumed.
    print("func shutdown")
def dependant(v: int = Depends(func, scope=1234)) -> int:
    """Print "computing" and return ``v + 1``."""
    print("computing")
    incremented = v + 1
    return incremented
container = Container()
# Scope 1234 matches the scope requested by `dependant`'s dependency on `func`.
with container.enter_local_scope(1234):
    print("enter scope")
    res = container.execute_sync(
        container.solve(Dependant(dependant))
    )
# Printed after the `with` block has exited.
print("exit scope")
assert res == 2
enter scope
func startup
computing
func shutdown
exit scope
from contextlib import contextmanager
# Wrap the generator function as a context manager and pass the yielded
# value to `dependant` by hand.
with contextmanager(func)() as value:
    dependant(v=value)
di
are arbitrary: any hashable value will do.di
will not care what you use. If "you" are a framework calling into user code, this gives you the freedom to define and manage your own scopes. For a web framework, these might be "app" and "request". For a CLI, there might be a single "call" scope. For a TUI, there could be an "app" and an "event" scope.
container.bind
between each call).di
lets you save the entire computed dependency graph into a SavedDependency
and execute the same graph multiple times.container.solve
.# solving locks in binds, but not cached values or scopes
# Solve once, then execute the same solved graph twice.
solved = container.solve(Dependant(dependency))
res1 = container.execute_sync(solved)
res2 = container.execute_sync(solved)
execute_sync
for execute_async
for async support:from di import Container, Dependant
async def dep() -> int:
    """Asynchronously return the constant ``1``."""
    value = 1
    return value
async def main() -> None:
    """Solve ``dep`` in a fresh container, execute it asynchronously and check the result is ``1``."""
    di_container = Container()
    plan = di_container.solve(Dependant(dep))
    outcome = await di_container.execute_async(plan)
    assert outcome == 1
DependantProtocol
interface provides an abstract representation of a dependency specification.di
by overriding this API.
`di.Dependant`, but you don't have to.
from di import Dependant
from di.types.providers import DependencyType
from di.types.scopes import Scope
class DefaultDependant(Dependant[DependencyType]):
    """Dependant whose provider simply returns a fixed, pre-supplied value.

    Args:
        default: The value the dependency resolves to.
        scope: Scope forwarded unchanged to ``Dependant``.
        shared: Forwarded to ``Dependant`` as its ``share`` argument.
    """

    def __init__(
        self,
        default: DependencyType,
        scope: Scope,
        shared: bool,
    ) -> None:
        super().__init__(
            # The provider ignores all inputs and returns the captured value.
            call=lambda: default,
            scope=scope,
            # The parameter is named `shared`; the previous code passed the
            # undefined name `share`, raising NameError on construction.
            share=shared,
        )
di
is to isolate the work of the container into distinct steps:di
allows you to provide your own executor, which only needs to take a list of groups of tasks (callable functions that take no arguments) and execute them in the specified order.import typing
# Type variable for the value produced by an executor's `get_result` callback.
ResultType = typing.TypeVar("ResultType")
# A task is a zero-argument callable that returns either None or an
# awaitable resolving to None.
Task = typing.Union[
typing.Callable[[], None],
typing.Callable[[], typing.Awaitable[None]]
]
class SyncExecutor(typing.Protocol):
    """Structural interface for objects that can execute task groups synchronously."""

    def execute_sync(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        """Execute ``tasks`` (a list of task groups) and return a ``ResultType``.

        ``get_result`` is a zero-argument callable producing the value to return.
        """
        ...
class AsyncExecutor(typing.Protocol):
    """Structural interface for objects that can execute task groups asynchronously."""

    async def execute_async(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        """Execute ``tasks`` (a list of task groups) and return a ``ResultType``.

        ``get_result`` is a zero-argument callable producing the value to return.
        """
        ...
`di.executors.DefaultExecutor`) supports both sync and async execution.
class SimpleExecutor(SyncExecutor):
def execute_sync(
    self,
    tasks: typing.List[typing.List[Task]],
    get_result: typing.Callable[[], ResultType],
) -> ResultType:
    """Run every task sequentially on the calling thread, then return the result.

    Args:
        tasks: Groups of zero-argument callables. Groups, and the tasks
            inside each group, are invoked one after another in list order.
        get_result: Called once after all tasks have run; its return value
            is returned to the caller.

    Returns:
        Whatever ``get_result()`` returns.

    Raises:
        TypeError: If a task returns an awaitable, i.e. an async task was
            handed to this sync-only executor.
    """
    # Local import: `inspect` is not imported at module level in the
    # surrounding snippet.
    import inspect

    for group in tasks:
        for task in group:
            outcome = task()
            if inspect.isawaitable(outcome):
                if inspect.iscoroutine(outcome):
                    # Close the coroutine we will never await so Python does
                    # not emit a "was never awaited" RuntimeWarning.
                    outcome.close()
                raise TypeError(
                    f"{task!r} returned an awaitable; "
                    "SimpleExecutor cannot run async tasks"
                )
    return get_result()
SyncExecutor
may be handed async functions as tasks.di
should be simple enough that it can be used by end users, it shines when integrated into a framework or library that fully embraces inversion of control.di
performs slightly faster than FastAPI (a couple percent per request).di
is able to execute dependencies concurrently, di
can be 10x faster than FastAPI.di
integration is doing (keeping track of stuff for OpenAPI, etc.).di
can help other libraries, frameworks or projects use inversion of control and dependency injection without having to go through the pain of writing their own dependency injection system.