Publish-Subscribe

Publish–subscribe is a messaging pattern where senders of messages, called publishers, send the messages to receivers, called subscribers, via PubSub message bus. Publishers don’t directly interact with subscribers in any way. Similarly, subscribers express interest in one or more message types and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

class asab.PubSub(app)[source]

ASAB PubSub operates with a simple messages, defined by their message type, which is a string. We recommend to add ! (explamation mark) at the end of the message type in order to distinguish this object from other types such as Python class names or functions. Example of the message type is e.g. Application.run! or Application.tick/600!.

The message can carry an optional positional and keyword arguments. The delivery of a message is implemented as a the standard Python function.

Note: There is an default, application-wide Publish-Subscribe message bus at Application.PubSub that can be used to send messages. Alternatively, you can create your own instance of PubSub and enjoy isolated PubSub delivery space.

Subscription

PubSub.subscribe(message_type, callback)[source]

Subscribe to a message type. Messages will be delivered to a callback callable (function or method). The callback can be a standard callable or an async coroutine. Asynchronous callback means that the delivery of the message will happen in a Future, asynchronously.

Callback callable will be called with the first argument

Example of a subscription to an Application.tick! messages.

class MyClass(object):
        def __init__(self, app):
                app.PubSub.subscribe("Application.tick!", self.on_tick)

        def on_tick(self, message_type):
                print(message_type)

Asynchronous version of the above:

class MyClass(object):
        def __init__(self, app):
                app.PubSub.subscribe("Application.tick!", self.on_tick)

        async def on_tick(self, message_type):
                await asyncio.sleep(5)
                print(message_type)
PubSub.subscribe_all(obj)[source]

To simplify the process of subscription to PubSub, ASAB offers the decorator-based “subscribe all” functionality.

In the followin example, both on_tick() and on_exit() methods are subscribed to Application.PubSub message bus.

class MyClass(object):
        def __init__(self, app):
                app.PubSub.subscribe_all(self)

        @asab.subscribe("Application.tick!")
        async def on_tick(self, message_type):
                print(message_type)

        @asab.subscribe("Application.exit!")
        def on_exit(self, message_type):
                print(message_type)
PubSub.unsubscribe(message_type, callback)[source]

Unsubscribe from a message delivery.

class asab.Subscriber(pubsub=None, *message_types)[source]

Subscriber object allows to consume PubSub messages in coroutines. It subscribes for various message types and consumes them. It works on FIFO basis (First message In, first message Out). If pubsub argument is None, the initial subscription is skipped.

subscriber = asab.Subscriber(
        app.PubSub,
        "Application.tick!",
        "Application.stop!"
)
message()[source]

Wait for a message asynchronously. Returns a three-members tuple (message_type, args, kwargs).

Example of the await message() use:

async def my_coroutine(app):
        # Subscribe for a two application events
        subscriber = asab.Subscriber(
                app.PubSub,
                "Application.tick!",
                "Application.exit!"
        )
        while True:
                message_type, args, kwargs = await subscriber.message()
                if message_type == "Application.exit!":
                        break
                print("Tick.")
subscribe(pubsub, message_type)[source]

Subscribe for more message types. This method can be called many times with various pubsub objects.

The subscriber object can be also used as an asynchonous generator. The example of the subscriber object usage in async for statement:

async def my_coroutine(self):
        # Subscribe for a two application events
        subscriber = asab.Subscriber(
                self.PubSub,
                "Application.tick!",
                "Application.exit!"
        )
        async for message_type, args, kwargs in subscriber:
                if message_type == "Application.exit!":
                        break;
                print("Tick.")

Publishing

PubSub.publish(message_type, *args, **kwargs)[source]

Publish a message to the PubSub message bus. It will be delivered to each subscriber synchronously. It means that the method returns after each subscribed callback is called.

The example of a message publish to the Application.PubSub message bus:

def my_function(app):
        app.PubSub.publish("mymessage!")

Asynchronous publishing of a message is requested by asynchronously=True argument. The publish() method returns immediatelly and the delivery of the message to subscribers happens, when control returns to the event loop.

The example of a asynchronous version of a message publish to the Application.PubSub message bus:

def my_function(app):
        app.PubSub.publish("mymessage!", asynchronously=True)

Synchronous vs. asynchronous messaging

ASAB PubSub supports both modes of a message delivery: synchronous and asynchronous. Moreover, PubSub also deals with modes, when asynchronous code (coroutine) does publish to synchronous code and vice versa.

  Sync publish Async publish
Sync subscribe Called immediately call_soon(...)
Async subscribe ensure_future(...) call_soon(...) & ensure_future(...)

Application-wide PubSub

Application.PubSub

The ASAB provides the application-wide Publish-Subscribe message bus.

Well-Known Messages

This is a list of well-known messages, that are published on a Application.PubSub by ASAB itself.

Application.init!

This message is published when application is in the init-time. It is actually one of the last things done in init-time, so the application environment is almost ready for use. It means that configuration is loaded, logging is setup, the event loop is constructed etc.

Application.run!

This message is emitted when application enters the run-time.

Application.stop!

This message is emitted when application wants to stop the run-time. It can be sent multiple times because of a process of graceful run-time termination. The first argument of the message is a counter that increases with every Application.stop! event.

Application.exit!

This message is emitted when application enter the exit-time.

Application.tick!
Application.tick/10!
Application.tick/60!
Application.tick/300!
Application.tick/600!
Application.tick/1800!
Application.tick/3600!
Application.tick/43200!
Application.tick/86400!

The application publish periodically “tick” messages. The default tick frequency is 1 second but you can change it by configuration [general] tick_period. Application.tick! is published every tick. Application.tick/10! is published every 10th tick and so on.

Application.hup!

This message is emitted when application receives UNIX signal SIGHUP or equivalent.

Application.housekeeping!

This message is published when application is on the time for housekeeping. The time for housekeeping is set to 03:00 AM UTC by default.

The app listens every ten minutes to see if it’s time for housekeeping. If the UTC time reaches the value for housekeeping, the app will publish it and set the time for the next housekeeping for the next day at the same time. There is also a time limit, which is set to 05:00 AM UTC by default. If the computer is in a sleep state, housekeeping will not be performed. Then, when the computer is reawakened again, it will check if it has exceeded the time limit. If not, then housekeeping will be published. If it has exceeded it, it simply informs the user and sets the housekeeping time for the next day. Note that this only limits the time when the housekeeping can start. If the housekeeping event triggers a procedure that takes a long time to finish, it will not be terminated when the time limit is reached.

Both housekeeping time and time limit can be changed in the configuration file:

[housekeeping]
at=19:30
limit=21:00

This sets the housekeeping time to 7:30 PM UTC and the time limit to 9:00 PM UTC. The time must be written in the format ‘HH:MM’. Remind yourself that the time is set to UTC, so you should be careful when operating in a different timezone.