summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqp.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/amqp.py')
-rw-r--r--oslo_messaging/_drivers/amqp.py32
1 files changed, 28 insertions, 4 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index ebae514..0d3dc51 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -55,6 +55,26 @@ amqp_opts = [
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
+# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
+# this connection can be used for two purposes:
+# * wait and receive amqp messages (only do read stuffs on the socket)
+# * send messages to the broker (only do write stuffs on the socket)
+# The code inside a connection class is not concurrency safe.
+# Using one Connection class instance for doing both, will result
+# of eventlet complaining of multiple greenthreads that read/write the
+# same fd concurrently... because 'send' and 'listen' run in different
+# greenthread.
+# So, a connection cannot be shared between thread/greenthread and
+# this two variables permit to define the purpose of the connection
+# to allow drivers to add special handling if needed (like heatbeat).
+# amqp drivers create 3 kind of connections:
+# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
+# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
+# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
+# to wait replies of rpc call
+PURPOSE_LISTEN = 'listen'
+PURPOSE_SEND = 'send'
+
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
@@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool):
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
- def create(self):
+ def create(self, purpose=None):
+ if purpose is None:
+ purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url)
+ return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
@@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, pooled=True):
+ def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
+ pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
- self.connection = connection_pool.create()
+ self.connection = connection_pool.create(purpose)
self.pooled = pooled
+ self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""