In an AnyBlok Model, decorate a method with ``bus_consumer``::

    from anyblok_bus import bus_consumer
    from anyblok import Declarations
    from .schema import MySchema


    @Declarations.register(Declarations.Model)
    class MyModel:

        @bus_consumer(queue_name='name of the queue', schema=MySchema())
        def my_consumer(cls, body):
            # do something with the deserialized message
            pass

The schema must be an instance of ``marshmallow.Schema``; see the marshmallow documentation.

.. note::

    The decorated method becomes a classmethod that always has the same prototype, ``(cls, body)``. ``body`` is the message from the queue, deserialized by the schema.

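The prototype described in the note can be illustrated with a small, self-contained sketch. It is not the ``anyblok_bus`` implementation: ``toy_bus_consumer`` and ``ToySchema`` are hypothetical stand-ins written only for this example, and ``ToySchema`` mimics just a ``loads`` method in the style of a marshmallow schema. The point is only to show a decorator that deserializes the raw message and exposes the method as a classmethod receiving ``(cls, body)``.

```python
import json


class ToySchema:
    """Hypothetical stand-in for a marshmallow schema (only ``loads``)."""

    def loads(self, raw):
        data = json.loads(raw)
        if "name" not in data:
            raise ValueError("missing required field: name")
        return data


def toy_bus_consumer(queue_name, schema):
    """Illustrative decorator, NOT the real ``bus_consumer``."""

    def decorator(func):
        def wrapper(cls, raw_message):
            # Deserialize the raw message, then call the user's method
            # with the fixed prototype (cls, body).
            body = schema.loads(raw_message)
            return func(cls, body)

        # Remember which queue this consumer is attached to.
        wrapper.queue_name = queue_name
        # The decorated method is exposed as a classmethod.
        return classmethod(wrapper)

    return decorator


class MyModel:

    @toy_bus_consumer(queue_name="name of the queue", schema=ToySchema())
    def my_consumer(cls, body):
        return "hello " + body["name"]


# Simulate a message arriving from the queue.
result = MyModel.my_consumer('{"name": "anyblok"}')
```

In this sketch the consumer never sees the raw JSON string; by the time ``my_consumer`` runs, ``body`` is already a validated dictionary.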
Messages are published with the ``registry.Bus.publish`` method::

    registry.Bus.publish('exchange', 'routing_key', message, mimestype)

If the message is not sent, an exception is raised.
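Because a failed publication raises, callers may want to wrap the call. The following is a minimal sketch under stated assumptions: ``publish_or_log`` and ``FakeBus`` are hypothetical names introduced for illustration, the text does not say which exception class is raised (so the sketch catches ``Exception``), and ``FakeBus`` only imitates the four-argument ``publish`` call shown above so that the example runs without a broker.

```python
import logging

logger = logging.getLogger(__name__)


def publish_or_log(bus, exchange, routing_key, message, mimetype):
    """Publish through ``bus`` and return True on success, False on failure.

    ``bus`` is any object with a
    ``publish(exchange, routing_key, message, mimetype)`` method,
    for example ``registry.Bus``.
    """
    try:
        bus.publish(exchange, routing_key, message, mimetype)
    except Exception:
        # Assumption: the exact exception class is not documented here,
        # so this sketch catches the broad base class.
        logger.exception("message not published to %r", exchange)
        return False
    return True


class FakeBus:
    """Hypothetical test double standing in for ``registry.Bus``."""

    def __init__(self, fail=False):
        self.fail = fail
        self.sent = []

    def publish(self, exchange, routing_key, message, mimetype):
        if self.fail:
            raise RuntimeError("message not sent")
        self.sent.append((exchange, routing_key, message, mimetype))
```

With a real registry, the call would presumably look like ``publish_or_log(registry.Bus, 'exchange', 'routing_key', message, mimestype)``. Whether to swallow the exception or let it propagate depends on the application; letting it propagate is the simpler choice when the surrounding transaction should fail as well.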

.. warning::

    A profile must be defined and selected with the AnyBlok configuration option **bus_profile**.