Source code for asab.application

import logging
import asyncio
import argparse
import itertools
import os
import sys
import time
import signal
import platform
import random

try:
	import daemon
	import daemon.pidfile
	import lockfile
except ImportError:
	daemon = None

from .config import Config
from .abc.singleton import Singleton
from .log import Logging, _loop_exception_handler, LOG_NOTICE
from .task import TaskService
from .docker import running_in_docker

L = logging.getLogger(__name__)


[docs]class Application(metaclass=Singleton): Description = "This app is based on ASAB."
[docs] def __init__(self, args=None): try: # EX_OK code is not available on Windows self.ExitCode = os.EX_OK except AttributeError: self.ExitCode = 0 # Queue of Services to be initialized self.InitServicesQueue = [] # Queue of Modules to be initialized self.InitModulesQueue = [] # Parse command line self.Args = self.parse_arguments(args=args) # Load configuration # Obtain HostName self.HostName = platform.node() os.environ['HOSTNAME'] = self.HostName Config._load() if hasattr(self.Args, "daemonize") and self.Args.daemonize: self.daemonize() elif hasattr(self.Args, "kill") and self.Args.kill: self.daemon_kill() # Seed the random generator random.seed() # Obtain the event loop self.Loop = asyncio.get_event_loop() if self.Loop.is_closed(): self.Loop = asyncio.new_event_loop() asyncio.set_event_loop(self.Loop) self.LaunchTime = time.time() self.BaseTime = self.LaunchTime - self.Loop.time() self.Modules = [] self.Services = {} # Check if the application is running in Docker, # if so, add Docker service if running_in_docker(): from .docker import Module self.add_module(Module) self.DockerService = self.get_service("asab.DockerService") self.HostName = self.DockerService.load_hostname() os.environ['HOSTNAME'] = self.HostName Config._load() # Setup logging self.Logging = Logging(self) # Configure the event loop self.Loop.set_exception_handler(_loop_exception_handler) if Config["logging"].getboolean("verbose"): self.Loop.set_debug(True) # Adding a handler to listen to the interrupt event if platform.system() == "Windows": try: # Windows win32api import import win32api def handler(type): self.stop() return True win32api.SetConsoleCtrlHandler(handler, True) except ImportError as e: L.warning("win32api module could not be loaded, because '{}'".format( e )) else: # POSIX and other reasonable systems self.Loop.add_signal_handler(signal.SIGINT, self.stop) self.Loop.add_signal_handler(signal.SIGTERM, self.stop) self.Loop.add_signal_handler(signal.SIGHUP, self._hup) self._stop_event = asyncio.Event(loop=self.Loop) self._stop_event.clear() self._stop_counter = 0 from .pubsub import PubSub self.PubSub = PubSub(self) self.TaskService = TaskService(self) # Setup ASAB API if len(Config['asab:web']["listen"]) > 0: from asab.api import Module self.add_module(Module) L.info("Initializing ...")
[docs] def create_argument_parser( self, prog=None, usage=None, description=None, epilog=None, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True ): ''' This method can be overriden to adjust argparse configuration. Refer to the Python standard library to `argparse.ArgumentParser` for details of arguments. ''' parser = argparse.ArgumentParser( prog=prog, usage=usage, description=description if description is not None else self.Description, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter, prefix_chars=prefix_chars, fromfile_prefix_chars=fromfile_prefix_chars, argument_default=argument_default, conflict_handler=conflict_handler, add_help=add_help ) parser.add_argument('-c', '--config', help='specify a path to a configuration file') parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)') parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog') parser.add_argument('-l', '--log-file', help='specify a path to a log file') parser.add_argument('-w', '--web-api', help='activate Asab web API (default listening port is 0.0.0.0:8080)', const="0.0.0.0:8080", nargs="?") if daemon is not None: parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)') parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit') return parser
def parse_arguments(self, args=None): parser = self.create_argument_parser() args = parser.parse_args(args=args) if args.config is not None: Config._default_values['general']['config_file'] = args.config if args.verbose: Config._default_values['logging']['verbose'] = True if args.syslog: Config._default_values['logging:syslog']['enabled'] = True if args.log_file: Config._default_values['logging:file']['path'] = args.log_file if args.web_api: Config._default_values['asab:web']['listen'] = args.web_api return args def get_pidfile_path(self): pidfilepath = Config['general']['pidfile'] if pidfilepath == "": return None elif pidfilepath == "!": return os.path.join('/var/run', os.path.basename(sys.argv[0]) + '.pid') else: return pidfilepath def daemonize(self): if daemon is None: print("Install 'python-daemon' module to support daemonizing.", file=sys.stderr) sys.exit(1) pidfilepath = self.get_pidfile_path() if pidfilepath is not None: pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfilepath) working_dir = Config['general']['working_dir'] uid = Config['general']['uid'] if uid == "": uid = None gid = Config['general']['gid'] if gid == "": gid = None signal_map = { signal.SIGTTIN: None, signal.SIGTTOU: None, signal.SIGTSTP: None, } self.DaemonContext = daemon.DaemonContext( working_directory=working_dir, signal_map=signal_map, pidfile=pidfile, uid=uid, gid=gid, ) try: self.DaemonContext.open() except lockfile.AlreadyLocked as e: print("Cannot create a PID file '{}':".format(pidfilepath), e, file=sys.stderr) sys.exit(1) def daemon_kill(self): if daemon is None: print("Install 'python-daemon' module to support daemonising.", file=sys.stderr) sys.exit(1) pidfilepath = self.get_pidfile_path() if pidfilepath is None: sys.exit(0) try: pid = open(pidfilepath, "r").read() except FileNotFoundError: print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr) sys.exit(0) pid = int(pid) for sno in [signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGTERM]: try: os.kill(pid, sno) except ProcessLookupError: print("Process with pid '{}' not found.".format(pid), file=sys.stderr) sys.exit(0) for i in range(10): if not os.path.exists(pidfilepath): sys.exit(0) time.sleep(0.1) print("Daemon process (pid: {}) still running ...".format(pid), file=sys.stderr) print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr) sys.exit(1)
[docs] def run(self): # Comence init-time self.PubSub.publish("Application.init!") self.Loop.run_until_complete(asyncio.gather( self._init_time_governor(), self.initialize(), )) # Comence run-time and application main() function L.log(LOG_NOTICE, "is ready.") self._stop_event.clear() self.Loop.run_until_complete(asyncio.gather( self._run_time_governor(), self.main(), )) # Comence exit-time L.log(LOG_NOTICE, "is exiting ...") self.Loop.run_until_complete(asyncio.gather( self.finalize(), self._exit_time_governor(), )) # Python 3.5 lacks support for shutdown_asyncgens() if hasattr(self.Loop, "shutdown_asyncgens"): self.Loop.run_until_complete(self.Loop.shutdown_asyncgens()) self.Loop.close() return self.ExitCode
[docs] def stop(self, exit_code: int = None): if exit_code is not None: self.set_exit_code(exit_code) self._stop_event.set() self._stop_counter += 1 self.PubSub.publish("Application.stop!", self._stop_counter) if self._stop_counter >= 3: L.fatal("Emergency exit") for task in asyncio.all_tasks(): L.warning("Pending task during emergency exit: {}".format(task)) try: # EX_SOFTWARE code is not available on Windows return os._exit(os.EX_SOFTWARE) except AttributeError: return os._exit(0) elif self._stop_counter > 1: L.warning("{} tasks still active".format(len(asyncio.all_tasks())))
def _hup(self): self.Logging.rotate() self.PubSub.publish("Application.hup!") # Modules
[docs] def add_module(self, module_class): """ Load a new module. """ for module in self.Modules: if isinstance(module, module_class): # Already loaded and registered return module = module_class(self) self.Modules.append(module) # Enqueue module for initialization (happens in run phase) self.InitModulesQueue.append(module)
# Services
[docs] def get_service(self, service_name): """ Get a new service by its name. """ try: return self.Services[service_name] except KeyError: pass L.error("Cannot find service '{}' - not registered?".format(service_name)) raise KeyError("Cannot find service '{}'".format(service_name))
def _register_service(self, service): """ Register a new service using its name. """ if service.Name in self.Services: L.error("Service '{}' already registered (existing:{} new:{})".format( service.Name, self.Services[service.Name], service)) raise RuntimeError("Service {} already registered".format(service.Name)) self.Services[service.Name] = service # Enqueue service for initialization (happens in run phase) self.InitServicesQueue.append(service) # Lifecycle callback
[docs] async def initialize(self): pass
[docs] async def main(self): pass
[docs] async def finalize(self): pass
# Governors async def _init_time_governor(self): # Initialize all services that has been created during application construction await self._ensure_initialization() async def _run_time_governor(self): timeout = Config.getint('general', 'tick_period') self.PubSub.publish("Application.run!") # Wait for stop event & tick in meanwhile for cycle_no in itertools.count(1): await self._ensure_initialization() try: await asyncio.wait_for(self._stop_event.wait(), timeout=timeout) break except asyncio.TimeoutError: self.PubSub.publish("Application.tick!") if (cycle_no % 10) == 0: self.PubSub.publish("Application.tick/10!") if (cycle_no % 60) == 0: # Rebase a Loop time self.BaseTime = time.time() - self.Loop.time() self.PubSub.publish("Application.tick/60!") if (cycle_no % 300) == 0: self.PubSub.publish("Application.tick/300!") if (cycle_no % 600) == 0: self.PubSub.publish("Application.tick/600!") if (cycle_no % 1800) == 0: self.PubSub.publish("Application.tick/1800!") if (cycle_no % 3600) == 0: self.PubSub.publish("Application.tick/3600!") if (cycle_no % 43200) == 0: self.PubSub.publish("Application.tick/43200!") if (cycle_no % 86400) == 0: self.PubSub.publish("Application.tick/86400!") continue async def _exit_time_governor(self): self.PubSub.publish("Application.exit!") # Finalize services futures = set() for service in self.Services.values(): futures.add( service.finalize(self) ) while len(futures) > 0: done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) for fut in done: try: fut.result() except Exception: L.exception("Error during finalize call") # Finalize modules futures = set() for module in self.Modules: futures.add( module.finalize(self) ) while len(futures) > 0: done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) for fut in done: try: fut.result() except Exception: L.exception("Error during finalize call") # Wait for non-finalized tasks tasks_awaiting = 0 for i in range(3): try: ts = asyncio.all_tasks(self.Loop) except AttributeError: # Compatibility for Python 3.6- ts = asyncio.Task.all_tasks(self.Loop) tasks_awaiting = 0 for t in ts: if t.done(): continue tasks_awaiting += 1 if tasks_awaiting <= 1: # 2 is for _exit_time_governor and wait() break await asyncio.sleep(1) else: L.warning("Exiting but {} async task(s) are still waiting".format(tasks_awaiting)) async def _ensure_initialization(self): ''' This method ensures that any newly add module or registered service is initialized. It is called from: (1) init-time for modules&services added during application construction. (2) run-time for modules&services added during aplication lifecycle. ''' # Initialize modules while len(self.InitModulesQueue) > 0: module = self.InitModulesQueue.pop() try: await module.initialize(self) except Exception: L.exception("Error during module initialization") # Initialize services while len(self.InitServicesQueue) > 0: service = self.InitServicesQueue.pop() try: await service.initialize(self) except Exception: L.exception("Error during service initialization")
[docs] def set_exit_code(self, exit_code: int, force: bool = False): if (self.ExitCode < exit_code) or force: L.debug("Exit code set to {}".format(exit_code)) self.ExitCode = exit_code
# Time
[docs] def time(self): ''' Return UTC unix timestamp using a loop time (a fast way how to get a wall clock time). ''' return self.BaseTime + self.Loop.time()