diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-12 17:42:27 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-12 17:42:27 +0000 |
commit | 23a0d956ffa79aa3e3fbf43e3755f1cea387b562 (patch) | |
tree | 34ff95f37feb436b970715bd89975747075e07b2 | |
parent | f821fb7fae3c74d8662e7783b255d52c785961f5 (diff) | |
download | qpid-python-23a0d956ffa79aa3e3fbf43e3755f1cea387b562.tar.gz |
QPID-3417: C++ broker - support adding arrival timestamp to received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182490 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/BrokerOptions.cpp | 79 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 2 | ||||
-rwxr-xr-x | cpp/src/tests/acl.py | 64 | ||||
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java | 13 | ||||
-rw-r--r-- | specs/management-schema.xml | 8 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/management.py | 61 |
15 files changed, 305 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bd94582d10..ec3cf9d340 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -43,6 +43,8 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) : queueFlowStopRatio(80), queueFlowResumeRatio(70), queueThresholdEventRatio(80), - defaultMsgGroup("qpid.no-group") + defaultMsgGroup("qpid.no-group"), + timestampRcvMsgs(false) // set the 0.10 timestamp delivery property { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) : ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") - ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") + ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message."); } const std::string empty; @@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) : else QPID_LOG(info, "Management not enabled"); + // this feature affects performance, so let's be sure that gets logged! + if (conf.timestampRcvMsgs) { + QPID_LOG(notice, "Receive message timestamping is ENABLED."); + } + /** * SASL setup, can fail and terminate startup */ @@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); - status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args); + status = getTimestampConfig(a.o_receive, getManagementExecutionContext()); + break; + } + case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args); + status = setTimestampConfig(a.i_receive, getManagementExecutionContext()); + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type"); const std::string QUEUE_NAME("queue"); const std::string EXCHANGE_NAME("exchange"); +const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); + const std::string _TRUE("true"); const std::string _FALSE("false"); } @@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name, return Manageable::STATUS_OK;; } +Manageable::status_t Broker::getTimestampConfig(bool& receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId)); + } + receive = config.timestampRcvMsgs; + return Manageable::STATUS_OK; +} + +Manageable::status_t Broker::setTimestampConfig(const bool receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId)); + } + config.timestampRcvMsgs = receive; + QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED.")); + return Manageable::STATUS_OK; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8b347db3c0..b3b751be98 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -122,6 +122,7 @@ public: uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; std::string defaultMsgGroup; + bool timestampRcvMsgs; private: std::string getHome(); @@ -164,6 +165,10 @@ public: const std::string& userId, const std::string& connectionId, qpid::types::Variant::Map& results); + Manageable::status_t getTimestampConfig(bool& receive, + const ConnectionState* context); + Manageable::status_t setTimestampConfig(const bool receive, + const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -315,6 +320,7 @@ public: const boost::intrusive_ptr<Message>& msg)> deferDelivery; bool isAuthenticating ( ) { return config.auth; } + bool isTimestamping() { return config.timestampRcvMsgs; } typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 5ea7143366..d13109dad1 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp() +{ + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); + time_t now = ::time(0); + props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch +} + +void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 2a23a25d06..dda45d73e6 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -81,7 +81,8 @@ public: QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); - QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + /** determine msg expiration time using the TTL value if present */ + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } @@ -93,6 +94,8 @@ public: QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key); void setExchange(const std::string&); void clearApplicationHeadersFlag(); + /** set the timestamp delivery property to the current time-of-day */ + QPID_BROKER_EXTERN void setTimestamp(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index dda481778d..380ec656cb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -472,7 +472,7 @@ const std::string nullstring; } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index ddd6ae3f5b..1ab17e9893 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) header.setEof(false); msg->getFrames().append(header); } + if (broker.isTimestamping()) + msg->setTimestamp(); msg->setPublisher(&getConnection()); msg->getIngressCompletion().begin(); semanticState.handle(msg); diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 5cf20c92eb..3badaf40ba 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject"); const std::string X_APP_ID("x-amqp-0-10.app-id"); const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); } void populateHeaders(qpid::messaging::Message& message, @@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasContentEncoding()) { message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); } - // routing-key, others? + // routing-key, timestamp, others? if (deliveryProperties && deliveryProperties->hasRoutingKey()) { message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); } + if (deliveryProperties && deliveryProperties->hasTimestamp()) { + message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); + } } } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 50fdc82ee0..5799a1adca 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data, dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->setTimestamp(broker->getExpiryPolicy()); + msg->computeExpiration(broker->getExpiryPolicy()); } msg->getFrames().append(content); msg->setIsManagementMessage(true); diff --git a/cpp/src/tests/BrokerOptions.cpp b/cpp/src/tests/BrokerOptions.cpp new file mode 100644 index 0000000000..b36d96916a --- /dev/null +++ b/cpp/src/tests/BrokerOptions.cpp @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Unit tests for various broker configuration options **/ + +#include "unit_test.h" +#include "test_tools.h" +#include "MessagingFixture.h" + +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite) + +using namespace qpid::broker; +using namespace qpid::messaging; +using namespace qpid::types; +using namespace qpid; + +QPID_AUTO_TEST_CASE(testDisabledTimestamp) +{ + // by default, there should be no timestamp added by the broker + MessagingFixture fix; + + Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}"); + messaging::Message msg("hi"); + sender.send(msg); + + Receiver receiver = fix.session.createReceiver("test-q"); + messaging::Message in; + BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE)); + Variant::Map props = in.getProperties(); + BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end()); +} + +QPID_AUTO_TEST_CASE(testEnabledTimestamp) +{ + // when enabled, the 0.10 timestamp is added by the broker + Broker::Options opts; + opts.timestampRcvMsgs = true; + MessagingFixture fix(opts, true); + + Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}"); + messaging::Message msg("one"); + sender.send(msg); + + Receiver receiver = fix.session.createReceiver("test-q"); + messaging::Message in; + BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE)); + Variant::Map props = in.getProperties(); + BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end()); + BOOST_CHECK(props["x-amqp-0-10.timestamp"]); +} + +QPID_AUTO_TEST_SUITE_END() + +}} diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 78ac6db5f1..f68a1462a9 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerOptions.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7bf061ff54..1a701e8a8c 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt { for (uint i = 0; i < count; i++) { intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); - m->setTimestamp(new broker::ExpiryPolicy); + m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } } diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py index 5e9a150d8f..65d5242e51 100755 --- a/cpp/src/tests/acl.py +++ b/cpp/src/tests/acl.py @@ -1030,6 +1030,64 @@ class ACLTests(TestBase010): if (403 == e.args[0].error_code): self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1"); + #===================================== + # ACL broker configuration tests + #===================================== + + def test_broker_timestamp_config(self): + """ + Test ACL control of the broker timestamp configuration + """ + aclf = self.get_acl_file() + # enable lots of stuff to allow QMF to work + aclf.write('acl allow all create exchange\n') + aclf.write('acl allow all access exchange\n') + aclf.write('acl allow all bind exchange\n') + aclf.write('acl allow all publish exchange\n') + aclf.write('acl allow all create queue\n') + aclf.write('acl allow all access queue\n') + aclf.write('acl allow all delete queue\n') + aclf.write('acl allow all consume queue\n') + aclf.write('acl allow all access method\n') + # this should let bob access the timestamp configuration + aclf.write('acl allow bob@QPID access broker\n') + aclf.write('acl allow admin@QPID all all\n') + aclf.write('acl deny all all') + aclf.close() + + result = self.reload_acl() + if (result.text.find("format error",0,len(result.text)) != -1): + self.fail(result) + + ts = None + bob = BrokerAdmin(self.config.broker, "bob", "bob") + ts = bob.get_timestamp_cfg() #should work + bob.set_timestamp_cfg(ts); #should work + + obo = BrokerAdmin(self.config.broker, "obo", "obo") + try: + ts = obo.get_timestamp_cfg() #should fail + failed = False + except Exception, e: + failed = True + self.assertEqual(7,e.args[0]["error_code"]) + assert e.args[0]["error_text"].find("unauthorized-access") == 0 + assert(failed) + + try: + obo.set_timestamp_cfg(ts) #should fail + failed = False + except Exception, e: + failed = True + self.assertEqual(7,e.args[0]["error_code"]) + assert e.args[0]["error_text"].find("unauthorized-access") == 0 + assert(failed) + + admin = BrokerAdmin(self.config.broker, "admin", "admin") + ts = admin.get_timestamp_cfg() #should pass + admin.set_timestamp_cfg(ts) #should pass + + class BrokerAdmin: def __init__(self, broker, username=None, password=None): self.connection = qpid.messaging.Connection(broker) @@ -1075,3 +1133,9 @@ class BrokerAdmin: def delete_queue(self, name): self.invoke("delete", {"type": "queue", "name":name}) + + def get_timestamp_cfg(self): + return self.invoke("getTimestampConfig", {}) + + def set_timestamp_cfg(self, receive): + return self.invoke("getTimestampConfig", {"receive":receive}) diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 171f4afbe2..6abef6fd6b 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -713,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory, + final java.lang.Boolean receive) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, final String type, final String name, diff --git a/specs/management-schema.xml b/specs/management-schema.xml index dd4acf66d5..9e2a644c2a 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -103,6 +103,14 @@ <arg name="level" dir="O" type="sstr"/> </method> + <method name="getTimestampConfig" desc="Get the message timestamping configuration"> + <arg name="receive" dir="O" type="bool" desc="True if received messages are timestamped."/> + </method> + + <method name="setTimestampConfig" desc="Set the message timestamping configuration"> + <arg name="receive" dir="I" type="bool" desc="Set true to enable timestamping received messages."/> + </method> + <method name="create" desc="Create an object of the specified type"> <arg name="type" dir="I" type="sstr" desc="The type of object to create"/> <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py index 5aaa1a7c7d..ac6d7578da 100644 --- a/tests/src/py/qpid_tests/broker_0_10/management.py +++ b/tests/src/py/qpid_tests/broker_0_10/management.py @@ -584,4 +584,63 @@ class ManagementTest (TestBase010): conn_qmf.update() self.assertEqual(conn_qmf.msgsToClient, 1) - + def test_timestamp_config(self): + """ + Test message timestamping control. + """ + self.startQmf() + conn = self.connect() + session = conn.session("timestamp-session") + + #verify that receive message timestamping is OFF by default + broker = self.qmf.getObjects(_class="broker")[0] + rc = broker.getTimestampConfig() + self.assertEqual(rc.status, 0) + self.assertEqual(rc.text, "OK") + #self.assertEqual(rc.receive, False) + + #try to enable it + rc = broker.setTimestampConfig(True) + self.assertEqual(rc.status, 0) + self.assertEqual(rc.text, "OK") + + rc = broker.getTimestampConfig() + self.assertEqual(rc.status, 0) + self.assertEqual(rc.text, "OK") + self.assertEqual(rc.receive, True) + + #send a message to a queue + session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc")) + + #receive message from queue, and verify timestamp is present + session.message_subscribe(destination="d", queue="ts-q") + session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + incoming = session.incoming("d") + msg = incoming.get(timeout=1) + self.assertEqual("abc", msg.body) + self.assertEqual(msg.has("delivery_properties"), True) + dp = msg.get("delivery_properties") + assert(dp.timestamp) + + #try to disable it + rc = broker.setTimestampConfig(False) + self.assertEqual(rc.status, 0) + self.assertEqual(rc.text, "OK") + + rc = broker.getTimestampConfig() + self.assertEqual(rc.status, 0) + self.assertEqual(rc.text, "OK") + self.assertEqual(rc.receive, False) + + #send another message to the queue + session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def")) + + #receive message from queue, and verify timestamp is NOT PRESENT + msg = incoming.get(timeout=1) + self.assertEqual("def", msg.body) + self.assertEqual(msg.has("delivery_properties"), True) + dp = msg.get("delivery_properties") + self.assertEqual(dp.timestamp, None) + |