Code

Declare a consumer on queue with a marshmallow schema

decorator bus_consumer

anyblok_bus.consumer.bus_consumer(queue_name=None, schema=None)

anyblok model plugin

class anyblok_bus.consumer.BusConsumerPlugin(registry)

Bases: anyblok.model.plugins.ModelPluginBase

anyblok.model.plugin to allow the build of the anyblok_bus.bus_consumer

initialisation_tranformation_properties(properties, transformation_properties)

Initialise the transform properties

Parameters:
  • properties – the properties declared in the model
  • new_type_properties – param to add in a new base if need
insert_in_bases(new_base, namespace, properties, transformation_properties)

Insert in a base the overload

Parameters:
  • new_base – the base to be put on front of all bases
  • namespace – the namespace of the model
  • properties – the properties declared in the model
  • transformation_properties – the properties of the model
transform_base_attribute(attr, method, namespace, base, transformation_properties, new_type_properties)

transform the attribute for the final Model

Parameters:
  • attr – attribute name
  • method – method pointer of the attribute
  • namespace – the namespace of the model
  • base – One of the base of the model
  • transformation_properties – the properties of the model
  • new_type_properties – param to add in a new base if need

Worker

class anyblok_bus.worker.Worker(registry, profile, withautocommit=True)

Bases: object

Define consumers to consume the queue défined in the AnyBlok registry by the bus_consumer decorator

worker = Worker(anyblokregistry, profilename)
worker.start()  # blocking loop
worker.is_ready()  # return True if all the consumer are started
worker.stop()  # stop the loop and close the connection with rabbitmq
Parameters:
  • registry – anyblok registry instance
  • profile – the name of the profile which give the url of rabbitmq
close_channel_and_connection()
connect()

Creating connection object

declare_consumer(queue, model, method)
get_url()

Retrieve connection url

is_ready()
on_cancelok(unused_frame)
on_channel_closed(channel, reply_code, reply_text)

Called when channel is closed

on_channel_open(channel)

Called when channel is opened

on_connection_closed(connection, reply_code, reply_text)

Called when connection is closed by the server

on_connection_open(*a)

Called when we are fully connected to RabbitMQ

reconnect()
start()

Creating connection object and starting event loop

stop()
stop_consuming()

Set profile’s state to ‘disconnected’ and cancels every related consumers