Publish-Subscribe

Publish–subscribe is a messaging pattern where senders of messages, called publishers, send messages to receivers, called subscribers, via a PubSub message bus. Publishers don’t interact with subscribers directly in any way. Similarly, subscribers express interest in one or more message types and receive only the messages that are of interest, without knowing which publishers, if any, exist.

class asab.PubSub(app)

ASAB PubSub operates with simple messages, defined by their message type, which is a string. We recommend adding an exclamation mark (!) at the end of the message type to distinguish it from other names, such as Python class or function names. Examples of message types are Application.run! and Application.tick/600!.

A message can carry optional positional and keyword arguments. The delivery of a message is implemented as a standard Python function call.

Note: There is a default, application-wide Publish-Subscribe message bus at Application.PubSub that can be used to send messages. Alternatively, you can create your own instance of PubSub and get an isolated PubSub delivery space.
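
The ideas above can be illustrated with a minimal sketch. MiniPubSub and on_message are hypothetical names used only for illustration; this is a simplified stand-in, not ASAB's implementation. It assumes that the message type is passed to the callback as the first argument, followed by the published arguments.

```python
from collections import defaultdict


class MiniPubSub:
    """Hypothetical stand-in for a message bus; not ASAB's implementation."""

    def __init__(self):
        # Maps a message type (a string) to the list of subscribed callbacks.
        self.subscribers = defaultdict(list)

    def subscribe(self, message_type, callback):
        self.subscribers[message_type].append(callback)

    def publish(self, message_type, *args, **kwargs):
        # Delivery is an ordinary function call; the message type comes first.
        for callback in list(self.subscribers[message_type]):
            callback(message_type, *args, **kwargs)


received = []


def on_message(message_type, *args, **kwargs):
    received.append((message_type, args, kwargs))


bus_a = MiniPubSub()
bus_b = MiniPubSub()  # a separate instance is an isolated delivery space
bus_a.subscribe("mymessage!", on_message)

bus_a.publish("mymessage!", 1, 2, key="value")
bus_b.publish("mymessage!", 3)  # no subscribers on bus_b, nothing is delivered
```

After this runs, received holds a single entry from bus_a; the message published on bus_b never reaches the callback because the two instances do not share subscriptions.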

Subscription

PubSub.subscribe(message_type, callback)

Subscribe to a message type. Messages will be delivered to the callback, which can be a function or a method. The callback can be a standard callable or an async coroutine function. With an asynchronous callback, the delivery of the message happens in a coroutine, asynchronously.

The callback is called with the message type as its first argument.

Example of a subscription to Application.tick! messages:

class MyClass(object):
    def __init__(self, app):
        app.PubSub.subscribe("Application.tick!", self.on_tick)

    def on_tick(self, message_type):
        print(message_type)

Asynchronous version of the above:

import asyncio


class MyClass(object):
    def __init__(self, app):
        app.PubSub.subscribe("Application.tick!", self.on_tick)

    async def on_tick(self, message_type):
        await asyncio.sleep(5)
        print(message_type)

PubSub.subscribe_all(obj)

To simplify the process of subscribing to PubSub, ASAB offers decorator-based “subscribe all” functionality.

In the following example, both the on_tick() and on_exit() methods are subscribed to the Application.PubSub message bus.

import asab


class MyClass(object):
    def __init__(self, app):
        app.PubSub.subscribe_all(self)

    @asab.subscribe("Application.tick!")
    async def on_tick(self, message_type):
        print(message_type)

    @asab.subscribe("Application.exit!")
    def on_exit(self, message_type):
        print(message_type)
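
One way such a decorator-based mechanism could work is sketched below. This is an assumption about the general technique, not a description of ASAB's internals: the decorator tags each function with its message type, and subscribe_all() scans the object for tagged methods. The names MiniPubSub, subscribe and _subscribed_to are hypothetical.

```python
import inspect


def subscribe(message_type):
    """Hypothetical decorator: tags a function with its message type."""
    def decorator(func):
        func._subscribed_to = message_type  # attribute name is an assumption
        return func
    return decorator


class MiniPubSub:
    """Hypothetical stand-in for a message bus; not ASAB's implementation."""

    def __init__(self):
        self.subscribers = {}

    def subscribe(self, message_type, callback):
        self.subscribers.setdefault(message_type, []).append(callback)

    def subscribe_all(self, obj):
        # Scan bound methods of obj and subscribe those tagged by the decorator.
        for _name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
            message_type = getattr(method, "_subscribed_to", None)
            if message_type is not None:
                self.subscribe(message_type, method)


class MyClass:
    def __init__(self, pubsub):
        pubsub.subscribe_all(self)

    @subscribe("Application.tick!")
    def on_tick(self, message_type):
        pass

    @subscribe("Application.exit!")
    def on_exit(self, message_type):
        pass

    def helper(self):  # not decorated, so it is never subscribed
        pass


pubsub = MiniPubSub()
obj = MyClass(pubsub)
```

Only the two decorated methods end up in pubsub.subscribers; helper() and __init__() carry no tag and are skipped.
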

PubSub.unsubscribe(message_type, callback)

Unsubscribe from a message delivery.

class asab.Subscriber(pubsub=None, *message_types)

The Subscriber object allows PubSub messages to be consumed in coroutines. It subscribes to various message types and consumes them on a FIFO basis (first in, first out). If the pubsub argument is None, the initial subscription is skipped.

subscriber = asab.Subscriber(
    app.PubSub,
    "Application.tick!",
    "Application.stop!"
)

message()

Wait for a message asynchronously. Returns a three-member tuple (message_type, args, kwargs).

# Use in await statement
message = await subscriber.message()

subscribe(pubsub, message_type)

Subscribe to more message types. This method can be called multiple times with different pubsub objects.
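
The FIFO behaviour described above can be sketched with an asyncio.Queue. MiniSubscriber is a hypothetical class written for illustration and is not ASAB's Subscriber; the sketch assumes that every delivered message is queued as a (message_type, args, kwargs) tuple and handed out in arrival order.

```python
import asyncio


class MiniSubscriber:
    """Hypothetical queue-backed consumer; not ASAB's Subscriber class."""

    def __init__(self):
        self.queue = asyncio.Queue()

    def on_message(self, message_type, *args, **kwargs):
        # Called for every delivered message; the queue keeps arrival order.
        self.queue.put_nowait((message_type, args, kwargs))

    async def message(self):
        # Suspends the calling coroutine until a message is available.
        return await self.queue.get()


async def main():
    subscriber = MiniSubscriber()
    # Stand-in for deliveries coming from one or more message buses.
    subscriber.on_message("Application.tick!")
    subscriber.on_message("Application.stop!", 1)

    first = await subscriber.message()
    second = await subscriber.message()
    return [first, second]


result = asyncio.run(main())
```

The messages come out in the order in which they were delivered, regardless of their type or of which bus they came from.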

Publishing

PubSub.publish(message_type, *args, **kwargs)

Publish a message to the PubSub message bus. By default, it is delivered to each subscriber synchronously, meaning that the method returns only after every subscribed callback has been called.

Example of publishing a message to the Application.PubSub message bus:

def my_function(app):
    app.PubSub.publish("mymessage!")

Asynchronous message delivery can be triggered by providing the asynchronously=True keyword argument. Each subscriber is then handled in a dedicated Future object. The method returns immediately, and the delivery of the message to subscribers happens when control returns to the event loop.

Example of the asynchronous version of publishing a message to the Application.PubSub message bus:

def my_function(app):
    app.PubSub.publish("mymessage!", asynchronously=True)
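
The difference in ordering between the two delivery modes can be seen in the following sketch. MiniPubSub is a hypothetical stand-in, and it defers delivery with loop.call_soon() rather than with a dedicated Future per subscriber, so it only approximates the behaviour described above.

```python
import asyncio


class MiniPubSub:
    """Hypothetical bus with two delivery modes; not ASAB's implementation."""

    def __init__(self):
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)

    def publish(self, message_type, asynchronously=False):
        loop = asyncio.get_running_loop()
        for callback in self.callbacks:
            if asynchronously:
                # Defer the call; it runs once control returns to the loop.
                loop.call_soon(callback, message_type)
            else:
                # Call right now; publish() returns after all callbacks ran.
                callback(message_type)


async def main():
    log = []
    bus = MiniPubSub()
    bus.subscribe(lambda message_type: log.append("delivered"))

    bus.publish("mymessage!")
    log.append("sync publish returned")

    bus.publish("mymessage!", asynchronously=True)
    log.append("async publish returned")

    await asyncio.sleep(0)  # yield to the event loop so the deferred call runs
    return log


log = asyncio.run(main())
```

In the synchronous case "delivered" is logged before publish() returns; in the asynchronous case it is logged only after the caller yields to the event loop.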

Application-wide PubSub

Application.PubSub

ASAB provides an application-wide Publish-Subscribe message bus.

Well-Known Messages

Application.init!

This message is published when the application is in init-time. It is actually one of the last things done in init-time, so the application environment is almost ready for use: the configuration is loaded, logging is set up, the event loop is constructed, etc.

Application.run!

This message is emitted when the application enters the run-time.

Application.stop!

This message is emitted when the application wants to stop the run-time. It can be sent multiple times as part of the graceful run-time termination process. The first argument of the message is a counter that increases with every Application.stop! event.

Application.exit!

This message is emitted when the application enters the exit-time.

Application.tick!
Application.tick/10!
Application.tick/60!
Application.tick/300!
Application.tick/600!
Application.tick/1800!
Application.tick/3600!
Application.tick/43200!
Application.tick/86400!

The application periodically publishes “tick” messages. The default tick period is 1 second, but you can change it with the tick_period option in the [general] configuration section. Application.tick! is published every tick, Application.tick/10! is published every 10th tick, and so on.
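
The relationship between the tick counter and the tick message types can be sketched as follows. messages_for_tick() is a hypothetical helper, and the sketch assumes a tick counter that starts at 1 and a simple modulo check; ASAB's actual scheduling may differ.

```python
# Divisors taken from the list of well-known tick messages above.
TICK_DIVISORS = (10, 60, 300, 600, 1800, 3600, 43200, 86400)


def messages_for_tick(tick_count):
    """Return the tick message types due at the given tick (hypothetical)."""
    due = ["Application.tick!"]  # published on every tick
    for divisor in TICK_DIVISORS:
        if tick_count % divisor == 0:
            due.append("Application.tick/{}!".format(divisor))
    return due
```

For example, messages_for_tick(7) yields only Application.tick!, while messages_for_tick(60) also yields Application.tick/10! and Application.tick/60!. With the default period of 1 second, Application.tick/60! is therefore published about once a minute.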

Application.hup!

This message is emitted when the application receives the UNIX signal SIGHUP or an equivalent.