summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateExchange.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
committerAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
commit4cdf746f5bb38db60821047c3393f89f15b26f1e (patch)
tree610a404288b464a2225668c128fa77a84020ea62 /cpp/src/qpid/cluster/UpdateExchange.cpp
parent8034affaba71c0d991bfe1fff5de537f73d0f404 (diff)
downloadqpid-python-4cdf746f5bb38db60821047c3393f89f15b26f1e.tar.gz
QPID-3280: Performance problem with TTL messages.
When sending a large number of messages with nonzero TTLs to a cluster, overall message throughput drops by around 20-30% compared to messages with TTL 0. The previous approach to TTL in the cluster is replaced with a simpler "cluster clock". Also QueueCleaner is executed in the cluster timer, and modified to be deterministic in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136170 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateExchange.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.cpp27
1 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp
index 11937f296f..e830459aba 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -27,6 +28,8 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
+using framing::MessageProperties;
+using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent)
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-}
+ // Copy expiration from x-property if present.
+ if (msg->hasProperties<MessageProperties>()) {
+ MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ if (mprops->hasApplicationHeaders()) {
+ FieldTable& headers = mprops->getApplicationHeaders();
+ if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
+ msg->setExpiration(
+ sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
+ headers.erase(UpdateClient::X_QPID_EXPIRATION);
+ // Erase props/headers that were added by the UpdateClient
+ if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
+ msg->eraseProperties<MessageProperties>();
+ else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
+ mprops->clearApplicationHeadersFlag();
+ }
+ }
+ }
+}
}} // namespace qpid::cluster