summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp53
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp2
-rw-r--r--qpid/cpp/src/tests/BrokerOptions.cpp79
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/acl.py64
12 files changed, 224 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index bd94582d10..ec3cf9d340 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -43,6 +43,8 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) :
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
- defaultMsgGroup("qpid.no-group")
+ defaultMsgGroup("qpid.no-group"),
+ timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) :
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
- ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
}
const std::string empty;
@@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) :
else
QPID_LOG(info, "Management not enabled");
+ // this feature affects performance, so let's be sure that gets logged!
+ if (conf.timestampRcvMsgs) {
+ QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+ }
+
/**
* SASL setup, can fail and terminate startup
*/
@@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
_qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
- status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+ status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+ status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
const std::string _TRUE("true");
const std::string _FALSE("false");
}
@@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_OK;;
}
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+ }
+ receive = config.timestampRcvMsgs;
+ return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+ }
+ config.timestampRcvMsgs = receive;
+ QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+ return Manageable::STATUS_OK;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 8b347db3c0..b3b751be98 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -122,6 +122,7 @@ public:
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
+ bool timestampRcvMsgs;
private:
std::string getHome();
@@ -164,6 +165,10 @@ public:
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
+ Manageable::status_t getTimestampConfig(bool& receive,
+ const ConnectionState* context);
+ Manageable::status_t setTimestampConfig(const bool receive,
+ const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -315,6 +320,7 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+ bool isTimestamping() { return config.timestampRcvMsgs; }
typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 5ea7143366..d13109dad1 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+ time_t now = ::time(0);
+ props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
sys::Mutex::ScopedLock l(lock);
DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 2a23a25d06..dda45d73e6 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -81,7 +81,8 @@ public:
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
- QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ /** determine msg expiration time using the TTL value if present */
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
@@ -93,6 +94,8 @@ public:
QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
void setExchange(const std::string&);
void clearApplicationHeadersFlag();
+ /** set the timestamp delivery property to the current time-of-day */
+ QPID_BROKER_EXTERN void setTimestamp();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index dda481778d..380ec656cb 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -472,7 +472,7 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+ msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index ddd6ae3f5b..1ab17e9893 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
header.setEof(false);
msg->getFrames().append(header);
}
+ if (broker.isTimestamping())
+ msg->setTimestamp();
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
semanticState.handle(msg);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 5cf20c92eb..3badaf40ba 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, others?
+ // routing-key, timestamp, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
+ if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+ message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 50fdc82ee0..5799a1adca 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data,
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
+ msg->computeExpiration(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
msg->setIsManagementMessage(true);
diff --git a/qpid/cpp/src/tests/BrokerOptions.cpp b/qpid/cpp/src/tests/BrokerOptions.cpp
new file mode 100644
index 0000000000..b36d96916a
--- /dev/null
+++ b/qpid/cpp/src/tests/BrokerOptions.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Unit tests for various broker configuration options **/
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "MessagingFixture.h"
+
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace qpid;
+
+QPID_AUTO_TEST_CASE(testDisabledTimestamp)
+{
+ // by default, there should be no timestamp added by the broker
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("hi");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end());
+}
+
+QPID_AUTO_TEST_CASE(testEnabledTimestamp)
+{
+ // when enabled, the 0.10 timestamp is added by the broker
+ Broker::Options opts;
+ opts.timestampRcvMsgs = true;
+ MessagingFixture fix(opts, true);
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("one");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end());
+ BOOST_CHECK(props["x-amqp-0-10.timestamp"]);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 78ac6db5f1..f68a1462a9 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerOptions.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 7bf061ff54..1a701e8a8c 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
{
for (uint i = 0; i < count; i++) {
intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->setTimestamp(new broker::ExpiryPolicy);
+ m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 5e9a150d8f..65d5242e51 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -1030,6 +1030,64 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+ #=====================================
+ # ACL broker configuration tests
+ #=====================================
+
+ def test_broker_timestamp_config(self):
+ """
+ Test ACL control of the broker timestamp configuration
+ """
+ aclf = self.get_acl_file()
+ # enable lots of stuff to allow QMF to work
+ aclf.write('acl allow all create exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all publish exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ # this should let bob access the timestamp configuration
+ aclf.write('acl allow bob@QPID access broker\n')
+ aclf.write('acl allow admin@QPID all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ ts = None
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ ts = bob.get_timestamp_cfg() #should work
+ bob.set_timestamp_cfg(ts); #should work
+
+ obo = BrokerAdmin(self.config.broker, "obo", "obo")
+ try:
+ ts = obo.get_timestamp_cfg() #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ try:
+ obo.set_timestamp_cfg(ts) #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ ts = admin.get_timestamp_cfg() #should pass
+ admin.set_timestamp_cfg(ts) #should pass
+
+
class BrokerAdmin:
def __init__(self, broker, username=None, password=None):
self.connection = qpid.messaging.Connection(broker)
@@ -1075,3 +1133,9 @@ class BrokerAdmin:
def delete_queue(self, name):
self.invoke("delete", {"type": "queue", "name":name})
+
+ def get_timestamp_cfg(self):
+ return self.invoke("getTimestampConfig", {})
+
+ def set_timestamp_cfg(self, receive):
+ return self.invoke("getTimestampConfig", {"receive":receive})