import asyncio
import datetime
import logging
import logging.handlers
import os
import pprint
import queue
import re
import socket
import sys
import time
import traceback
import urllib.parse
from .config import Config
from .timer import Timer
# Custom "NOTICE" log level: a non-error/warning type of message that is
# visible without the -v flag.
# The numeric value 25 sits between logging.INFO (20) and logging.WARNING (30).
LOG_NOTICE = 25
# Register the level name so that records of this level are rendered as "NOTICE"
logging.addLevelName(LOG_NOTICE, "NOTICE")
class Logging(object):
    '''
    Configure the root logger from the application configuration.

    Console, rotating-file and syslog handlers are attached to the root logger
    according to the ``logging``, ``logging:console``, ``logging:file`` and
    ``logging:syslog`` configuration sections. When the root logger already has
    handlers, no handler is added and a warning is logged instead.
    '''

    # Number of seconds in each unit accepted by the ``rotate_every`` option
    _ROTATE_UNIT_SECONDS = {'s': 1, 'M': 60, 'H': 60 * 60, 'd': 60 * 60 * 24}

    def __init__(self, app):
        '''
        :param app: The application object; ``app.Loop`` is passed to the syslog handler
            and used to schedule the log rotation timer.
        '''
        self.RootLogger = logging.getLogger()

        self.ConsoleHandler = None
        self.FileHandler = None
        self.SyslogHandler = None
        # Set later (from the event loop) when ``rotate_every`` is configured
        self.LogRotatingTime = None

        if not self.RootLogger.hasHandlers():
            # Add console logger if needed
            if os.isatty(sys.stdin.fileno()) or os.environ.get('ASABFORCECONSOLE', '0') != '0':
                self._configure_console_logging()

            self._configure_file_logging(app)
            self._configure_syslog_logging(app)

            # No logging is configured
            if self.ConsoleHandler is None and self.FileHandler is None and self.SyslogHandler is None:
                # Let's check if we run in Docker and if so, then log on stderr
                from .docker import running_in_docker
                if running_in_docker():
                    self._configure_console_logging()

        else:
            self.RootLogger.warning("Logging seems to be already configured. Proceed with caution.")

        if Config["logging"].getboolean("verbose"):
            self.RootLogger.setLevel(logging.DEBUG)
        else:
            self.RootLogger.setLevel(Config["logging"]["level"])

        self._configure_levels()

    def rotate(self):
        '''
        Roll the log file over immediately; does nothing when file logging is not configured.
        '''
        if self.FileHandler is not None:
            self.RootLogger.log(LOG_NOTICE, "Rotating logs")
            self.FileHandler.doRollover()

    async def _on_tick_rotate_check(self):
        '''
        Timer callback: rotate the log file once more than 1000 bytes have been written to it.
        '''
        if self.FileHandler is None:
            return
        stream = self.FileHandler.stream
        # The handler sets its stream to None when it is closed
        if stream is not None and stream.tell() > 1000:
            self.rotate()

    def _configure_console_logging(self):
        '''
        Attach a handler that writes to stderr to the root logger.
        '''
        self.ConsoleHandler = logging.StreamHandler(stream=sys.stderr)
        self.ConsoleHandler.setFormatter(StructuredDataFormatter(
            fmt=Config["logging:console"]["format"],
            datefmt=Config["logging:console"]["datefmt"],
            sd_id=Config["logging"]["sd_id"],
            use_color=True
        ))
        self.ConsoleHandler.setLevel(logging.DEBUG)
        self.RootLogger.addHandler(self.ConsoleHandler)

    def _configure_file_logging(self, app):
        '''
        Attach a rotating file handler when ``[logging:file] path`` is set and schedule timed rotation if requested.
        '''
        file_path = Config["logging:file"]["path"]
        if len(file_path) == 0:
            return

        # Ensure file path; a bare file name has no directory part to create
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        self.FileHandler = logging.handlers.RotatingFileHandler(
            file_path,
            backupCount=Config.getint("logging:file", "backup_count"),
            maxBytes=Config.getint("logging:file", "backup_max_bytes"),
        )
        self.FileHandler.setLevel(logging.DEBUG)
        self.FileHandler.setFormatter(StructuredDataFormatter(
            fmt=Config["logging:file"]["format"],
            datefmt=Config["logging:file"]["datefmt"],
            sd_id=Config["logging"]["sd_id"],
        ))
        self.RootLogger.addHandler(self.FileHandler)

        rotate_every = Config.get("logging:file", "rotate_every")
        if rotate_every == '':
            return

        seconds = self._parse_rotate_every(rotate_every)
        if seconds is None:
            self.RootLogger.error("Invalid 'rotate_every' configuration value.")
            return

        # PubSub is not ready at this moment, we need to create timer in a future
        async def schedule(app, interval):
            self.LogRotatingTime = Timer(app, self._on_tick_rotate_check, autorestart=True)
            self.LogRotatingTime.start(interval)
        asyncio.ensure_future(schedule(app, seconds), loop=app.Loop)

    @classmethod
    def _parse_rotate_every(cls, value):
        '''
        Convert a ``rotate_every`` value such as ``"30s"``, ``"5M"``, ``"2H"`` or ``"1d"`` to seconds.

        :return: The positive number of seconds, or ``None`` when the value is malformed or not positive.
        '''
        match = re.match(r"^([0-9]+)([dMHs])$", value)
        if match is None:
            return None
        count, unit = match.groups()
        seconds = int(count) * cls._ROTATE_UNIT_SECONDS[unit]
        return seconds if seconds > 0 else None

    def _configure_syslog_logging(self, app):
        '''
        Attach a syslog handler when ``[logging:syslog] enabled`` is true and the address is valid.
        '''
        if not Config["logging:syslog"].getboolean("enabled"):
            return

        address = Config["logging:syslog"]["address"]

        if address[:1] == '/':
            # A plain filesystem path means a datagram UNIX socket
            self.SyslogHandler = AsyncIOHandler(app.Loop, socket.AF_UNIX, socket.SOCK_DGRAM, address)

        else:
            url = urllib.parse.urlparse(address)

            if url.scheme in ('tcp', 'udp'):
                inet_address = (
                    url.hostname if url.hostname is not None else 'localhost',
                    url.port if url.port is not None else logging.handlers.SYSLOG_UDP_PORT
                )
                sock_type = socket.SOCK_STREAM if url.scheme == 'tcp' else socket.SOCK_DGRAM
                self.SyslogHandler = AsyncIOHandler(app.Loop, socket.AF_INET, sock_type, inet_address)

            elif url.scheme == 'unix-connect':
                self.SyslogHandler = AsyncIOHandler(app.Loop, socket.AF_UNIX, socket.SOCK_STREAM, url.path)

            elif url.scheme == 'unix-sendto':
                self.SyslogHandler = AsyncIOHandler(app.Loop, socket.AF_UNIX, socket.SOCK_DGRAM, url.path)

            else:
                self.RootLogger.warning("Invalid logging:syslog address '{}'".format(address))

        if self.SyslogHandler is None:
            return

        self.SyslogHandler.setLevel(logging.DEBUG)
        syslog_format = Config["logging:syslog"]["format"]
        if syslog_format == 'm':
            self.SyslogHandler.setFormatter(MacOSXSyslogFormatter(sd_id=Config["logging"]["sd_id"]))
        elif syslog_format == '5':
            self.SyslogHandler.setFormatter(SyslogRFC5424Formatter(sd_id=Config["logging"]["sd_id"]))
        else:
            self.SyslogHandler.setFormatter(SyslogRFC3164Formatter(sd_id=Config["logging"]["sd_id"]))
        self.RootLogger.addHandler(self.SyslogHandler)

    def _configure_levels(self):
        '''
        Apply the fine-grained per-logger levels from ``[logging] levels`` (one ``<logger> <level>`` pair per line).

        Invalid lines are reported with a warning and skipped.
        '''
        levels = Config["logging"].get('levels') or ''
        for levelconf in levels.split('\n'):
            try:
                parsed = self._parse_level_line(levelconf)
                if parsed is None:
                    continue
                loggername, levelname = parsed
                # setLevel() resolves level names and raises ValueError for unknown ones
                logging.getLogger(loggername).setLevel(levelname)
            except ValueError:
                self.RootLogger.warning("Invalid 'levels' configuration line '{}'".format(levelconf.strip()))

    @staticmethod
    def _parse_level_line(line):
        '''
        Parse one line of the ``levels`` option.

        :return: ``(loggername, LEVELNAME)`` or ``None`` for blank lines and ``#``/``;`` comments.
        :raises ValueError: When the line does not contain both a logger name and a level.
        '''
        line = line.strip()
        if len(line) == 0 or line.startswith(('#', ';')):
            return None
        # Split on any run of whitespace so that tabs and multiple spaces are accepted
        parts = line.split(None, 1)
        if len(parts) != 2:
            raise ValueError("Missing level in '{}'".format(line))
        loggername, levelname = parts
        return loggername, levelname.upper()
class _StructuredDataLogger(logging.Logger):
    '''
    This class extends a default python logger class, specifically by adding ``struct_data`` parameter to logging functions.
    It means that you can use expressions such as ``logger.info("Hello world!", struct_data={'key':'value'})``.
    '''

    def _log(self, level, msg, args, exc_info=None, struct_data=None, extra=None, stack_info=False, stacklevel=1):
        '''
        Attach ``struct_data`` to the record as the ``_struct_data`` attribute and delegate to ``logging.Logger._log``.

        ``stacklevel`` has the same meaning as in the standard logger (Python 3.8+).
        '''
        if struct_data is not None:
            # Work on a copy so that the caller's ``extra`` dict is not modified
            extra = dict(extra) if extra is not None else dict()
            extra['_struct_data'] = struct_data

        if sys.version_info >= (3, 8):
            # This override adds one frame between the public logging call and
            # ``Logger._log``; skip it so that the record points at the real caller.
            super()._log(
                level, msg, args,
                exc_info=exc_info, extra=extra, stack_info=stack_info, stacklevel=stacklevel + 1
            )
        else:
            super()._log(level, msg, args, exc_info=exc_info, extra=extra, stack_info=stack_info)


# Every logger created from now on by logging.getLogger() accepts ``struct_data``
logging.setLoggerClass(_StructuredDataLogger)
def _loop_exception_handler(loop, context):
'''
This is an logging exception handler for asyncio.
It's purpose is to nicely log any unhandled excpetion that arises in the asyncio tasks.
'''
exception = context.pop('exception', None)
message = context.pop('message', '')
if len(message) > 0:
message += '\n'
if len(context) > 0:
message += pprint.pformat(context)
if exception is not None:
ex_traceback = exception.__traceback__
tb_lines = [line.rstrip('\n') for line in traceback.format_exception(exception.__class__, exception, ex_traceback)]
message += '\n' + '\n'.join(tb_lines)
logging.getLogger().error(message)
class AsyncIOHandler(logging.Handler):
    '''
    A logging handler similar to a standard ``logging.handlers.SocketHandler`` that utilizes ``asyncio``.
    It implements a queue for decoupling logging from a networking. The networking is fully event-driven via ``asyncio`` mechanisms.

    Log entries emitted before the socket becomes writable are encoded and kept in the queue;
    they are sent once the event loop reports the socket as writable.
    '''

    def __init__(self, loop, family, sock_type, address, facility=logging.handlers.SysLogHandler.LOG_LOCAL1):
        '''
        :param loop: The asyncio event loop used for socket readiness callbacks.
        :param family: The socket address family (e.g. ``socket.AF_INET``, ``socket.AF_UNIX``).
        :param sock_type: The socket type (``socket.SOCK_STREAM`` or ``socket.SOCK_DGRAM``).
        :param address: The address passed to ``socket.connect()``.
        :param facility: Accepted for API compatibility; not used by this handler.
        '''
        logging.Handler.__init__(self)

        self._family = family
        self._type = sock_type
        self._address = address
        self._loop = loop

        self._socket = None
        self._reset()

        self._queue = queue.Queue()

        # The connection is opened from within the event loop
        self._loop.call_soon(self._connect, self._loop)

    def _reset(self):
        '''
        Unregister the socket from the event loop, close it and mark the handler as not ready for writing.
        '''
        self._write_ready = False
        if self._socket is not None:
            self._loop.remove_writer(self._socket)
            self._loop.remove_reader(self._socket)
            self._socket.close()
            self._socket = None

    def _connect(self, loop):
        '''
        Open a non-blocking socket, start connecting and register readiness callbacks.
        '''
        self._reset()

        try:
            self._socket = socket.socket(self._family, self._type)
            self._socket.setblocking(False)
            try:
                self._socket.connect(self._address)
            except (BlockingIOError, InterruptedError):
                # The non-blocking connect is in progress; the socket becomes
                # writable when it finishes and _on_write() checks the outcome.
                pass
        except Exception as e:
            print("Error when opening syslog connection to '{}'".format(self._address), e, file=sys.stderr)
            # Do not leak the half-opened socket
            if self._socket is not None:
                self._socket.close()
                self._socket = None
            return

        self._loop.add_writer(self._socket, self._on_write)
        self._loop.add_reader(self._socket, self._on_read)

    def _on_write(self):
        '''
        Called by the event loop when the socket is writable: flush the queued messages.
        '''
        self._loop.remove_writer(self._socket)

        # The result of a non-blocking connect is reported through SO_ERROR
        error = self._socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error != 0:
            print("Error when opening syslog connection to '{}'".format(self._address), os.strerror(error), file=sys.stderr)
            self._reset()
            return

        self._write_ready = True

        while not self._queue.empty():
            msg = self._queue.get_nowait()
            try:
                self._socket.sendall(msg)
            except BlockingIOError:
                # The send buffer is full: keep the message (it is re-queued at
                # the end) and continue when the socket is writable again.
                self._enqueue(msg)
                self._write_ready = False
                self._loop.add_writer(self._socket, self._on_write)
                break
            except Exception as e:
                # Same handling as in emit(): report and keep the message
                print("Error when writing to syslog '{}'".format(self._address), e, file=sys.stderr)
                self._enqueue(msg)
                break

    def _on_read(self):
        '''
        Called by the event loop when the socket is readable; incoming data is discarded.
        '''
        try:
            data, _ = self._socket.recvfrom(1024)
        except BlockingIOError:
            # Nothing to read after all
            return
        except Exception as e:
            print("Error on the syslog socket '{}'".format(self._address), e, file=sys.stderr)
            # Close a socket - there is no reason for reading or socket is actually closed
            self._reset()
            return

        if len(data) == 0 and self._type == socket.SOCK_STREAM:
            # An empty read on a stream socket means that the peer closed the connection
            print("Error on the syslog socket '{}'".format(self._address), "connection closed by peer", file=sys.stderr)
            self._reset()

        # Otherwise we received "something" ... let's ignore that!

    def emit(self, record):
        '''
        This is the entry point for log entries.

        The record is formatted and encoded as UTF-8; it is sent immediately when the socket
        is ready for writing and queued otherwise.
        '''
        try:
            msg = self.format(record).encode('utf-8')

            if self._write_ready:
                try:
                    self._socket.sendall(msg)
                except Exception as e:
                    print("Error when writing to syslog '{}'".format(self._address), e, file=sys.stderr)
                    self._enqueue(msg)

            else:
                # Queue the encoded message (not the record): _on_write() passes queue items to sendall()
                self._enqueue(msg)

        except Exception as e:
            print("Error when emit to syslog '{}'".format(self._address), e, file=sys.stderr)
            self.handleError(record)

    def _enqueue(self, record):
        '''
        Store an encoded message (``bytes``) for a later delivery.
        '''
        self._queue.put(record)
# ANSI terminal escape sequences (SGR codes)
# Reset all text attributes back to the terminal default
_RESET_SEQ = "\033[0m"
# Bold text in a given color; the color code is supplied via the %d placeholder
_COLOR_SEQ = "\033[1;%dm"
# Bold text, color unchanged
_BOLD_SEQ = "\033[1m"