Source code for asab.mom.broker
import abc
import logging
import asyncio
import asab
#
L = logging.getLogger(__name__)
#
[docs]class Broker(abc.ABC, asab.ConfigObject):
def __init__(self, app, accept_replies:bool, task_service, config_section_name:str, config=None):
if task_service == None:
task_service = app.get_service("asab.MOMService")
super().__init__(config_section_name=config_section_name, config=config)
self.TaskService = task_service
self.TaskService._register_broker(self)
self.Loop = app.Loop
self.Subscriptions = dict()
self.Targets = {}
self.AcceptReplies = accept_replies
self.MainFuture = asyncio.ensure_future(self.main(), loop=self.Loop)
async def finalize(self, app):
self.MainFuture.cancel()
[docs] def subscribe(self, subscription:str, **kwags):
self.Subscriptions[subscription] = kwags
asyncio.ensure_future(self.ensure_subscriptions(), loop=self.Loop)
[docs] def add(self, target:str, handler):
t = self.Targets.get(target)
if t is None:
self.Targets[target] = [handler]
else:
t.append(handler)
async def dispatch(self, target, properties, body):
tlist = self.Targets.get(target)
if tlist is None:
L.warn("Received a message for an unknown target '{}'".format(target))
return
for handler in tlist:
reply = await handler(properties, body)
if properties.reply_to is not None:
#TODO: If reply_to is URL, then use HTTP to deliver reply
await self.reply(reply,
reply_to=properties.reply_to,
correlation_id=properties.correlation_id,
)
elif reply is not None:
L.warn("Discart the reply from target '{}'".format(target))
async def main(self):
pass
async def ensure_subscriptions(self):
pass
[docs] async def publish(self, body, target:str='',
content_type:str=None,
content_encoding:str=None,
correlation_id:str=None,
reply_to:str=None,
):
pass
async def reply(self, body,
reply_to:str,
content_type:str=None,
content_encoding:str=None,
correlation_id:str=None,
):
pass