diff options
Diffstat (limited to 'oslo_messaging/_drivers/amqp.py')
-rw-r--r-- | oslo_messaging/_drivers/amqp.py | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index ebae514..0d3dc51 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -55,6 +55,26 @@ amqp_opts = [ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) +# NOTE(sileht): Even if rabbit/qpid have only one Connection class, +# this connection can be used for two purposes: +# * wait and receive amqp messages (only do read stuffs on the socket) +# * send messages to the broker (only do write stuffs on the socket) +# The code inside a connection class is not concurrency safe. +# Using one Connection class instance for doing both, will result +# of eventlet complaining of multiple greenthreads that read/write the +# same fd concurrently... because 'send' and 'listen' run in different +# greenthread. +# So, a connection cannot be shared between thread/greenthread and +# this two variables permit to define the purpose of the connection +# to allow drivers to add special handling if needed (like heatbeat). +# amqp drivers create 3 kind of connections: +# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection +# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used +# * driver internally have another 'PURPOSE_LISTEN' connection dedicated +# to wait replies of rpc call +PURPOSE_LISTEN = 'listen' +PURPOSE_SEND = 'send' + class ConnectionPool(pool.Pool): """Class that implements a Pool of Connections.""" @@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool): self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while - def create(self): + def create(self, purpose=None): + if purpose is None: + purpose = PURPOSE_SEND LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url) + return self.connection_cls(self.conf, self.url, purpose) def empty(self): for item in self.iter_free(): @@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, pooled=True): + def __init__(self, connection_pool, purpose): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool + pooled = purpose == PURPOSE_SEND if pooled: self.connection = connection_pool.get() else: # a non-pooled connection is requested, so create a new connection - self.connection = connection_pool.create() + self.connection = connection_pool.create(purpose) self.pooled = pooled + self.connection.pooled = pooled def __enter__(self): """When with ConnectionContext() is used, return self.""" |