The blocking connection adapter module implements blocking semantics on top of Pika’s core AMQP driver. While most of the asynchronous expectations are removed when using the blocking connection adapter, it attempts to remain true to the asynchronous RPC nature of the AMQP protocol, supporting server-sent RPC commands.
The user facing classes in the module consist of the BlockingConnection and the BlockingChannel classes.
Be sure to check out examples in Usage Examples.
The BlockingConnection creates a layer on top of Pika’s asynchronous core, providing methods that block until their expected response has returned. Because the Basic.Deliver and Basic.Return calls from RabbitMQ to your application are asynchronous, you can still implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish.
For more information about communicating with the blocking_connection adapter, be sure to check out the BlockingChannel class which implements the Channel based communication for the blocking_connection adapter.
Add a callback to be notified when RabbitMQ has sent a Connection.Blocked frame indicating that RabbitMQ is low on resources. Publishers can use this to voluntarily suspend publishing, instead of relying on back pressure throttling. The callback will be passed the Connection.Blocked method frame.
Parameters: | callback_method (method) – Callback to call on Connection.Blocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Blocked |
---|
Add a callback to be notified when RabbitMQ has sent a Connection.Unblocked frame letting publishers know it’s ok to start publishing again. The callback will be passed the Connection.Unblocked method frame.
Parameters: | callback_method (method) – Callback to call on Connection.Unblocked, having the signature callback_method(pika.frame.Method), where the method frame’s method member is of type pika.spec.Connection.Unblocked |
---|
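As a minimal sketch of how a publisher might react to these two notifications, the helper below registers both callbacks and tracks a flag. It assumes the two methods described above are exposed as `add_on_connection_blocked_callback` and `add_on_connection_unblocked_callback` (the method names are not shown in the text above), and `PublishGate` is a hypothetical helper, not part of pika. The callbacks accept `*args` so the sketch does not depend on the exact callback signature.

```python
class PublishGate(object):
    """Hypothetical helper that tracks whether the broker asked us to pause."""

    def __init__(self, connection):
        self.blocked = False
        # Assumed method names; each callback is passed the method frame.
        connection.add_on_connection_blocked_callback(self._on_blocked)
        connection.add_on_connection_unblocked_callback(self._on_unblocked)

    def _on_blocked(self, *args):
        # Connection.Blocked arrived: suspend publishing voluntarily.
        self.blocked = True

    def _on_unblocked(self, *args):
        # Connection.Unblocked arrived: publishing may resume.
        self.blocked = False
```

A publisher could then check `gate.blocked` before each publish and wait while it is set.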
Create a single-shot timer to fire after deadline seconds. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called.
NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.
Parameters: |
|
---|---|
Returns: | opaque timer id |
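The following sketch illustrates the note above: the timer only fires while the connection is processing events. It assumes the timer method is called `add_timeout` and takes `(deadline, callback_method)`, which matches the description here but is not spelled out in the text; `run_after` is a hypothetical helper name.

```python
def run_after(connection, delay_seconds, callback):
    """Schedule callback once, then pump events until it has fired."""
    fired = []

    def _on_timer():
        fired.append(True)
        callback()

    # Assumed signature: add_timeout(deadline, callback_method) -> timer id.
    timer_id = connection.add_timeout(delay_seconds, _on_timer)
    while not fired:
        # Timer callbacks are dispatched only from calls such as this one.
        connection.process_data_events(time_limit=delay_seconds)
    return timer_id
```

The returned timer id is the opaque value that can later be passed to the timer-removal method.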
Specifies if the server supports basic.nack on the active connection.
Return type: | bool |
---|
Specifies if the server supports basic.nack on the active connection.
Return type: | bool |
---|
Create a new channel with the next available channel number, or pass in a channel number to use. An explicitly specified channel number must be non-zero, but it is recommended that you let Pika manage the channel numbers.
Return type: | pika.adapters.blocking_connection.BlockingChannel |
---|
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
Parameters: |
|
---|
Specifies if the server supports consumer cancel notification on the active connection.
Return type: | bool |
---|
Specifies if the server supports consumer cancel notification on the active connection.
Return type: | bool |
---|
Specifies if the active connection supports exchange to exchange bindings.
Return type: | bool |
---|
Specifies if the active connection supports exchange to exchange bindings.
Return type: | bool |
---|
Returns a boolean reporting the current connection state.
Returns a boolean reporting the current connection state.
Returns a boolean reporting the current connection state.
Will make sure that data events are processed. Dispatches timer and channel callbacks if not called from the scope of BlockingConnection or BlockingChannel callback. Your app can block on this method.
Parameters: | time_limit (float) – suggested upper bound on processing time in seconds. The actual blocking time depends on the granularity of the underlying ioloop. Zero means return as soon as possible. None means there is no limit on processing time and the function will block until I/O produces actionable events. Defaults to 0 for backward compatibility. This parameter is NEW in pika 0.10.0. |
---|
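One way an application might block on this method for a bounded period is sketched below. `pump_for` and `slice_seconds` are hypothetical names; the only pika call assumed is `process_data_events(time_limit=...)` as described above.

```python
import time


def pump_for(connection, total_seconds, slice_seconds=0.5):
    """Process broker I/O for roughly total_seconds, in bounded slices."""
    deadline = time.time() + total_seconds
    iterations = 0
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # time_limit is only a suggested upper bound, so cap each call.
        connection.process_data_events(
            time_limit=min(slice_seconds, remaining))
        iterations += 1
    return iterations
```

Keeping each slice short lets the caller interleave its own checks (for example a shutdown flag) between calls.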
Specifies if the active connection can use publisher confirmations.
Return type: | bool |
---|
Specifies if the active connection can use publisher confirmations.
Return type: | bool |
---|
Remove a timer if it’s still in the timeout stack.
Parameters: | timeout_id – The opaque timer id to remove |
---|
A safer way to sleep than calling time.sleep() directly: it keeps processing frames sent from the broker while waiting, whereas time.sleep() would leave them unhandled. The connection will “sleep”, or block, for the number of seconds specified in duration, in small intervals.
Parameters: | duration (float) – The time to sleep in seconds |
---|
The BlockingChannel implements blocking semantics for most things that one would use callback-passing-style for with the Channel class. In addition, the BlockingChannel class implements a generator that allows you to consume messages without using callbacks.
Example of creating a BlockingChannel:
import pika
# Create our connection object
connection = pika.BlockingConnection()
# The returned object will be a synchronous channel
channel = connection.channel()
Pass a callback function that will be called when Basic.Cancel is sent by the broker. The callback function should receive a method frame parameter.
Parameters: | callback (callable) – a callable for handling broker’s Basic.Cancel notification with the call signature: callback(method_frame) where method_frame is of type pika.frame.Method with method of type spec.Basic.Cancel |
---|
Pass a callback function that will be called when a published message is rejected and returned by the server via Basic.Return.
Parameters: | callback (callable) – The method to call on callback with the signature callback(channel, method, properties, body), where channel is a pika.Channel, method is a pika.spec.Basic.Return, properties is a pika.spec.BasicProperties, and body is str, unicode, or bytes (python 3.x) |
---|
Acknowledge one or more messages. When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.
Parameters: |
|
---|
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
NOTE: When cancelling a no_ack=False consumer, this implementation automatically Nacks and suppresses any incoming messages that have not yet been dispatched to the consumer’s callback. However, when cancelling a no_ack=True consumer, this method will return any pending messages that arrived before broker confirmed the cancellation.
Parameters: | consumer_tag (str) – Identifier for the consumer; the result of passing a consumer_tag that was created on another channel is undefined (bad things will happen) |
---|---|
Returns: | (NEW IN pika 0.10.0) empty sequence for a no_ack=False consumer; for a no_ack=True consumer, returns a (possibly empty) sequence of pending messages that arrived before the broker confirmed the cancellation (this is done instead of via the consumer’s callback in order to prevent reentrancy/recursion). Each message is a four-tuple: (channel, method, properties, body) |
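Because pending messages for a no_ack=True consumer are returned rather than dispatched to the consumer callback, the caller has to process them itself. A minimal sketch, assuming the method is called as `basic_cancel(consumer_tag)`; `cancel_and_drain` and `handle_message` are hypothetical names:

```python
def cancel_and_drain(channel, consumer_tag, handle_message):
    """Cancel a consumer and process any messages basic_cancel returns."""
    # For a no_ack=True consumer the result holds pending messages;
    # for a no_ack=False consumer it is an empty sequence.
    pending = channel.basic_cancel(consumer_tag)
    for _channel, method, properties, body in pending:
        handle_message(method, properties, body)
    return len(pending)
```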
Sends the AMQP command Basic.Consume to the broker and binds messages for the consumer_tag to the consumer callback. If you do not pass in a consumer_tag, one will be automatically generated for you. Returns the consumer tag.
NOTE: the consumer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.
For more information about Basic.Consume, see: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
Parameters: |
|
---|---|
Returns: | consumer tag |
Return type: | str |
Raises pika.exceptions.DuplicateConsumerTag: | if a consumer with the given consumer_tag is already present |
Get a single message from the AMQP broker. Returns a sequence with the method frame, message properties, and body.
Parameters: |
|
---|---|
Returns: | a three-tuple; (None, None, None) if the queue was empty; otherwise (method, properties, body); NOTE: body may be None |
Return type: | (None, None, None)|(spec.Basic.GetOk, spec.BasicProperties, str or unicode or None) |
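Since the body itself may be None, an empty queue is best detected from the method frame rather than the body. The sketch below assumes the call shape `basic_get(queue=..., no_ack=False)` and `basic_ack(delivery_tag=...)`; the parameter names are assumptions because the parameter table above is empty, and `fetch_one` is a hypothetical helper.

```python
def fetch_one(channel, queue_name):
    """Return (found, body) for a single polled message."""
    # Assumed keyword names: queue and no_ack.
    method, properties, body = channel.basic_get(queue=queue_name,
                                                 no_ack=False)
    if method is None:
        # (None, None, None) means the queue was empty.
        return False, None
    # Acknowledge explicitly because no_ack is False.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True, body
```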
This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
Parameters: |
|
---|
Publish to the channel with the given exchange, routing key and body. Returns a boolean value indicating the success of the operation.
This is the legacy BlockingChannel method for publishing. See also BlockingChannel.publish, which provides more information about failures.
For more information on basic_publish and what the parameters do, see:
Parameters: |
|
---|---|
Returns: | True if delivery confirmation is not enabled (NEW in pika 0.10.0); otherwise returns False if the message could not be delivered (Basic.nack and/or Basic.Return) and True if the message was delivered (Basic.ack and no Basic.Return) |
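The boolean result can drive a simple retry loop. The sketch below relies on the return value described above and assumes the keyword names `exchange`, `routing_key`, and `body`; `publish_with_retry` is a hypothetical helper. Note that the result is only informative when delivery confirmation has been enabled on the channel; otherwise it is always True.

```python
def publish_with_retry(channel, exchange, routing_key, body, attempts=3):
    """Return the attempt number that succeeded, or 0 if none did."""
    for attempt in range(1, attempts + 1):
        # False signals Basic.Nack and/or Basic.Return from the broker.
        delivered = channel.basic_publish(exchange=exchange,
                                          routing_key=routing_key,
                                          body=body)
        if delivered:
            return attempt
    return 0
```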
Specify quality of service. This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement.
Parameters: |
|
---|
This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.
Parameters: | requeue (bool) – If False, the message will be redelivered to the original recipient. If True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber. |
---|
Reject an incoming message. This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
Parameters: |
|
---|
Cancel the queue consumer created by BlockingChannel.consume, rejecting all pending ackable messages.
NOTE: If you’re looking to cancel a consumer issued with BlockingChannel.basic_consume then you should call BlockingChannel.basic_cancel.
Return int: | The number of messages requeued by Basic.Nack. NEW in 0.10.0: returns 0 |
---|
Channel number
Will invoke a clean shutdown of the channel with the AMQP Broker.
Parameters: |
|
---|
Turn on RabbitMQ-proprietary Confirm mode in the channel.
The channel’s BlockingConnection instance
Blocking consumption of a queue instead of via a callback. This method is a generator that yields each message as a tuple of method, properties, and body. The active generator iterator terminates when the consumer is cancelled by client or broker.
Example:
    for method, properties, body in channel.consume('queue'):
        print(body)
        channel.basic_ack(method.delivery_tag)
You should call BlockingChannel.cancel() when you escape out of the generator loop.
If you don’t cancel this consumer, then next call on the same channel to consume() with the exact same (queue, no_ack, exclusive) parameters will resume the existing consumer generator; however, calling with different parameters will result in an exception.
Parameters: |
|
---|---|
Yields : | tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode) |
Raises ValueError: | if consumer-creation parameters don’t match those of the existing queue consumer generator, if any. NEW in pika 0.10.0 |
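To make sure the generator is always cancelled after leaving the loop, the cancel call can be placed in a `finally` clause. This is a sketch rather than the library's prescribed pattern; `take_messages` and `limit` are hypothetical names, and only `consume`, `basic_ack`, and `cancel` from the text above are used.

```python
def take_messages(channel, queue_name, limit):
    """Collect up to `limit` message bodies, then cancel the generator."""
    bodies = []
    if limit < 1:
        return bodies
    try:
        for method, properties, body in channel.consume(queue_name):
            bodies.append(body)
            channel.basic_ack(method.delivery_tag)
            if len(bodies) >= limit:
                break
    finally:
        # Cancel so the consumer generator is not left active on the channel.
        channel.cancel()
    return bodies
```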
Bind an exchange to another exchange.
Parameters: |
|
---|---|
Returns: | Method frame from the Exchange.Bind-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Exchange.BindOk |
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
If passive is set, the server replies with Declare-Ok if an exchange with the given name already exists; if it does not exist, the server MUST raise a channel exception with reply code 404 (not found).
Parameters: |
|
---|---|
Returns: | Method frame from the Exchange.Declare-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Exchange.DeclareOk |
Delete the exchange.
Parameters: |
|
---|---|
Returns: | Method frame from the Exchange.Delete-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Exchange.DeleteOk |
Unbind an exchange from another exchange.
Parameters: |
|
---|---|
Returns: | Method frame from the Exchange.Unbind-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Exchange.UnbindOk |
Turn Channel flow control off and on.
NOTE: RabbitMQ doesn’t support active=False; per https://www.rabbitmq.com/specification.html: “active=false is not supported by the server. Limiting prefetch with basic.qos provides much better control”
For more information, please reference:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
Parameters: | active (bool) – Turn flow on (True) or off (False) |
---|---|
Returns: | True if broker will start or continue sending; False if not |
Return type: | bool |
Returns the number of messages that may be retrieved from the current queue consumer generator via BlockingChannel.consume without blocking. NEW in pika 0.10.0
Return type: | int |
---|
Returns True if the channel is closed.
Return type: | bool |
---|
Returns True if the channel is closing.
Return type: | bool |
---|
Returns True if the channel is open.
Return type: | bool |
---|
Publish to the channel with the given exchange, routing key, and body. Unlike the legacy BlockingChannel.basic_publish, this method provides more information about failures via exceptions.
For more information on basic_publish and what the parameters do, see:
Parameters: |
|
---|---|
Raises: |
|
Bind the queue to the specified exchange.
Parameters: |
|
---|---|
Returns: | Method frame from the Queue.Bind-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Queue.BindOk |
Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
Leave the queue name empty for an auto-named queue in RabbitMQ.
Parameters: |
|
---|---|
Returns: | Method frame from the Queue.Declare-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Queue.DeclareOk |
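When the queue name is left empty, the broker-generated name can be read back from the returned method frame. A short sketch, assuming the keyword names `queue` and `exclusive` (the parameter table above is empty) and that the DeclareOk method carries the name in its `queue` attribute; `declare_temporary_queue` is a hypothetical helper.

```python
def declare_temporary_queue(channel):
    """Declare an auto-named, exclusive queue and return its name."""
    # Assumed keyword names: queue and exclusive.
    result = channel.queue_declare(queue='', exclusive=True)
    # result is a method frame; its method is spec.Queue.DeclareOk.
    return result.method.queue
```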
Delete a queue from the broker.
Parameters: |
|
---|---|
Returns: | Method frame from the Queue.Delete-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Queue.DeleteOk |
Purge all of the messages from the specified queue.
Parameters: | queue (str or unicode) – The queue to purge |
---|---|
Returns: | Method frame from the Queue.Purge-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Queue.PurgeOk |
Unbind a queue from an exchange.
Parameters: |
|
---|---|
Returns: | Method frame from the Queue.Unbind-ok response |
Return type: | pika.frame.Method having method attribute of type spec.Queue.UnbindOk |
Processes I/O events and dispatches timers and basic_consume callbacks until all consumers are cancelled.
NOTE: this blocking function may not be called from the scope of a pika callback, because dispatching basic_consume callbacks from this context would constitute recursion.
Raises pika.exceptions.RecursionError: | if called from the scope of a BlockingConnection or BlockingChannel callback |
---|---|
Cancels all consumers, signalling the start_consuming loop to exit.
NOTE: pending non-ackable messages will be lost; pending ackable messages will be rejected.
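A common way to leave the consuming loop is to request the stop from inside a consumer callback once some condition is met. The sketch below builds such a callback; it assumes consumer callbacks receive `(channel, method, properties, body)` and that calling `stop_consuming` from within a callback is acceptable in your pika version. `make_limited_callback` and `handle_body` are hypothetical names.

```python
def make_limited_callback(limit, handle_body):
    """Build a consumer callback that stops consuming after `limit` messages."""
    seen = [0]

    def on_message(channel, method, properties, body):
        handle_body(body)
        # Ack before stopping so this message is not rejected as pending.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        seen[0] += 1
        if seen[0] >= limit:
            # Cancels all consumers, which lets start_consuming return.
            channel.stop_consuming()

    return on_message
```

The returned function would be registered through basic_consume before calling start_consuming.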
Commit a transaction.
Returns: | Method frame from the Tx.Commit-ok response |
---|---|
Return type: | pika.frame.Method having method attribute of type spec.Tx.CommitOk |
Rollback a transaction.
Returns: | Method frame from the Tx.Rollback-ok response |
---|---|
Return type: | pika.frame.Method having method attribute of type spec.Tx.RollbackOk |
Select standard transaction mode. This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
Returns: | Method frame from the Tx.Select-ok response |
---|---|
Return type: | pika.frame.Method having method attribute of type spec.Tx.SelectOk |
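The three transaction methods are typically used together: select once, publish, then commit or roll back. The sketch below assumes they are exposed as `tx_select`, `tx_commit`, and `tx_rollback` (the method names are not shown in the text above) and that `basic_publish` accepts `exchange`, `routing_key`, and `body` keywords; `publish_in_transaction` is a hypothetical helper.

```python
def publish_in_transaction(channel, exchange, routing_key, bodies):
    """Publish all bodies atomically; roll back if any publish raises."""
    # Select must be issued on the channel before Commit or Rollback.
    channel.tx_select()
    try:
        for body in bodies:
            channel.basic_publish(exchange=exchange,
                                  routing_key=routing_key,
                                  body=body)
    except Exception:
        # Discard everything published since the transaction began.
        channel.tx_rollback()
        raise
    channel.tx_commit()
    return len(bodies)
```

If the failure closed the channel itself, the rollback call would fail as well, so real code may need to distinguish that case.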