summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-12 17:42:27 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-12 17:42:27 +0000
commit23a0d956ffa79aa3e3fbf43e3755f1cea387b562 (patch)
tree34ff95f37feb436b970715bd89975747075e07b2
parentf821fb7fae3c74d8662e7783b255d52c785961f5 (diff)
downloadqpid-python-23a0d956ffa79aa3e3fbf43e3755f1cea387b562.tar.gz
QPID-3417: C++ broker - support adding arrival timestamp to received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182490 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Broker.cpp53
-rw-r--r--cpp/src/qpid/broker/Broker.h6
-rw-r--r--cpp/src/qpid/broker/Message.cpp10
-rw-r--r--cpp/src/qpid/broker/Message.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp6
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp2
-rw-r--r--cpp/src/tests/BrokerOptions.cpp79
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/QueueTest.cpp2
-rwxr-xr-xcpp/src/tests/acl.py64
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java13
-rw-r--r--specs/management-schema.xml8
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/management.py61
15 files changed, 305 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index bd94582d10..ec3cf9d340 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -43,6 +43,8 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) :
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
- defaultMsgGroup("qpid.no-group")
+ defaultMsgGroup("qpid.no-group"),
+ timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) :
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
- ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
}
const std::string empty;
@@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) :
else
QPID_LOG(info, "Management not enabled");
+ // this feature affects performance, so let's be sure that gets logged!
+ if (conf.timestampRcvMsgs) {
+ QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+ }
+
/**
* SASL setup, can fail and terminate startup
*/
@@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
_qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
- status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+ status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+ status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
const std::string _TRUE("true");
const std::string _FALSE("false");
}
@@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_OK;;
}
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+ }
+ receive = config.timestampRcvMsgs;
+ return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+ }
+ config.timestampRcvMsgs = receive;
+ QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+ return Manageable::STATUS_OK;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8b347db3c0..b3b751be98 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -122,6 +122,7 @@ public:
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
+ bool timestampRcvMsgs;
private:
std::string getHome();
@@ -164,6 +165,10 @@ public:
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
+ Manageable::status_t getTimestampConfig(bool& receive,
+ const ConnectionState* context);
+ Manageable::status_t setTimestampConfig(const bool receive,
+ const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -315,6 +320,7 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+ bool isTimestamping() { return config.timestampRcvMsgs; }
typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 5ea7143366..d13109dad1 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+ time_t now = ::time(0);
+ props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
sys::Mutex::ScopedLock l(lock);
DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 2a23a25d06..dda45d73e6 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -81,7 +81,8 @@ public:
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
- QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ /** determine msg expiration time using the TTL value if present */
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
@@ -93,6 +94,8 @@ public:
QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
void setExchange(const std::string&);
void clearApplicationHeadersFlag();
+ /** set the timestamp delivery property to the current time-of-day */
+ QPID_BROKER_EXTERN void setTimestamp();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index dda481778d..380ec656cb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -472,7 +472,7 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+ msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index ddd6ae3f5b..1ab17e9893 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
header.setEof(false);
msg->getFrames().append(header);
}
+ if (broker.isTimestamping())
+ msg->setTimestamp();
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
semanticState.handle(msg);
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 5cf20c92eb..3badaf40ba 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, others?
+ // routing-key, timestamp, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
+ if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+ message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+ }
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 50fdc82ee0..5799a1adca 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data,
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
+ msg->computeExpiration(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
msg->setIsManagementMessage(true);
diff --git a/cpp/src/tests/BrokerOptions.cpp b/cpp/src/tests/BrokerOptions.cpp
new file mode 100644
index 0000000000..b36d96916a
--- /dev/null
+++ b/cpp/src/tests/BrokerOptions.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Unit tests for various broker configuration options **/
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "MessagingFixture.h"
+
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace qpid;
+
+QPID_AUTO_TEST_CASE(testDisabledTimestamp)
+{
+ // by default, there should be no timestamp added by the broker
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("hi");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end());
+}
+
+QPID_AUTO_TEST_CASE(testEnabledTimestamp)
+{
+ // when enabled, the 0.10 timestamp is added by the broker
+ Broker::Options opts;
+ opts.timestampRcvMsgs = true;
+ MessagingFixture fix(opts, true);
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("one");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end());
+ BOOST_CHECK(props["x-amqp-0-10.timestamp"]);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 78ac6db5f1..f68a1462a9 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerOptions.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 7bf061ff54..1a701e8a8c 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
{
for (uint i = 0; i < count; i++) {
intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->setTimestamp(new broker::ExpiryPolicy);
+ m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index 5e9a150d8f..65d5242e51 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -1030,6 +1030,64 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+ #=====================================
+ # ACL broker configuration tests
+ #=====================================
+
+ def test_broker_timestamp_config(self):
+ """
+ Test ACL control of the broker timestamp configuration
+ """
+ aclf = self.get_acl_file()
+ # enable lots of stuff to allow QMF to work
+ aclf.write('acl allow all create exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all publish exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ # this should let bob access the timestamp configuration
+ aclf.write('acl allow bob@QPID access broker\n')
+ aclf.write('acl allow admin@QPID all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ ts = None
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ ts = bob.get_timestamp_cfg() #should work
+ bob.set_timestamp_cfg(ts); #should work
+
+ obo = BrokerAdmin(self.config.broker, "obo", "obo")
+ try:
+ ts = obo.get_timestamp_cfg() #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ try:
+ obo.set_timestamp_cfg(ts) #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ ts = admin.get_timestamp_cfg() #should pass
+ admin.set_timestamp_cfg(ts) #should pass
+
+
class BrokerAdmin:
def __init__(self, broker, username=None, password=None):
self.connection = qpid.messaging.Connection(broker)
@@ -1075,3 +1133,9 @@ class BrokerAdmin:
def delete_queue(self, name):
self.invoke("delete", {"type": "queue", "name":name})
+
+ def get_timestamp_cfg(self):
+ return self.invoke("getTimestampConfig", {})
+
+ def set_timestamp_cfg(self, receive):
+ return self.invoke("getTimestampConfig", {"receive":receive})
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 171f4afbe2..6abef6fd6b 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -713,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+ final java.lang.Boolean receive)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
final String type,
final String name,
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index dd4acf66d5..9e2a644c2a 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -103,6 +103,14 @@
<arg name="level" dir="O" type="sstr"/>
</method>
+ <method name="getTimestampConfig" desc="Get the message timestamping configuration">
+ <arg name="receive" dir="O" type="bool" desc="True if received messages are timestamped."/>
+ </method>
+
+ <method name="setTimestampConfig" desc="Set the message timestamping configuration">
+ <arg name="receive" dir="I" type="bool" desc="Set true to enable timestamping received messages."/>
+ </method>
+
<method name="create" desc="Create an object of the specified type">
<arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
<arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py
index 5aaa1a7c7d..ac6d7578da 100644
--- a/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -584,4 +584,63 @@ class ManagementTest (TestBase010):
conn_qmf.update()
self.assertEqual(conn_qmf.msgsToClient, 1)
-
+ def test_timestamp_config(self):
+ """
+ Test message timestamping control.
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("timestamp-session")
+
+ #verify that receive message timestamping is OFF by default
+ broker = self.qmf.getObjects(_class="broker")[0]
+ rc = broker.getTimestampConfig()
+ self.assertEqual(rc.status, 0)
+ self.assertEqual(rc.text, "OK")
+ #self.assertEqual(rc.receive, False)
+
+ #try to enable it
+ rc = broker.setTimestampConfig(True)
+ self.assertEqual(rc.status, 0)
+ self.assertEqual(rc.text, "OK")
+
+ rc = broker.getTimestampConfig()
+ self.assertEqual(rc.status, 0)
+ self.assertEqual(rc.text, "OK")
+ self.assertEqual(rc.receive, True)
+
+ #send a message to a queue
+ session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc"))
+
+ #receive message from queue, and verify timestamp is present
+ session.message_subscribe(destination="d", queue="ts-q")
+ session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ incoming = session.incoming("d")
+ msg = incoming.get(timeout=1)
+ self.assertEqual("abc", msg.body)
+ self.assertEqual(msg.has("delivery_properties"), True)
+ dp = msg.get("delivery_properties")
+ assert(dp.timestamp)
+
+ #try to disable it
+ rc = broker.setTimestampConfig(False)
+ self.assertEqual(rc.status, 0)
+ self.assertEqual(rc.text, "OK")
+
+ rc = broker.getTimestampConfig()
+ self.assertEqual(rc.status, 0)
+ self.assertEqual(rc.text, "OK")
+ self.assertEqual(rc.receive, False)
+
+ #send another message to the queue
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def"))
+
+ #receive message from queue, and verify timestamp is NOT PRESENT
+ msg = incoming.get(timeout=1)
+ self.assertEqual("def", msg.body)
+ self.assertEqual(msg.has("delivery_properties"), True)
+ dp = msg.get("delivery_properties")
+ self.assertEqual(dp.timestamp, None)
+