diff options
author | Zuul <zuul@review.openstack.org> | 2018-05-01 16:49:07 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2018-05-01 16:49:07 +0000 |
commit | 4d03b163348094be684ea68f24c636a673d54aa2 (patch) | |
tree | 35ebeed6f153215382c551246f09fd5c7b08f48e | |
parent | 2b55d6c1609ff7b5dfe44e7fce4c5e507ef4915e (diff) | |
parent | 930e6189e25425576ed3a2f570bda3fd9d310bd6 (diff) | |
download | oslo-messaging-4d03b163348094be684ea68f24c636a673d54aa2.tar.gz |
Merge "Add heartbeat() method to RpcIncomingMessage"
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 3 | ||||
-rw-r--r-- | oslo_messaging/_drivers/base.py | 13 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_amqp1.py | 3 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_fake.py | 3 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 3 | ||||
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py | 3 |
6 files changed, 28 insertions, 0 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 8fc72db..abda9b0 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -178,6 +178,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # the end. self._message_operations_handler.do(self.message.requeue) + def heartbeat(self): + LOG.debug("Message heartbeat not implemented") + class ObsoleteReplyQueuesCache(object): """Cache of reply queue id that doesn't exists anymore. diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index c09ab6f..f023441 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -154,6 +154,19 @@ class RpcIncomingMessage(IncomingMessage): :raises: Does not raise an exception """ + @abc.abstractmethod + def heartbeat(self): + """Called by the server to send an RPC heartbeat message back to + the calling client. + + If the client (is new enough to have) passed its timeout value during + the RPC call, this method will be called periodically by the server + to update the client's timeout timer while a long-running call is + executing. + + :raises: Does not raise an exception + """ + @six.add_metaclass(abc.ABCMeta) class PollStyleListener(object): diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 5c1973d..5714a61 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -98,6 +98,9 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): self._correlation_id = message.id self._disposition = disposition + def heartbeat(self): + LOG.debug("Message heartbeat not implemented") + def reply(self, reply=None, failure=None): """Schedule an RPCReplyTask to send the reply.""" if self._reply_to: diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index 6898350..fd66133 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -38,6 +38,9 @@ class FakeIncomingMessage(base.RpcIncomingMessage): def requeue(self): self.requeue_callback() + def heartbeat(self): + """Heartbeat is not supported.""" + class FakeListener(base.PollStyleListener): diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index b292c46..184c65b 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -315,6 +315,9 @@ class OsloKafkaMessage(base.RpcIncomingMessage): def reply(self, reply=None, failure=None): LOG.warning(_LW("reply is not supported")) + def heartbeat(self): + LOG.warning(_LW("heartbeat is not supported")) + class KafkaListener(base.PollStyleListener): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index a7ddd09..9810388 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -36,3 +36,6 @@ class ZmqIncomingMessage(base.RpcIncomingMessage): def requeue(self): """Requeue is not supported.""" + + def heartbeat(self): + """Heartbeat is not supported.""" |