Source code for asab.application

import logging
import asyncio
import argparse
import itertools
import os
import sys
import time
import signal
import platform
import random

try:
	import daemon
	import daemon.pidfile
except ImportError:
	daemon = None

from .config import Config
from .abc.singleton import Singleton
from .log import Logging, _loop_exception_handler

# Importing the Win API library
if platform.system() == "Windows":
	try:
		import win32api
	except ModuleNotFoundError:
		win32api = None
else:
	win32api = None


L = logging.getLogger(__name__)


[docs]class Application(metaclass=Singleton): Description = "This app is based on ASAB."
[docs] def __init__(self): try: # EX_OK code is not available on Windows self.ExitCode = os.EX_OK except AttributeError: self.ExitCode = 0 # Parse command line args = self.parse_arguments() # Load configuration Config._load() if hasattr(args, "daemonize") and args.daemonize: self.daemonize() elif hasattr(args, "kill") and args.kill: self.daemon_kill() # Seed the random generator random.seed() # Obtain the event loop self.Loop = asyncio.get_event_loop() # Setup logging self.Logging = Logging(self) # Configure the event loop self.Loop.set_exception_handler(_loop_exception_handler) if Config["logging"].getboolean("verbose"): self.Loop.set_debug(True) try: # Signals are not available on Windows self.Loop.add_signal_handler(signal.SIGINT, self.stop) except NotImplementedError: # Checking if the program runs on Windows if any(platform.win32_ver()): if win32api is not None: callback = self.stop # Adding a handler to listen to the interrupt event def handler(type): callback() return True win32api.SetConsoleCtrlHandler(handler, True) try: self.Loop.add_signal_handler(signal.SIGTERM, self.stop) except NotImplementedError: pass try: self.Loop.add_signal_handler(signal.SIGHUP, self._hup) except NotImplementedError: pass self._stop_event = asyncio.Event(loop = self.Loop) self._stop_event.clear() self._stop_counter = 0 from .pubsub import PubSub self.PubSub = PubSub(self) self.Modules = [] self.Services = {} # Comence init-time governor L.info("Initializing ...") finished_tasks, pending_tasks = self.Loop.run_until_complete(asyncio.wait( [ self.initialize(), self._init_time_governor(asyncio.Future()), ], return_when = asyncio.FIRST_EXCEPTION )) for task in finished_tasks: # This one also raises exceptions from futures, which is perfectly ok task.result() if len(pending_tasks) > 0: raise RuntimeError("Failed to fully initialize. Here are pending tasks: {}".format(pending_tasks))
def create_argument_parser(self): ''' This method can be overriden to adjust argparse configuration ''' parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=self.Description, ) parser.add_argument('-c', '--config', help='specify a path to a configuration file') parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)') parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog') parser.add_argument('-l', '--log-file', help='specify a path to a log file') if daemon is not None: parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)') parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit') return parser def parse_arguments(self): parser = self.create_argument_parser() args = parser.parse_args() if args.config is not None: Config._default_values['general']['config_file'] = args.config if args.verbose: Config._default_values['logging']['verbose'] = True if args.syslog: Config._default_values['logging:syslog']['enabled'] = True if args.log_file: Config._default_values['logging:file']['path'] = args.log_file return args def get_pidfile_path(self): pidfilepath = Config['general']['pidfile'] if pidfilepath == "": return None elif pidfilepath == "!": return os.path.join('/var/run', os.path.basename(sys.argv[0]) + '.pid') else: return pidfilepath def daemonize(self): if daemon is None: print("Install 'python-daemon' module to support daemonising.", file=sys.stderr) sys.exit(1) pidfilepath = self.get_pidfile_path() if pidfilepath is not None: pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfilepath) working_dir = Config['general']['working_dir'] uid = Config['general']['uid'] if uid == "": uid = None gid = Config['general']['gid'] if gid == "": gid = None signal_map={ signal.SIGTTIN: None, signal.SIGTTOU: None, signal.SIGTSTP: None, } self.DaemonContext = daemon.DaemonContext( working_directory=working_dir, signal_map=signal_map, pidfile=pidfile, uid=uid, gid=gid, ) try: self.DaemonContext.open() except lockfile.AlreadyLocked as e: print("Cannot create a PID file '{}':".format(pidfilepath), e, file=sys.stderr) sys.exit(1) def daemon_kill(self): if daemon is None: print("Install 'python-daemon' module to support daemonising.", file=sys.stderr) sys.exit(1) pidfilepath = self.get_pidfile_path() if pidfilepath is None: sys.exit(0) try: pid = open(pidfilepath, "r").read() except FileNotFoundError: print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr) sys.exit(0) pid = int(pid) for sno in [signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGTERM]: try: os.kill(pid, sno) except ProcessLookupError: print("Process with pid '{}' not found.".format(pid), file=sys.stderr) sys.exit(0) for i in range(10): if not os.path.exists(pidfilepath): sys.exit(0) time.sleep(0.1) print("Daemon process (pid: {}) still running ...".format(pid), file=sys.stderr) print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr) sys.exit(1)
[docs] def run(self): # Comence run-time and application main() function L.info("Running ...") self._stop_event.clear() finished_tasks, pending_tasks = self.Loop.run_until_complete(asyncio.wait( [ self.main(), self._run_time_governor(asyncio.Future()), ], return_when = asyncio.FIRST_EXCEPTION )) for task in finished_tasks: try: task.result() except BaseException: L.exception("Exception in {}".format(task)) #TODO: Process pending_tasks tasks from above # Comence exit-time L.info("Exiting ...") finished_tasks, pending_tasks = self.Loop.run_until_complete(asyncio.wait( [ self.finalize(), self._exit_time_governor(asyncio.Future()), ], return_when = asyncio.FIRST_EXCEPTION )) for task in finished_tasks: try: task.result() except BaseException: L.exception("Exception in {}".format(task)) #TODO: Process pending_tasks tasks from above (should be none) # Python 3.5 lacks support for shutdown_asyncgens() if hasattr(self.Loop, "shutdown_asyncgens"): self.Loop.run_until_complete(self.Loop.shutdown_asyncgens()) self.Loop.close() return self.ExitCode
[docs] def stop(self, exit_code:int=None): if exit_code is not None: self.set_exit_code(exit_code) self._stop_event.set() self._stop_counter += 1 self.PubSub.publish("Application.stop!", self._stop_counter) if self._stop_counter >= 3: L.fatal("Emergency exit") try: # EX_SOFTWARE code is not available on Windows return os._exit(os.EX_SOFTWARE) except AttributeError: return os._exit(0)
def _hup(self): self.Logging.rotate() self.PubSub.publish("Application.hup!") # Modules
[docs] def add_module(self, module_class): """ Load a new module. """ for module in self.Modules: if isinstance(module, module_class): # Already loaded and registered return module = module_class(self) self.Modules.append(module) asyncio.ensure_future(module.initialize(self), loop=self.Loop)
# Services
[docs] def get_service(self, service_name): """ Get a new service by its name. """ try: return self.Services[service_name] except KeyError: pass L.error("Cannot find service '{}' - not registered?".format(service_name)) raise KeyError("Cannot find service '{}'".format(service_name))
def _register_service(self, service): """ Register a new service using its name. """ if service.Name in self.Services: L.error("Service '{}' already registered (existing:{} new:{})" .format(service.Name, self.Services[service.Name], service)) raise RuntimeError("Service {} already registered".format(service.Name)) self.Services[service.Name] = service asyncio.ensure_future(service.initialize(self), loop=self.Loop) # Lifecycle callback
[docs] async def initialize(self): pass
[docs] async def main(self): pass
[docs] async def finalize(self): pass
# Governors async def _init_time_governor(self, future): self.PubSub.publish("Application.init!") future.set_result("initialize") async def _run_time_governor(self, future): timeout = Config.getint('general', 'tick_period') try: self.PubSub.publish("Application.run!") # Wait for stop event & tick in meanwhile for cycle_no in itertools.count(1): try: await asyncio.wait_for(self._stop_event.wait(), timeout=timeout) break except asyncio.TimeoutError: self.PubSub.publish("Application.tick!") if (cycle_no % 10) == 0: self.PubSub.publish("Application.tick/10!") if (cycle_no % 60) == 0: self.PubSub.publish("Application.tick/60!") if (cycle_no % 300) == 0: self.PubSub.publish("Application.tick/300!") if (cycle_no % 600) == 0: self.PubSub.publish("Application.tick/600!") if (cycle_no % 1800) == 0: self.PubSub.publish("Application.tick/1800!") if (cycle_no % 3600) == 0: self.PubSub.publish("Application.tick/3600!") if (cycle_no % 43200) == 0: self.PubSub.publish("Application.tick/43200!") if (cycle_no % 86400) == 0: self.PubSub.publish("Application.tick/86400!") continue finally: future.set_result("run") async def _exit_time_governor(self, future): self.PubSub.publish("Application.exit!") # Finalize services futures = [] for service in self.Services.values(): nf = asyncio.ensure_future(service.finalize(self), loop=self.Loop) futures.append(nf) if len(futures) > 0: await asyncio.wait(futures, return_when=asyncio.ALL_COMPLETED) # TODO: Handle expections (if needed) - probably only print them # Finalize modules futures = [] for module in self.Modules: nf = asyncio.ensure_future(module.finalize(self), loop=self.Loop) futures.append(nf) if len(futures) > 0: await asyncio.wait(futures, return_when=asyncio.ALL_COMPLETED) # TODO: Handle expections (if needed) - probably only print them tasks_awaiting = 0 for i in range(3): ts = asyncio.Task.all_tasks(self.Loop) tasks_awaiting = 0 for t in ts: if t.done(): continue tasks_awaiting += 1 if tasks_awaiting <= 2: # 2 is for _exit_time_governor and wait() break await asyncio.sleep(1) else: L.warn("Exiting but {} async task(s) are still waiting".format(tasks_awaiting)) future.set_result("exit")
[docs] def set_exit_code(self, exit_code:int, force:bool=False): if (self.ExitCode < exit_code) or force: L.debug("Exit code set to {}",format(exit_code)) self.ExitCode = exit_code