import os
import sys
import time
import signal
import random
import logging
import asyncio
import argparse
import itertools
import platform
import datetime
import asab
try:
import daemon
import daemon.pidfile
import lockfile
except ImportError:
daemon = None
from .config import Config
from .abc.singleton import Singleton
from .log import Logging, _loop_exception_handler, LOG_NOTICE
from .task import TaskService
L = logging.getLogger(__name__)
[docs]class Application(metaclass=Singleton):
Description = "This app is based on ASAB."
def __init__(self, args=None, modules=None):
    '''
    Build the application: parse the command line, load the configuration,
    optionally daemonize (or kill a running daemon), obtain the event loop,
    set up logging, signal handling, PubSub and the task service, add the
    requested modules and schedule housekeeping.

    Argument `args` is handed over to `parse_arguments()`.

    Argument `modules` allows to specify a list of ASAB modules that will be added by `app.add_module()` call.
    `None` (the default) means that no module is added.

    Example:

        class MyApplication(asab.Application):
            def __init__(self):
                super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
    '''
    try:
        # EX_OK code is not available on Windows
        self.ExitCode = os.EX_OK
    except AttributeError:
        self.ExitCode = 0

    # Queue of Services to be initialized
    self.InitServicesQueue = []
    # Queue of Modules to be initialized
    self.InitModulesQueue = []

    # Parse command line
    self.Args = self.parse_arguments(args=args)

    # Obtain HostName and export it before the configuration is loaded
    self.HostName = platform.node()
    os.environ['HOSTNAME'] = self.HostName

    # Load configuration
    Config._load()

    # The daemon-related options exist only when `python-daemon` is installed,
    # hence the tolerant attribute access.
    if getattr(self.Args, "daemonize", False):
        self.daemonize()
    elif getattr(self.Args, "kill", False):
        self.daemon_kill()

    # Seed the random generator
    random.seed()

    # Obtain the event loop (after the optional daemonization above)
    self.Loop = asyncio.get_event_loop()
    if self.Loop.is_closed():
        self.Loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.Loop)

    self.LaunchTime = time.time()
    # Offset between the wall clock and the loop clock, see `time()`
    self.BaseTime = self.LaunchTime - self.Loop.time()

    self.Modules = []
    self.Services = {}

    # Setup logging
    self.Logging = Logging(self)

    # Configure the event loop
    self.Loop.set_exception_handler(_loop_exception_handler)
    if Config["logging"].getboolean("verbose"):
        self.Loop.set_debug(True)

    # Adding a handler to listen to the interrupt event
    if platform.system() == "Windows":
        try:
            # Windows win32api import
            import win32api

            def handler(ctrl_type):
                # The control type is ignored, every console event stops the app
                self.stop()
                return True

            win32api.SetConsoleCtrlHandler(handler, True)
        except ImportError as e:
            L.warning("win32api module could not be loaded, because '{}'".format(
                e
            ))
    else:
        # POSIX and other reasonable systems
        self.Loop.add_signal_handler(signal.SIGINT, self.stop)
        self.Loop.add_signal_handler(signal.SIGTERM, self.stop)
        self.Loop.add_signal_handler(signal.SIGHUP, self._hup)

    self._stop_event = asyncio.Event()
    self._stop_event.clear()
    self._stop_counter = 0

    # Imported here rather than at the top of the file, as in the original
    from .pubsub import PubSub
    self.PubSub = PubSub(self)

    L.info("Initializing ...")

    self.TaskService = TaskService(self)

    # `modules` defaults to None instead of a shared mutable list
    for module in (modules or ()):
        self.add_module(module)

    # Set housekeeping time and time limit
    self.HousekeepingTime, self.HousekeepingTimeLimit, self.HousekeepingId = self._initialize_housekeeping_schedule()
    self.HousekeepingMissedEvents: list = []

    # Every 10 minutes listen for housekeeping
    self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick)
def create_argument_parser(
    self,
    prog=None,
    usage=None,
    description=None,
    epilog=None,
    prefix_chars='-',
    fromfile_prefix_chars=None,
    argument_default=None,
    conflict_handler='error',
    add_help=True
):
    '''
    Create the `argparse.ArgumentParser` used by `parse_arguments()`.

    This method can be overridden to adjust argparse configuration.
    Refer to the Python standard library to `argparse.ArgumentParser` for details of arguments.
    When `description` is None, the class attribute `Description` is used.

    The daemon-related options (`-d`, `-k`) are registered only if the
    `daemon` module could be imported.

    :return: The configured argument parser.
    '''
    parser = argparse.ArgumentParser(
        prog=prog,
        usage=usage,
        description=description if description is not None else self.Description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        prefix_chars=prefix_chars,
        fromfile_prefix_chars=fromfile_prefix_chars,
        argument_default=argument_default,
        conflict_handler=conflict_handler,
        add_help=add_help
    )
    parser.add_argument('-c', '--config', help='specify a path to a configuration file')
    parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)')
    parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog')
    parser.add_argument('-l', '--log-file', help='specify a path to a log file')
    parser.add_argument('-w', '--web-api', help='activate Asab web API (default listening port is 0.0.0.0:8080)', const="0.0.0.0:8080", nargs="?")
    # This option is a switch. Without `nargs="?"` + `const` argparse demanded
    # a value and rejected the bare flag. A value is still accepted, so command
    # lines written for the previous behaviour keep working.
    parser.add_argument('--startup-housekeeping', nargs="?", const=True, help='trigger housekeeping event immediately after application startup')

    if daemon is not None:
        parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)')
        parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit')

    return parser
def parse_arguments(self, args=None):
    """
    Parse the command line and project the recognised options into the
    default values of the configuration.

    :param args: The arguments to parse. If not set, sys.argv[1:] will be used
    :return: The namespace with the parsed arguments.
    """
    parsed = self.create_argument_parser().parse_args(args=args)
    defaults = Config._default_values

    if parsed.config is not None:
        defaults['general']['config_file'] = parsed.config

    if parsed.verbose:
        defaults['logging']['verbose'] = True

    if parsed.syslog:
        defaults['logging:syslog']['enabled'] = True

    if parsed.log_file:
        defaults['logging:file']['path'] = parsed.log_file

    if parsed.web_api:
        # The 'web' section is created on demand
        defaults.setdefault('web', {})['listen'] = parsed.web_api

    if parsed.startup_housekeeping:
        defaults['housekeeping']['run_at_startup'] = True

    return parsed
def get_pidfile_path(self):
    """
    Resolve the `pidfile` option of the `[general]` configuration section.

    Example from the configuration:

    ```
    [general]
    pidfile=/tmp/my.pid
    ```

    A `pidfile` is a file that holds the process id of the ASAB process.

    * An empty value yields `None`.
    * The value "!" yields a path in the `/var/run/` folder, named after
      the base name of `sys.argv[0]` with a `.pid` suffix.
    * Any other value is returned unchanged.

    :return: The path to the `pidfile` or `None`.
    """
    configured = Config['general']['pidfile']

    if configured == "!":
        script_name = os.path.basename(sys.argv[0])
        return os.path.join('/var/run', script_name + '.pid')

    if configured == "":
        return None

    return configured
def daemonize(self):
    """
    Detach the process into the background using `python-daemon`.

    Working directory, uid and gid are read from the `[general]` section of
    the configuration; empty uid/gid values are passed on as `None`.
    The created context is stored in `self.DaemonContext`.

    Exits the process with code 1 when the `daemon` module is not installed
    or when the PID file is already locked.
    """
    if daemon is None:
        print("Install 'python-daemon' module to support daemonizing.", file=sys.stderr)
        sys.exit(1)

    # Without a configured pidfile the daemon runs without a PID lock file.
    # `pidfile` must be bound in that case as well, otherwise the
    # DaemonContext() call below fails with UnboundLocalError.
    pidfile = None
    pidfilepath = self.get_pidfile_path()
    if pidfilepath is not None:
        pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfilepath)

    working_dir = Config['general']['working_dir']

    uid = Config['general']['uid']
    if uid == "":
        uid = None

    gid = Config['general']['gid']
    if gid == "":
        gid = None

    # NOTE(review): mapping a signal to None presumably tells python-daemon
    # to ignore it - confirm against the DaemonContext documentation.
    signal_map = {
        signal.SIGTTIN: None,
        signal.SIGTTOU: None,
        signal.SIGTSTP: None,
    }

    self.DaemonContext = daemon.DaemonContext(
        working_directory=working_dir,
        signal_map=signal_map,
        pidfile=pidfile,
        uid=uid,
        gid=gid,
    )

    try:
        self.DaemonContext.open()
    except lockfile.AlreadyLocked as e:
        print("Cannot create a PID file '{}':".format(pidfilepath), e, file=sys.stderr)
        sys.exit(1)
def daemon_kill(self):
    """
    Stop a running daemon identified by the PID file and exit the process.

    The daemon is sent SIGINT up to four times and finally SIGTERM. After
    each signal the PID file is polled for about one second; its
    disappearance is taken as the sign that the daemon has terminated.

    Exit codes:
    * 0 - no pidfile configured, PID file missing, process not found or
      the PID file disappeared after a signal.
    * 1 - `python-daemon` is not installed, the PID file is unreadable as
      a number, or the PID file is still there after all signals.

    This method never returns, it always ends with `sys.exit()`.
    """
    if daemon is None:
        print("Install 'python-daemon' module to support daemonising.", file=sys.stderr)
        sys.exit(1)

    pidfilepath = self.get_pidfile_path()
    if pidfilepath is None:
        sys.exit(0)

    try:
        # The context manager closes the file handle deterministically
        with open(pidfilepath, "r") as pid_file:
            pid_text = pid_file.read()
    except FileNotFoundError:
        print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr)
        sys.exit(0)

    try:
        pid = int(pid_text)
    except ValueError:
        # Empty or corrupted PID file; report it instead of dumping a traceback
        print("Pid file '{}' does not contain a valid process id.".format(pidfilepath), file=sys.stderr)
        sys.exit(1)

    for sno in [signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGTERM]:
        try:
            os.kill(pid, sno)
        except ProcessLookupError:
            print("Process with pid '{}' not found.".format(pid), file=sys.stderr)
            sys.exit(0)

        # Give the daemon about a second to remove its PID file
        for _ in range(10):
            if not os.path.exists(pidfilepath):
                sys.exit(0)
            time.sleep(0.1)

        print("Daemon process (pid: {}) still running ...".format(pid), file=sys.stderr)

    # Reached only when the PID file survived every signal, so reporting it
    # as "not found" (as the previous message did) was misleading.
    print("Daemon process (pid: {}) did not terminate, pid file '{}' still exists.".format(pid, pidfilepath), file=sys.stderr)
    sys.exit(1)
def run(self):
    """
    Drive the whole application lifecycle on `self.Loop` and return the exit code.

    Phases, each run to completion on the event loop:
    1. init-time: publishes "Application.init!", then runs
       `_init_time_governor()` together with `initialize()`.
    2. run-time: runs `_run_time_governor()` together with `main()`.
    3. exit-time: runs `finalize()` together with `_exit_time_governor()`,
       shuts down asynchronous generators and closes the loop.

    If the exit code is "!RESTART!" once the run-time/exit-time part is left
    (normally or by an exception), the process is replaced via `os.execv()`.
    """
    loop = self.Loop

    # Commence init-time
    self.PubSub.publish("Application.init!")
    init_phase = asyncio.gather(
        self._init_time_governor(),
        self.initialize(),
    )
    loop.run_until_complete(init_phase)

    try:
        # Commence run-time and application main() function
        L.log(LOG_NOTICE, "is ready.")
        self._stop_event.clear()
        run_phase = asyncio.gather(
            self._run_time_governor(),
            self.main(),
        )
        loop.run_until_complete(run_phase)

        # Commence exit-time
        if self.ExitCode != "!RESTART!":
            L.log(LOG_NOTICE, "is exiting ...", struct_data={'exit_code': self.ExitCode})
        else:
            L.log(LOG_NOTICE, "is restarting ...")

        exit_phase = asyncio.gather(
            self.finalize(),
            self._exit_time_governor(),
        )
        loop.run_until_complete(exit_phase)

        # Python 3.5 lacks support for shutdown_asyncgens()
        if hasattr(loop, "shutdown_asyncgens"):
            loop.run_until_complete(loop.shutdown_asyncgens())

        loop.close()

    finally:
        if self.ExitCode == "!RESTART!":
            # Replace the current process image, keeping the original arguments
            interpreter = sys.executable
            os.execv(interpreter, [os.path.basename(interpreter)] + sys.argv)

    return self.ExitCode
def stop(self, exit_code: int = None):
    """
    Request the application to stop.

    Sets the stop event awaited by the run-time governor and publishes
    "Application.stop!" together with the number of stop requests so far.

    Repeated requests escalate:
    * the 2nd request logs how many tasks are still active,
    * the 3rd request logs the pending tasks and terminates the process
      immediately through `os._exit()`.

    :param exit_code: Optional exit code handed over to `set_exit_code()`
        (`_do_restart()` passes the "!RESTART!" marker here).
    """
    if exit_code is not None:
        self.set_exit_code(exit_code)

    self._stop_event.set()
    self._stop_counter += 1
    self.PubSub.publish("Application.stop!", self._stop_counter)

    # `asyncio.all_tasks()` without an argument requires a running loop in
    # the calling thread and raises RuntimeError otherwise (e.g. when called
    # from the win32 console handler). Passing the application loop
    # explicitly works in every context.
    if self._stop_counter >= 3:
        L.fatal("Emergency exit")
        for task in asyncio.all_tasks(self.Loop):
            L.warning("Pending task during emergency exit: {}".format(task))
        try:
            # EX_SOFTWARE code is not available on Windows
            return os._exit(os.EX_SOFTWARE)
        except AttributeError:
            return os._exit(0)

    elif self._stop_counter > 1:
        L.warning("{} tasks still active".format(len(asyncio.all_tasks(self.Loop))))
def _do_restart(self, event_name):
    """
    PubSub callback subscribed by `restart()`: stop the application with the
    "!RESTART!" marker as the exit code. `event_name` is not used.
    """
    restart_marker = "!RESTART!"
    self.stop(restart_marker)
def restart(self):
    '''
    Schedule a hard restart of the whole application.

    The restart itself is triggered by the next "Application.tick/10!"
    message, which makes `_do_restart()` stop the application with the
    "!RESTART!" exit code; `run()` then calls os.execv(), which replaces the
    current process with a new one (without creating a new process ID).
    Arguments and environment variables will be retained.

    IMPORTANT: Please note that this will work on Unix-based systems only, as it uses a feature specific to Unix.

    A piece of advice: be careful with this function and keep control over
    when and how it is called, to avoid unexpected process restarts.
    Calls of this kind are not common in Python applications.
    '''
    trigger = "Application.tick/10!"
    self.PubSub.subscribe(trigger, self._do_restart)
def _hup(self):
    """
    SIGHUP handler (registered in `__init__` on non-Windows systems):
    rotate the logs first, then publish "Application.hup!".
    """
    logging_setup, pubsub = self.Logging, self.PubSub
    logging_setup.rotate()
    pubsub.publish("Application.hup!")
# Modules
def add_module(self, module_class):
    """
    Instantiate and register a module, unless an instance of `module_class`
    has been registered already (then nothing happens).

    The new module is constructed with the application as its only
    argument and is enqueued for initialization, which takes place later
    in `_ensure_initialization()`.
    """
    already_loaded = any(isinstance(loaded, module_class) for loaded in self.Modules)
    if already_loaded:
        return

    instance = module_class(self)
    self.Modules.append(instance)

    # Enqueue module for initialization (happens in run phase)
    self.InitModulesQueue.append(instance)
# Services
def get_service(self, service_name):
    """
    Look up a registered service by its name.

    Returns `None` if the service is not registered.
    """
    try:
        return self.Services[service_name]
    except KeyError:
        return None
def _register_service(self, service):
    """
    Register a service under `service.Name` and enqueue it for
    initialization.

    Raises `RuntimeError` (after logging an error) when a service with the
    same name is registered already.
    """
    name = service.Name
    registry = self.Services

    if name in registry:
        L.error("Service '{}' already registered (existing:{} new:{})".format(
            name, registry[name], service))
        raise RuntimeError("Service {} already registered".format(name))

    registry[name] = service

    # Enqueue service for initialization (happens in run phase)
    self.InitServicesQueue.append(service)
# Lifecycle callback
async def initialize(self):
    """
    Lifecycle callback awaited by `run()` during init-time.
    The default implementation does nothing.
    """
    return None
async def main(self):
    """
    Lifecycle callback awaited by `run()` during run-time, alongside the
    run-time governor. The default implementation does nothing.
    """
    return None
async def finalize(self):
    """
    Lifecycle callback awaited by `run()` during exit-time.
    The default implementation does nothing.
    """
    return None
# Governors
async def _init_time_governor(self):
    """
    Init-time governor: initialize every module and service that was
    enqueued while the application was being constructed.
    """
    pending_initialization = self._ensure_initialization()
    await pending_initialization
async def _run_time_governor(self):
    """
    Run-time governor: publish "Application.run!" and then emit the periodic
    tick messages until the stop event is set.

    One cycle lasts `tick_period` (read from the `[general]` section); the
    wait on the stop event doubles as the tick timer. Every cycle publishes
    "Application.tick!"; every N-th cycle additionally publishes
    "Application.tick/N!" for N in 10, 60, 300, 600, 1800, 3600, 43200 and
    86400. Every 60th cycle, `BaseTime` is recomputed from the wall clock.

    When `[housekeeping] run_at_startup` is enabled, "Application.housekeeping!"
    is published once before the first cycle.
    """
    timeout = Config.getint('general', 'tick_period')

    self.PubSub.publish("Application.run!")

    if Config.getboolean("housekeeping", "run_at_startup", fallback=False):
        # Use the LOG_NOTICE imported from `.log`, like the rest of this file
        L.log(LOG_NOTICE, "Startup housekeeping...")
        self.PubSub.publish("Application.housekeeping!")

    # Tick multiples in the order in which their messages are published
    tick_multiples = (10, 60, 300, 600, 1800, 3600, 43200, 86400)

    # Wait for stop event & tick in meanwhile
    for cycle_no in itertools.count(1):

        # Modules and services may be added at any time during run-time
        await self._ensure_initialization()

        try:
            await asyncio.wait_for(self._stop_event.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            # The stop event has not been set within one tick period
            self.PubSub.publish("Application.tick!")
            for multiple in tick_multiples:
                if (cycle_no % multiple) != 0:
                    continue
                if multiple == 60:
                    # Rebase a Loop time
                    self.BaseTime = time.time() - self.Loop.time()
                self.PubSub.publish("Application.tick/{}!".format(multiple))
        else:
            # The stop event has been set
            break
async def _exit_time_governor(self):
    """
    Exit-time governor: publish "Application.exit!", finalize all services,
    then all modules, and finally give the remaining tasks a chance to
    finish.

    Exceptions raised by a `finalize()` call are logged and do not stop the
    finalization of the other components. The final wait checks the
    unfinished tasks of `self.Loop` up to three times, sleeping one second
    after each unsuccessful check; a warning is logged when more than one
    task is still unfinished after the last check.
    """
    self.PubSub.publish("Application.exit!")

    async def finalize_all(components):
        # Start all finalize() calls at once and collect their outcomes
        pending = {
            asyncio.ensure_future(component.finalize(self))
            for component in components
        }
        while pending:
            finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
            for outcome in finished:
                try:
                    outcome.result()
                except Exception:
                    L.exception("Error during finalize call")

    # Finalize services
    await finalize_all(self.Services.values())

    # Finalize modules
    await finalize_all(self.Modules)

    # Wait for non-finalized tasks
    tasks_awaiting = 0
    for _ in range(3):
        try:
            known_tasks = asyncio.all_tasks(self.Loop)
        except AttributeError:
            # Compatibility for Python 3.6-
            known_tasks = asyncio.Task.all_tasks(self.Loop)

        tasks_awaiting = sum(1 for task in known_tasks if not task.done())

        # One unfinished task is tolerated - presumably the task that
        # executes this governor itself (TODO confirm).
        if tasks_awaiting <= 1:
            break

        await asyncio.sleep(1)

    else:
        L.warning("Exiting but {} async task(s) are still waiting".format(tasks_awaiting))
async def _ensure_initialization(self):
    '''
    Initialize every module and service that is waiting in the
    initialization queues.

    It is called from:

    (1) init-time for modules&services added during application construction.
    (2) run-time for modules&services added during application lifecycle.

    Items are taken from the end of each queue. An exception raised by an
    `initialize()` call is logged and the remaining items are still
    processed.
    '''
    # Initialize modules
    while self.InitModulesQueue:
        pending_module = self.InitModulesQueue.pop()
        try:
            await pending_module.initialize(self)
        except Exception:
            L.exception("Error during module initialization")

    # Initialize services
    while self.InitServicesQueue:
        pending_service = self.InitServicesQueue.pop()
        try:
            await pending_service.initialize(self)
        except Exception:
            L.exception("Error during service initialization")
def set_exit_code(self, exit_code: int, force: bool = False):
    """
    Update the exit code of the application.

    * Once the exit code is "!RESTART!", it is never changed again.
    * "!RESTART!" itself is always accepted.
    * Any other value is accepted only when it is greater than the current
      exit code, or when `force` is set.
    """
    restart_marker = "!RESTART!"

    if self.ExitCode == restart_marker:
        return

    if exit_code != restart_marker:
        accepted = (self.ExitCode < exit_code) or force
        if not accepted:
            return
        L.debug("Exit code set to {}".format(exit_code))

    self.ExitCode = exit_code
# Time
def time(self):
    '''
    Return UTC unix timestamp derived from the loop time (a fast way to get
    a wall clock time): the loop clock shifted by `BaseTime`.
    '''
    offset = self.BaseTime
    return offset + self.Loop.time()
# Housekeeping
def _initialize_housekeeping_schedule(self):
    """
    Compute the first housekeeping time and its time limit from the
    `[housekeeping]` configuration options `at` and `limit` (both in the
    "%H:%M" format).

    The housekeeping time is the configured time of the current UTC day,
    moved to the next day when that moment has passed already. The time
    limit follows the housekeeping time by the difference of `limit` and
    `at`; a negative difference is extended by one day.

    Returns: (next_housekeeping_time, next_time_limit, next_housekeeping_id)
    """
    one_day = datetime.timedelta(days=1)
    housekeeping_config = Config['housekeeping']

    at_time = datetime.datetime.strptime(housekeeping_config['at'], "%H:%M")  # default: 03:00
    limit_time = datetime.datetime.strptime(housekeeping_config['limit'], "%H:%M")  # default: 05:00

    now = datetime.datetime.now(datetime.timezone.utc)

    scheduled = now.replace(
        hour=at_time.hour,
        minute=at_time.minute,
        second=0,
        microsecond=0,
    )
    # if the app started after the housekeeping time, set it to the next day
    if scheduled < now:
        scheduled = scheduled + one_day

    # compute the time limit for the housekeeping
    window = limit_time - at_time
    if window < datetime.timedelta(0):
        window = window + one_day

    # Each time has its id that prevents from accidental executing housekeeping twice.
    # NOTE(review): the initial id is derived from `now`, not from the
    # scheduled time - confirm that this is intended.
    return (scheduled, scheduled + window, housekeeping_id(now))
def _on_housekeeping_tick(self, message_type):
    """
    PubSub callback (subscribed to "Application.tick/600!" in `__init__`)
    that decides whether 'Application.housekeeping!' is to be published.

    Nothing happens before the housekeeping time. After it, the message is
    published when the time limit has not passed yet; otherwise a warning
    is logged and the missed event is recorded. In both cases the
    housekeeping time, the time limit and the id are moved to the next day.
    `message_type` is not used.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    today_id = housekeeping_id(now)

    if not (self.HousekeepingTime < now):
        # It is not the housekeeping time yet
        return

    stamp = "%Y-%m-%d %H:%M:%S"
    one_day = datetime.timedelta(days=1)

    if now < self.HousekeepingTimeLimit and self.HousekeepingId <= today_id:
        self.PubSub.publish("Application.housekeeping!")
    else:
        L.warning(
            "Housekeeping has not been executed: It is past the time limit.",
            struct_data={
                "housekeeping_time": self.HousekeepingTime.strftime(stamp),
                "time_limit": self.HousekeepingTimeLimit.strftime(stamp),
                "housekeeping_id": self.HousekeepingId,
            }
        )
        self.HousekeepingMissedEvents.append(today_id)

    # Schedule the housekeeping for the next day
    self.HousekeepingTime += one_day
    self.HousekeepingTimeLimit += one_day
    self.HousekeepingId = housekeeping_id(self.HousekeepingTime)
    L.log(
        LOG_NOTICE,
        "Setting time for the next housekeeping.",
        struct_data={
            "next_housekeeping_time": self.HousekeepingTime.strftime(stamp),
            "next_time_limit": self.HousekeepingTimeLimit.strftime(stamp),
            "next_housekeeping_id": self.HousekeepingId,
        }
    )

    if self.HousekeepingMissedEvents:
        L.warning(
            "One or more Housekeeping events have not been executed.",
            struct_data={
                "missed_housekeeping_events": self.HousekeepingMissedEvents
            })
def housekeeping_id(dt: datetime.datetime) -> int:
    """
    Build a numeric ID of the form YYYYMMDD from the date part of `dt`.
    Utility function for housekeeping.

    >>> housekeeping_id(datetime.datetime(2023, 4, 18))
    20230418
    """
    return dt.year * 10000 + dt.month * 100 + dt.day