import logging
import asyncio
import argparse
import itertools
import os
import sys
import time
import signal
import platform
import random
try:
import daemon
import daemon.pidfile
import lockfile
except ImportError:
daemon = None
from .config import Config
from .abc.singleton import Singleton
from .log import Logging, _loop_exception_handler, LOG_NOTICE
from .task import TaskService
from .docker import running_in_docker
L = logging.getLogger(__name__)
[docs]class Application(metaclass=Singleton):
Description = "This app is based on ASAB."
[docs] def __init__(self, args=None):
try:
# EX_OK code is not available on Windows
self.ExitCode = os.EX_OK
except AttributeError:
self.ExitCode = 0
# Queue of Services to be initialized
self.InitServicesQueue = []
# Queue of Modules to be initialized
self.InitModulesQueue = []
# Parse command line
self.Args = self.parse_arguments(args=args)
# Load configuration
# Obtain HostName
self.HostName = platform.node()
os.environ['HOSTNAME'] = self.HostName
Config._load()
if hasattr(self.Args, "daemonize") and self.Args.daemonize:
self.daemonize()
elif hasattr(self.Args, "kill") and self.Args.kill:
self.daemon_kill()
# Seed the random generator
random.seed()
# Obtain the event loop
self.Loop = asyncio.get_event_loop()
if self.Loop.is_closed():
self.Loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.Loop)
self.LaunchTime = time.time()
self.BaseTime = self.LaunchTime - self.Loop.time()
self.Modules = []
self.Services = {}
# Check if the application is running in Docker,
# if so, add Docker service
if running_in_docker():
from .docker import Module
self.add_module(Module)
self.DockerService = self.get_service("asab.DockerService")
self.HostName = self.DockerService.load_hostname()
os.environ['HOSTNAME'] = self.HostName
Config._load()
# Setup logging
self.Logging = Logging(self)
# Configure the event loop
self.Loop.set_exception_handler(_loop_exception_handler)
if Config["logging"].getboolean("verbose"):
self.Loop.set_debug(True)
# Adding a handler to listen to the interrupt event
if platform.system() == "Windows":
try:
# Windows win32api import
import win32api
def handler(type):
self.stop()
return True
win32api.SetConsoleCtrlHandler(handler, True)
except ImportError as e:
L.warning("win32api module could not be loaded, because '{}'".format(
e
))
else:
# POSIX and other reasonable systems
self.Loop.add_signal_handler(signal.SIGINT, self.stop)
self.Loop.add_signal_handler(signal.SIGTERM, self.stop)
self.Loop.add_signal_handler(signal.SIGHUP, self._hup)
self._stop_event = asyncio.Event(loop=self.Loop)
self._stop_event.clear()
self._stop_counter = 0
from .pubsub import PubSub
self.PubSub = PubSub(self)
self.TaskService = TaskService(self)
# Setup ASAB API
if len(Config['asab:web']["listen"]) > 0:
from asab.api import Module
self.add_module(Module)
L.info("Initializing ...")
[docs] def create_argument_parser(
self,
prog=None,
usage=None,
description=None,
epilog=None,
prefix_chars='-',
fromfile_prefix_chars=None,
argument_default=None,
conflict_handler='error',
add_help=True
):
'''
This method can be overriden to adjust argparse configuration.
Refer to the Python standard library to `argparse.ArgumentParser` for details of arguments.
'''
parser = argparse.ArgumentParser(
prog=prog,
usage=usage,
description=description if description is not None else self.Description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
conflict_handler=conflict_handler,
add_help=add_help
)
parser.add_argument('-c', '--config', help='specify a path to a configuration file')
parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)')
parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog')
parser.add_argument('-l', '--log-file', help='specify a path to a log file')
parser.add_argument('-w', '--web-api', help='activate Asab web API (default listening port is 0.0.0.0:8080)', const="0.0.0.0:8080", nargs="?")
if daemon is not None:
parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)')
parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit')
return parser
def parse_arguments(self, args=None):
parser = self.create_argument_parser()
args = parser.parse_args(args=args)
if args.config is not None:
Config._default_values['general']['config_file'] = args.config
if args.verbose:
Config._default_values['logging']['verbose'] = True
if args.syslog:
Config._default_values['logging:syslog']['enabled'] = True
if args.log_file:
Config._default_values['logging:file']['path'] = args.log_file
if args.web_api:
Config._default_values['asab:web']['listen'] = args.web_api
return args
def get_pidfile_path(self):
pidfilepath = Config['general']['pidfile']
if pidfilepath == "":
return None
elif pidfilepath == "!":
return os.path.join('/var/run', os.path.basename(sys.argv[0]) + '.pid')
else:
return pidfilepath
def daemonize(self):
if daemon is None:
print("Install 'python-daemon' module to support daemonizing.", file=sys.stderr)
sys.exit(1)
pidfilepath = self.get_pidfile_path()
if pidfilepath is not None:
pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfilepath)
working_dir = Config['general']['working_dir']
uid = Config['general']['uid']
if uid == "":
uid = None
gid = Config['general']['gid']
if gid == "":
gid = None
signal_map = {
signal.SIGTTIN: None,
signal.SIGTTOU: None,
signal.SIGTSTP: None,
}
self.DaemonContext = daemon.DaemonContext(
working_directory=working_dir,
signal_map=signal_map,
pidfile=pidfile,
uid=uid,
gid=gid,
)
try:
self.DaemonContext.open()
except lockfile.AlreadyLocked as e:
print("Cannot create a PID file '{}':".format(pidfilepath), e, file=sys.stderr)
sys.exit(1)
def daemon_kill(self):
if daemon is None:
print("Install 'python-daemon' module to support daemonising.", file=sys.stderr)
sys.exit(1)
pidfilepath = self.get_pidfile_path()
if pidfilepath is None:
sys.exit(0)
try:
pid = open(pidfilepath, "r").read()
except FileNotFoundError:
print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr)
sys.exit(0)
pid = int(pid)
for sno in [signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGTERM]:
try:
os.kill(pid, sno)
except ProcessLookupError:
print("Process with pid '{}' not found.".format(pid), file=sys.stderr)
sys.exit(0)
for i in range(10):
if not os.path.exists(pidfilepath):
sys.exit(0)
time.sleep(0.1)
print("Daemon process (pid: {}) still running ...".format(pid), file=sys.stderr)
print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr)
sys.exit(1)
[docs] def run(self):
# Comence init-time
self.PubSub.publish("Application.init!")
self.Loop.run_until_complete(asyncio.gather(
self._init_time_governor(),
self.initialize(),
))
# Comence run-time and application main() function
L.log(LOG_NOTICE, "is ready.")
self._stop_event.clear()
self.Loop.run_until_complete(asyncio.gather(
self._run_time_governor(),
self.main(),
))
# Comence exit-time
L.log(LOG_NOTICE, "is exiting ...")
self.Loop.run_until_complete(asyncio.gather(
self.finalize(),
self._exit_time_governor(),
))
# Python 3.5 lacks support for shutdown_asyncgens()
if hasattr(self.Loop, "shutdown_asyncgens"):
self.Loop.run_until_complete(self.Loop.shutdown_asyncgens())
self.Loop.close()
return self.ExitCode
[docs] def stop(self, exit_code: int = None):
if exit_code is not None:
self.set_exit_code(exit_code)
self._stop_event.set()
self._stop_counter += 1
self.PubSub.publish("Application.stop!", self._stop_counter)
if self._stop_counter >= 3:
L.fatal("Emergency exit")
for task in asyncio.all_tasks():
L.warning("Pending task during emergency exit: {}".format(task))
try:
# EX_SOFTWARE code is not available on Windows
return os._exit(os.EX_SOFTWARE)
except AttributeError:
return os._exit(0)
elif self._stop_counter > 1:
L.warning("{} tasks still active".format(len(asyncio.all_tasks())))
def _hup(self):
self.Logging.rotate()
self.PubSub.publish("Application.hup!")
# Modules
[docs] def add_module(self, module_class):
""" Load a new module. """
for module in self.Modules:
if isinstance(module, module_class):
# Already loaded and registered
return
module = module_class(self)
self.Modules.append(module)
# Enqueue module for initialization (happens in run phase)
self.InitModulesQueue.append(module)
# Services
[docs] def get_service(self, service_name):
""" Get a new service by its name. """
try:
return self.Services[service_name]
except KeyError:
pass
L.error("Cannot find service '{}' - not registered?".format(service_name))
raise KeyError("Cannot find service '{}'".format(service_name))
def _register_service(self, service):
""" Register a new service using its name. """
if service.Name in self.Services:
L.error("Service '{}' already registered (existing:{} new:{})".format(
service.Name, self.Services[service.Name], service))
raise RuntimeError("Service {} already registered".format(service.Name))
self.Services[service.Name] = service
# Enqueue service for initialization (happens in run phase)
self.InitServicesQueue.append(service)
# Lifecycle callback
[docs] async def initialize(self):
pass
[docs] async def main(self):
pass
[docs] async def finalize(self):
pass
# Governors
async def _init_time_governor(self):
# Initialize all services that has been created during application construction
await self._ensure_initialization()
async def _run_time_governor(self):
timeout = Config.getint('general', 'tick_period')
self.PubSub.publish("Application.run!")
# Wait for stop event & tick in meanwhile
for cycle_no in itertools.count(1):
await self._ensure_initialization()
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=timeout)
break
except asyncio.TimeoutError:
self.PubSub.publish("Application.tick!")
if (cycle_no % 10) == 0:
self.PubSub.publish("Application.tick/10!")
if (cycle_no % 60) == 0:
# Rebase a Loop time
self.BaseTime = time.time() - self.Loop.time()
self.PubSub.publish("Application.tick/60!")
if (cycle_no % 300) == 0:
self.PubSub.publish("Application.tick/300!")
if (cycle_no % 600) == 0:
self.PubSub.publish("Application.tick/600!")
if (cycle_no % 1800) == 0:
self.PubSub.publish("Application.tick/1800!")
if (cycle_no % 3600) == 0:
self.PubSub.publish("Application.tick/3600!")
if (cycle_no % 43200) == 0:
self.PubSub.publish("Application.tick/43200!")
if (cycle_no % 86400) == 0:
self.PubSub.publish("Application.tick/86400!")
continue
async def _exit_time_governor(self):
self.PubSub.publish("Application.exit!")
# Finalize services
futures = set()
for service in self.Services.values():
futures.add(
service.finalize(self)
)
while len(futures) > 0:
done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
for fut in done:
try:
fut.result()
except Exception:
L.exception("Error during finalize call")
# Finalize modules
futures = set()
for module in self.Modules:
futures.add(
module.finalize(self)
)
while len(futures) > 0:
done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
for fut in done:
try:
fut.result()
except Exception:
L.exception("Error during finalize call")
# Wait for non-finalized tasks
tasks_awaiting = 0
for i in range(3):
try:
ts = asyncio.all_tasks(self.Loop)
except AttributeError:
# Compatibility for Python 3.6-
ts = asyncio.Task.all_tasks(self.Loop)
tasks_awaiting = 0
for t in ts:
if t.done():
continue
tasks_awaiting += 1
if tasks_awaiting <= 1:
# 2 is for _exit_time_governor and wait()
break
await asyncio.sleep(1)
else:
L.warning("Exiting but {} async task(s) are still waiting".format(tasks_awaiting))
async def _ensure_initialization(self):
'''
This method ensures that any newly add module or registered service is initialized.
It is called from:
(1) init-time for modules&services added during application construction.
(2) run-time for modules&services added during aplication lifecycle.
'''
# Initialize modules
while len(self.InitModulesQueue) > 0:
module = self.InitModulesQueue.pop()
try:
await module.initialize(self)
except Exception:
L.exception("Error during module initialization")
# Initialize services
while len(self.InitServicesQueue) > 0:
service = self.InitServicesQueue.pop()
try:
await service.initialize(self)
except Exception:
L.exception("Error during service initialization")
[docs] def set_exit_code(self, exit_code: int, force: bool = False):
if (self.ExitCode < exit_code) or force:
L.debug("Exit code set to {}".format(exit_code))
self.ExitCode = exit_code
# Time
[docs] def time(self):
'''
Return UTC unix timestamp using a loop time (a fast way how to get a wall clock time).
'''
return self.BaseTime + self.Loop.time()