Source code for asab.mom.broker
import abc
import logging
import asyncio
import asab
#
L = logging.getLogger(__name__)
#
class Broker(abc.ABC, asab.ConfigObject):
    """
    Broker is an implementation of an object request broker (ORB) within
    the message-oriented middleware (MOM) concept.

    The broker allows callbacks to be registered for "task" and "reply"
    procedures via the "add" method. Tasks and replies are then distributed
    to the registered callbacks.

        self.Broker.add("task", self.task_handler)
        self.Broker.add("reply", self.reply_handler)

    In order to connect the broker with the middleware (such as RabbitMQ,
    see AMQPBroker in the "amqp" submodule), the broker needs to be
    subscribed to the task and reply queues in the middleware, via the
    "subscribe" method.

        self.Broker.subscribe("task.queue")

    The "publish" method serves to publish a task to the middleware.

        await self.Broker.publish("Hello world!", target="task")

    Brokers from different applications can be connected to the same
    middleware, where one application may publish tasks, while others
    process them and publish replies.

    This base class only keeps the bookkeeping (subscriptions, targets and
    handler dispatch); "main", "ensure_subscriptions", "publish" and "reply"
    are no-ops here and are meant to be overridden by a concrete broker.
    """

    def __init__(self, app, accept_replies: bool, task_service, config_section_name: str, config=None):
        """
        Set up the broker and schedule its "main" coroutine.

        :param app: Application object; provides "Loop" and "get_service".
        :param accept_replies: Stored as "AcceptReplies"; not consulted by
            this base class itself.
        :param task_service: Service the broker registers with. When None,
            the "asab.MOMService" service is looked up on the application.
        :param config_section_name: Passed through to asab.ConfigObject.
        :param config: Passed through to asab.ConfigObject.
        """
        if task_service is None:
            task_service = app.get_service("asab.MOMService")

        super().__init__(config_section_name=config_section_name, config=config)

        self.TaskService = task_service
        self.TaskService._register_broker(self)

        self.Loop = app.Loop

        # subscription name -> keyword options given to subscribe()
        self.Subscriptions = {}
        # target name -> list of handlers, in registration order
        self.Targets = {}
        self.AcceptReplies = accept_replies

        # Strong references to in-flight ensure_subscriptions() tasks.
        # The event loop keeps only weak references to tasks, so a task
        # nobody else references may be garbage-collected before it ends.
        self._SubscriptionTasks = set()

        self.MainFuture = asyncio.ensure_future(self.main(), loop=self.Loop)

    async def finalize(self, app):
        """Cancel the future that runs the "main" coroutine."""
        self.MainFuture.cancel()

    def subscribe(self, subscription: str, **kwargs):
        """
        Record a subscription and schedule "ensure_subscriptions".

        :param subscription: Name of the subscription (e.g. a queue name).
        :param kwargs: Options stored alongside the subscription; a later
            call with the same name replaces the earlier options.
        """
        self.Subscriptions[subscription] = kwargs

        task = asyncio.ensure_future(self.ensure_subscriptions(), loop=self.Loop)
        # Hold the task until it completes, then drop the reference.
        self._SubscriptionTasks.add(task)
        task.add_done_callback(self._SubscriptionTasks.discard)

    def add(self, target: str, handler):
        """
        Register a handler for a target.

        Several handlers may be registered for the same target; "dispatch"
        awaits them in registration order.

        :param target: Name of the target (e.g. "task" or "reply").
        :param handler: Callable awaited by "dispatch" as
            "handler(properties, body)".
        """
        self.Targets.setdefault(target, []).append(handler)

    async def dispatch(self, target, properties, body):
        """
        Deliver a message to every handler registered for the target.

        For each handler, when "properties.reply_to" is set, the handler's
        return value is sent through "reply" (also when it is None). When
        "reply_to" is not set and the handler returned a value, the value
        is dropped and a warning is logged.

        :param target: Name of the target the message is addressed to.
        :param properties: Message properties; "reply_to" and
            "correlation_id" attributes are read here.
        :param body: Message body handed to the handlers.
        """
        handlers = self.Targets.get(target)
        if handlers is None:
            L.warning("Received a message for an unknown target '%s'", target)
            return

        for handler in handlers:
            reply = await handler(properties, body)

            if properties.reply_to is not None:
                # TODO: If reply_to is URL, then use HTTP to deliver reply
                await self.reply(
                    reply,
                    reply_to=properties.reply_to,
                    correlation_id=properties.correlation_id,
                )
            elif reply is not None:
                L.warning("Discart the reply from target '%s'", target)

    async def main(self):
        """Main coroutine of the broker, scheduled from the constructor; a no-op here."""
        pass

    async def ensure_subscriptions(self):
        """Hook scheduled by every "subscribe" call; a no-op here."""
        pass

    async def publish(
        self,
        body,
        target: str = '',
        content_type: str = None,
        content_encoding: str = None,
        correlation_id: str = None,
        reply_to: str = None,
    ):
        """
        Publish a message to the middleware; a no-op in this base class.

        :param body: Message body.
        :param target: Name of the target the message is addressed to.
        :param content_type: Optional content type of the body.
        :param content_encoding: Optional content encoding of the body.
        :param correlation_id: Optional identifier to pair a reply with its task.
        :param reply_to: Optional destination for the reply.
        """
        pass

    async def reply(
        self,
        body,
        reply_to: str,
        content_type: str = None,
        content_encoding: str = None,
        correlation_id: str = None,
    ):
        """
        Send a reply to the middleware; a no-op in this base class.

        Called by "dispatch" with the handler's return value as the body.

        :param body: Reply body.
        :param reply_to: Destination of the reply.
        :param content_type: Optional content type of the body.
        :param content_encoding: Optional content encoding of the body.
        :param correlation_id: Optional identifier to pair the reply with its task.
        """
        pass