import logging
import asyncio
import weakref
import functools
L = logging.getLogger(__name__)
class PubSub(object):
    """
    In-process publish/subscribe dispatcher.

    Callbacks are registered per message type and are stored as weak
    references, so a subscription never keeps its callback (or the object
    that owns it) alive. Dead references are pruned lazily during
    :meth:`publish` and :meth:`unsubscribe`.

    Attributes:
        Subscribers: dict mapping a message type to a list of weak references
            to the subscribed callbacks.
        Loop: the event loop taken from ``app.Loop``; used for asynchronous
            and thread-safe delivery.
    """

    def __init__(self, app):
        # message_type -> list of weakref.ref / weakref.WeakMethod objects
        self.Subscribers = {}
        self.Loop = app.Loop

    def subscribe(self, message_type, callback):
        """
        Subscribe a callback to a message type.

        The callback can be a plain function, a bound method, or the coroutine
        variant of either (a coroutine callback is delivered in a dedicated
        future).

        Only a weak reference to the callback is stored; the caller has to
        keep the callback (or its owning object) alive for as long as the
        subscription should last.
        """
        # A bound method object is created anew on every attribute access, so
        # a plain weakref.ref() to it would be dead immediately; WeakMethod
        # tracks the underlying object and function instead.
        # https://stackoverflow.com/questions/53225/how-do-you-check-whether-a-python-method-is-bound-or-not
        if hasattr(callback, '__self__'):
            callback_ref = weakref.WeakMethod(callback)
        else:
            callback_ref = weakref.ref(callback)

        self.Subscribers.setdefault(message_type, []).append(callback_ref)

    def subscribe_all(self, obj):
        """
        Find all ``@asab.subscribe`` decorated methods on ``obj`` and subscribe them.

        A member is considered decorated when it carries a non-None
        ``asab_pubsub_subscribe_to_message_types`` attribute; it is subscribed
        once for every message type listed there.
        """
        for member_name in dir(obj):
            member = getattr(obj, member_name)
            message_types = getattr(member, 'asab_pubsub_subscribe_to_message_types', None)
            if message_types is None:
                continue
            for message_type in message_types:
                self.subscribe(message_type, member)

    def unsubscribe(self, message_type, callback):
        """
        Remove a subscriber of a message type.

        Only the first matching subscription is removed. A warning is logged
        (nothing is raised) when the message type has no subscriptions or the
        callback is not subscribed to it. Dead weak references met before the
        match are pruned, and the message type entry is dropped once its
        subscriber list becomes empty.
        """
        callback_list = self.Subscribers.get(message_type)
        if callback_list is None:
            L.warning("Message type subscription '{}' not found.".format(message_type))
            return

        match_index = None
        dead_refs = []

        for index, callback_ref in enumerate(callback_list):
            # Dereference the weak reference stored in the list
            target = callback_ref()

            if target is None:
                # The referent is gone; remember the entry for pruning
                dead_refs.append(callback_ref)
                continue

            if target == callback:
                match_index = index
                break

        if match_index is None:
            L.warning("Subscriber '{}' not found for the message type '{}'.".format(callback, message_type))
        else:
            del callback_list[match_index]

        # Dead references compare by identity, so each remove() hits exactly
        # the entry collected above.
        for callback_ref in dead_refs:
            callback_list.remove(callback_ref)

        if not callback_list:
            del self.Subscribers[message_type]

    def _callback_iter(self, message_type):
        """
        Yield a ready-to-call callable for every live subscriber of ``message_type``.

        Coroutine functions are wrapped so that calling the yielded object
        schedules the coroutine on ``self.Loop`` instead of returning an
        un-awaited coroutine. Dead weak references found on the way are pruned
        once the iteration is exhausted.
        """

        def _deliver_async(loop, callback, message_type, *args, **kwargs):
            # Schedule on the loop that was handed in rather than on whatever
            # loop asyncio would pick implicitly.
            asyncio.ensure_future(callback(message_type, *args, **kwargs), loop=loop)

        callback_list = self.Subscribers.get(message_type)
        if callback_list is None:
            return

        dead_refs = []

        # Iterate over a snapshot: a callback may subscribe or unsubscribe
        # while the message is being delivered, and mutating the live list
        # during iteration would silently skip subscribers.
        for callback_ref in tuple(callback_list):
            callback = callback_ref()

            if callback is None:
                # The referent is gone; remember the entry for pruning
                dead_refs.append(callback_ref)
                continue

            if asyncio.iscoroutinefunction(callback):
                callback = functools.partial(_deliver_async, self.Loop, callback)

            yield callback

        if not dead_refs:
            return

        for callback_ref in dead_refs:
            try:
                callback_list.remove(callback_ref)
            except ValueError:
                # Already pruned by unsubscribe() during the delivery
                pass

        # Mirror unsubscribe(): do not keep an empty list around. The identity
        # check protects a list that was re-created during the delivery.
        if not callback_list and self.Subscribers.get(message_type) is callback_list:
            del self.Subscribers[message_type]

    def publish(self, message_type, *args, **kwargs):
        """
        Notify subscribers of a message type, passing the given arguments along.

        Every subscriber is called as ``callback(message_type, *args, **kwargs)``.

        The keyword argument ``asynchronously`` (default ``False``) is consumed
        here and is not forwarded to subscribers. When it is true, each call is
        scheduled with ``self.Loop.call_soon()``; otherwise subscribers are
        called immediately and an exception raised by one of them is logged
        without stopping the delivery to the remaining ones.
        """
        asynchronously = kwargs.pop('asynchronously', False)

        if asynchronously:
            for callback in self._callback_iter(message_type):
                self.Loop.call_soon(functools.partial(callback, message_type, *args, **kwargs))
            return

        for callback in self._callback_iter(message_type):
            try:
                callback(message_type, *args, **kwargs)
            except Exception:
                L.exception("Error in a PubSub callback", struct_data={'message_type': message_type})

    def publish_threadsafe(self, message_type, *args, **kwargs):
        """
        Notify subscribers of a message type safely from a thread other than the main one.

        The actual :meth:`publish` call is handed over to ``self.Loop`` via
        ``call_soon_threadsafe()``.
        """

        def in_main_thread():
            self.publish(message_type, *args, **kwargs)

        self.Loop.call_soon_threadsafe(in_main_thread)

    async def message(self, message_type):
        """
        Await a specific message from a coroutine.

        It is a convenience method built on the `Subscriber` object.

        Usage:

        ```
        message_type, args, kwargs = await self.PubSub.message("Library.ready!")
        ```

        `message_type`, `args` and `kwargs` are the same as in a PubSub callback.
        """
        # The local variable is the only strong reference to the subscriber,
        # which keeps the weakly-held subscription alive while awaiting.
        subscriber = Subscriber(self, message_type)
        message_type, args, kwargs = await subscriber.message()
        return message_type, args, kwargs
class subscribe(object):
    """
    Decorator that tags a callable with a message type it wants to receive.

    The message types are collected in the
    ``asab_pubsub_subscribe_to_message_types`` list attribute of the decorated
    callable; stacking the decorator appends to that list.

    Usage:

        @asab.subscribe("tick")
        def on_tick(self, message_type):
            print("Service tick")
    """

    def __init__(self, message_type):
        self.message_type = message_type

    def __call__(self, f):
        registered = getattr(f, 'asab_pubsub_subscribe_to_message_types', None)
        if registered is not None:
            # Already decorated at least once: extend the existing list
            registered.append(self.message_type)
        else:
            f.asab_pubsub_subscribe_to_message_types = [self.message_type]
        return f
class Subscriber(object):
    """
    :any:`Subscriber` object allows to consume PubSub messages in coroutines.

    It can be subscribed to several message types; every received message is
    put into an internal queue and handed out on FIFO basis (first message in,
    first message out).

    If ``pubsub`` argument is None, the initial subscription is skipped.

    .. code:: python

        subscriber = asab.Subscriber(
            app.PubSub,
            "Application.tick!",
            "Application.stop!"
        )
    """

    def __init__(self, pubsub=None, *message_types):
        # Received messages wait here as (message_type, args, kwargs) tuples
        self._q = asyncio.Queue()
        # (pubsub, message_type) pairs recorded by subscribe()
        self._subscriptions = []

        if pubsub is None:
            return

        for wanted_type in message_types:
            self.subscribe(pubsub, wanted_type)

    def subscribe(self, pubsub, message_type):
        """
        Subscribe for more message types.

        This method can be called many times with various ``pubsub`` objects.
        """
        pubsub.subscribe(message_type, self)
        subscription = (pubsub, message_type)
        self._subscriptions.append(subscription)

    def __call__(self, message_type, *args, **kwargs):
        # Invoked as the PubSub callback: enqueue the message without waiting
        received = (message_type, args, kwargs)
        self._q.put_nowait(received)

    def message(self):
        """
        Wait for a message asynchronously.

        Returns an awaitable that resolves to a three-members tuple
        ``(message_type, args, kwargs)``.

        Example of the `await message()` use:

        .. code:: python

            async def my_coroutine(app):
                # Subscribe for a two application events
                subscriber = asab.Subscriber(
                    app.PubSub,
                    "Application.tick!",
                    "Application.exit!"
                )
                while True:
                    message_type, args, kwargs = await subscriber.message()
                    if message_type == "Application.exit!":
                        break
                    print("Tick.")
        """
        pending = self._q.get()
        return pending

    def __aiter__(self):
        return self

    async def __anext__(self):
        received = await self._q.get()
        return received