correlation_id to each log produced by the request handler. This would allow me to easily capture all the logs that happen within a request, even if requests are being processed concurrently:

[480f4c31-3ceb-45be-afda-5676e59cc391 info] RegisterUserCommand handling started
[480f4c31-3ceb-45be-afda-5676e59cc391 info] Processing RegisterUserCommand command with params {'name': 'Alice', ...}
[249ae775-845d-456c-ab71-48776fcad938 info] RegisterUserCommand handling started
[249ae775-845d-456c-ab71-48776fcad938 info] Processing RegisterUserCommand command with params {'name': 'Bob', ...}
[249ae775-845d-456c-ab71-48776fcad938 info] RegisterUserCommand handling completed
[480f4c31-3ceb-45be-afda-5676e59cc391 warn] RegisterUserCommand handling failed
Without correlation_id, it is impossible to tell from the logs which command has failed.

import uuid

class Logger:
    """Not a real logger, but it is enough for the purpose of this post"""

    def __init__(self, correlation_id):
        self.correlation_id = correlation_id

    def info(self, *args, **kwargs):
        print(f'[{self.correlation_id} info]', *args, **kwargs)

@app.post("/register")
def register(payload):
    logger = Logger(uuid.uuid4())
    logger.info("Registering user", payload)
    ...
    return {"result": "OK"}
logger = Logger(uuid.uuid4()) to every route handler. We can solve this problem by injecting the logger into the handler function. How you do it depends on the framework, but let’s use a create_logger factory function for now.

def create_logger():
    # return the new logger so that the caller actually receives it
    return Logger(uuid.uuid4())

@app.post("/register")
def register(payload, logger=create_logger()):
    logger.info("Registering user", payload)
    return {"result": "OK"}
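A caveat about the `logger=create_logger()` default: plain Python evaluates default argument values once, when the function is defined, so on its own it would hand the same logger to every call. Frameworks with dependency injection call the factory for each request instead. Below is a minimal, framework-free sketch of that behaviour. The `inject` decorator and the `correlation_id` key in the return value are hypothetical, added only for illustration, and are not part of any library.

```python
import functools
import uuid


class Logger:
    """Same toy logger as in the post."""

    def __init__(self, correlation_id):
        self.correlation_id = correlation_id

    def info(self, *args, **kwargs):
        print(f'[{self.correlation_id} info]', *args, **kwargs)


def create_logger():
    return Logger(uuid.uuid4())


def inject(**factories):
    """Hypothetical helper: call each factory on every invocation of the handler."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(*args, **kwargs):
            for name, factory in factories.items():
                if name not in kwargs:
                    kwargs[name] = factory()  # fresh dependency per call
            return handler(*args, **kwargs)
        return wrapper
    return decorator


@inject(logger=create_logger)
def register(payload, logger):
    logger.info("Registering user", payload)
    # correlation_id is returned only so the example can be inspected
    return {"result": "OK", "correlation_id": logger.correlation_id}


first = register({"name": "Alice"})
second = register({"name": "Bob"})
```

Each call to `register` gets its own logger, so the two log lines carry different correlation ids.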
DatabaseEngine, UserService) to our handler:

import time

class DatabaseEngine:
    def __init__(self, logger: Logger):
        self.logger = logger
        ...

    def connect(self):
        ...

    def execute(self, query):
        self.logger.info('Executing DB query...')
        ...

class UserService:
    def __init__(self, logger: Logger, db_engine: DatabaseEngine):
        self.logger = logger
        self.db = db_engine

    def register(self, payload):
        self.logger.info('RegisterUserCommand handling started for', payload)
        self.db.execute(...)
        time.sleep(1)
        self.logger.info('RegisterUserCommand handling completed for', payload)

def create_user_service():
    # the logger has to exist before anything that depends on it
    logger = Logger(uuid.uuid4())
    db_engine = DatabaseEngine(logger)
    db_engine.connect()
    user_service = UserService(logger=logger, db_engine=db_engine)
    return user_service

@app.post("/register")
def register(payload, service: UserService = create_user_service()):
    logger = service.logger  # ugh!
    logger.info("Registering user", payload)
    result = service.register(payload)
    return {"result": result}
UserService and DatabaseEngine, we must create a logger first. Moreover, since we instantiate a logger with every request, every other object that depends on a logger must be instantiated with every request as well. So having a global db_engine object is not an option unless we give up on a per-request logger in favor of a global logger, which is something we don’t want to do. Instantiating a whole dependency graph for the application on every request just because of the per-request logger seems like overkill in terms of performance and resource management. If you think that “there must be a better way”, you are right. Let’s explore our options.

While it might be tempting to create Logger instances on a per-connection basis, this is not a good idea because these instances are not garbage collected [source]
If we make correlation_id a global variable, then a logger could be a global object (or singleton if you prefer), referencing this global variable. As a result, both db_engine and service could use the global logger, which means that the entire dependency graph could be constructed at once, on the application startup.

correlation_id = None

class Logger:
    def info(self, *args, **kwargs):
        # reads the module-level correlation_id at call time
        print(f'[{correlation_id} info]', *args, **kwargs)

# application startup
logger = Logger()
db_engine = DatabaseEngine(logger)
db_engine.connect()

def create_user_service():
    user_service = UserService(logger=logger, db_engine=db_engine)
    return user_service

@app.post("/register")
def register(payload, service: UserService = create_user_service()):
    global correlation_id
    correlation_id = uuid.uuid4()  # we need to generate new uuid for each request
    logger.info("Registering user", payload)
    result = service.register(payload)
    return {"result": result}
correlation_id, while Thread-1 is still processing its own request, logs will be corrupted. Let's simulate this behavior:

import threading

threads = [
    threading.Thread(target=register, args=(payload,))
    for payload in ['Alice', 'Bob']
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

[e2931dc9-ca83-4d79-9cfc-23aba03d264a info] Registering user Alice
[e2931dc9-ca83-4d79-9cfc-23aba03d264a info] RegisterUserCommand handling started for Alice
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] Registering user Bob
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] RegisterUserCommand handling started for Bob
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] Executing DB query...
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] Executing DB query...
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] RegisterUserCommand handling completed for Bob
[605e6790-387a-4fbe-ba08-874b57f8e0d6 info] RegisterUserCommand handling completed for Alice
correlation_id in our case.

import threading

class ContextLogger:
    """This time we have a logger which is using threading.local()"""

    def __init__(self, thread_context):
        self.thread_context = thread_context

    def info(self, *args, **kwargs):
        print(f'[{self.thread_context.correlation_id} info]', *args, **kwargs)

request_context = threading.local()
logger = ContextLogger(request_context)
...

@app.post("/register")
def register(payload, service: UserService = create_user_service()):
    request_context.correlation_id = uuid.uuid4()  # we store new uuid per thread
    ...
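To check that `threading.local()` really keeps the ids apart, here is a small self-contained sketch that can be run without a web framework. The `handle` and `log_info` functions and the `log_lines` list are stand-ins invented for this example. They replace the route handler and the logger so that the output can be inspected afterwards.

```python
import threading
import time
import uuid

request_context = threading.local()
log_lines = []  # collected so the result can be inspected afterwards


def log_info(message):
    line = f'[{request_context.correlation_id} info] {message}'
    log_lines.append(line)  # appending from several threads is safe in CPython
    print(line)


def handle(payload):
    # the attribute set here is visible to the current thread only
    request_context.correlation_id = uuid.uuid4()
    log_info(f'started for {payload}')
    time.sleep(0.1)  # give the other thread a chance to run in between
    log_info(f'completed for {payload}')


threads = [threading.Thread(target=handle, args=(name,)) for name in ['Alice', 'Bob']]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Group the correlation ids that were logged for each payload.
ids_by_payload = {}
for line in log_lines:
    logged_id = line.split()[0].lstrip('[')
    payload = line.split()[-1]
    ids_by_payload.setdefault(payload, set()).add(logged_id)
```

Unlike the global-variable version, every payload ends up with exactly one correlation id, even though the two threads interleave.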
async/await calls, we need to use ContextVar instead of threading.local. This is because the event loop runs all coroutines in a single thread, so they would all share the same thread-local value. Here is the updated version of the logger:

import uuid
from contextvars import ContextVar

class ConcurrentContextLogger:
    def __init__(self, correlation_id: ContextVar):
        self.correlation_id = correlation_id

    def info(self, *args, **kwargs):
        print(f'[{self.correlation_id.get()} info]', *args, **kwargs)

correlation_id: ContextVar[uuid.UUID] = ContextVar(
    'correlation_id',
    default=uuid.UUID('00000000-0000-0000-0000-000000000000'),
)
logger = ConcurrentContextLogger(correlation_id)
...

@app.post("/register")
async def register(payload, service: UserService = create_user_service()):
    correlation_id.set(uuid.uuid4())  # we store new uuid per coroutine
    ...
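The same kind of check works for the `ContextVar` version. The sketch below assumes that each request runs as its own asyncio task, which is what `asyncio.gather` does with the coroutines it receives. `handle`, `log_info` and `log_lines` are again hypothetical stand-ins for the handler and the logger, and the id is stored as a string to keep the example short.

```python
import asyncio
import uuid
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar('correlation_id', default='-')
log_lines = []


def log_info(message):
    line = f'[{correlation_id.get()} info] {message}'
    log_lines.append(line)
    print(line)


async def handle(payload):
    # set() only affects the context of the task that is currently running
    correlation_id.set(str(uuid.uuid4()))
    log_info(f'started for {payload}')
    await asyncio.sleep(0.1)  # yield to the event loop so the tasks interleave
    log_info(f'completed for {payload}')


async def main():
    # gather() wraps each coroutine in a Task; every Task gets a copy of the context
    await asyncio.gather(handle('Alice'), handle('Bob'))


asyncio.run(main())

# Group the correlation ids that were logged for each payload.
ids_by_payload = {}
for line in log_lines:
    logged_id = line.split()[0].lstrip('[')
    payload = line.split()[-1]
    ids_by_payload.setdefault(payload, set()).add(logged_id)

value_outside = correlation_id.get()  # still the default outside the tasks
```

Both coroutines run in one thread, yet each keeps its own id, and the value set inside the tasks does not leak back into the caller's context.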
import logging

class ContextFilter(logging.Filter):
    def filter(self, record):
        # attach the current correlation id to every record passing through
        record.correlation_id = correlation_id.get()
        return True

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)-15s %(name)-5s %(levelname)-8s %(correlation_id)s %(message)s'
)
context_filter = ContextFilter()
logger = logging.getLogger('app')
logger.addFilter(context_filter)
logger.info('Hello World!')
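One detail of the standard `logging` module is worth keeping in mind here: a filter attached to a logger is only consulted for records logged through that very logger, not for records coming from its child loggers. A possible variation, sketched below, attaches the filter to the handler instead, so that every record the handler emits gets a `correlation_id`. The `io.StringIO` stream, the shortened format string and the `app.db` logger name are assumptions made to keep the example inspectable.

```python
import io
import logging
import uuid
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar('correlation_id', default='-')


class ContextFilter(logging.Filter):
    def filter(self, record):
        record.correlation_id = correlation_id.get()
        return True


stream = io.StringIO()  # stand-in for stderr so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter('%(name)s %(levelname)s %(correlation_id)s %(message)s')
)
handler.addFilter(ContextFilter())  # on the handler: covers every logger that uses it

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

request_id = str(uuid.uuid4())
correlation_id.set(request_id)
logging.getLogger('app').info('Hello World!')
logging.getLogger('app.db').info('Executing DB query...')  # child logger

output = stream.getvalue()
print(output, end='')
```

Both lines, including the one from the child logger, carry the same request id.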