27
loading...
This website collects cookies to deliver better user experience
from abc import ABC, abstractmethod
class EventPublisher(ABC):
    """Abstract interface for objects that publish events.

    Concrete subclasses must implement :meth:`publish`; the class cannot
    be instantiated directly because ``publish`` is abstract.
    """

    @abstractmethod
    def publish(self, events):
        """Publish the given events.

        Args:
            events: The events to publish. The Nameko implementation in
                this file iterates over it, so pass a collection of events.
        """
Implementing the EventPublisher interface, we make a wrapper for the Nameko EventDispatcher class.
# domain/event.py
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Dict
@dataclass(frozen=True)
class DomainEvent:
    """Immutable base class for domain events.

    Subclasses are expected to provide a ``name`` attribute; ``as_dict``
    reads it when serializing and raises ``AttributeError`` if it is absent.
    """

    @property
    def as_dict(self) -> Dict:
        """Return the event's dataclass fields as a dict, plus its ``name``."""
        serialized = asdict(self)
        # ``name`` may be a plain class attribute rather than a dataclass
        # field, in which case ``asdict`` omits it, so set it explicitly.
        serialized["name"] = self.name
        return serialized
@dataclass(frozen=True)
class RegisteredEntryEvent(DomainEvent):
    """Event carrying the email address of a registered entry."""

    email: str
    name: str = 'RegisteredEntryEvent'
    # Use a default_factory so the timestamp is taken when each event is
    # created. A plain ``datetime.now()`` default is evaluated only once,
    # at class definition, and would be shared by every event.
    timestamp: datetime = field(default_factory=datetime.now)
# infrastructure/event_publisher.py
from typing import List
from nameko.events import EventDispatcher
from application.event_publisher import EventPublisher
from domain.event import DomainEvent
class NamekoEventPublisher(EventPublisher):
    """``EventPublisher`` implementation that forwards events to a dispatcher."""

    def __init__(self, dispatcher: EventDispatcher):
        """Store the dispatcher used to send events.

        Args:
            dispatcher: Object called as ``dispatcher(event_name, payload)``
                for every published event.
                NOTE(review): presumably the callable Nameko injects for an
                ``EventDispatcher`` dependency -- confirm.
        """
        self._dispatcher = dispatcher

    def publish(self, events: List[DomainEvent]) -> None:
        """Dispatch each event under its ``name`` with ``as_dict`` as payload.

        Args:
            events: The events to dispatch, in order.
        """
        for event in events:
            self._dispatcher(event.name, event.as_dict)
The EventPublisher interface (not the implementation) is injected into the service.
# application/service.py
from application.event_publisher import EventPublisher
from domain.event import RegisteredEntryEvent
class RegistrationService:
    """Application service that registers entries and publishes an event."""

    def __init__(self, event_publisher: EventPublisher):
        """Store the publisher used to emit registration events.

        Args:
            event_publisher: Publisher that receives the events produced by
                this service.
        """
        self._event_publisher = event_publisher

    def register_entry(self, email: str) -> None:
        """Create a ``RegisteredEntryEvent`` for ``email`` and publish it.

        Args:
            email: Email address of the entry being registered.
        """
        event = RegisteredEntryEvent(email=email)
        # ``publish`` takes a collection of events (the Nameko publisher
        # iterates over its argument), so wrap the single event in a list.
        self._event_publisher.publish([event])
RegistrationEntryService port. It handles the HTTP request data and executes the service method.
import json
from nameko.events import EventDispatcher
from nameko.web.handlers import http
from werkzeug.wrappers import Request, Response
class NamekoRegistrationService:
    """Nameko service exposing entry registration over HTTP."""

    name = "nameko_registration_service"
    dispatcher = EventDispatcher()

    @http("POST", "/register")
    def register_entry(self, request: Request) -> Response:
        """Handle ``POST /register``: register the email from the JSON body.

        Args:
            request: Incoming HTTP request whose body is parsed as JSON and
                must contain an ``"email"`` key.

        Returns:
            A response whose body confirms the registered email.
        """
        request_params = json.loads(request.data)
        email = request_params["email"]
        # The keyword must match ``RegistrationService.__init__``, whose
        # parameter is ``event_publisher`` (not ``event_dispatcher``).
        service = RegistrationService(
            event_publisher=NamekoEventPublisher(self.dispatcher),
        )
        service.register_entry(email)
        return Response(f"Registered entry for {email=}")