diff options
author | Alan Conway <aconway@apache.org> | 2011-06-15 20:15:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-06-15 20:15:51 +0000 |
commit | 4cdf746f5bb38db60821047c3393f89f15b26f1e (patch) | |
tree | 610a404288b464a2225668c128fa77a84020ea62 /cpp/src/qpid/cluster/UpdateExchange.cpp | |
parent | 8034affaba71c0d991bfe1fff5de537f73d0f404 (diff) | |
download | qpid-python-4cdf746f5bb38db60821047c3393f89f15b26f1e.tar.gz |
QPID-3280: Performance problem with TTL messages.
When sending a large number of messages with nonzero TTLs to a
cluster, overall message throughput drops by around 20-30% compared to
messages with TTL 0.
The previous approach to TTL in the cluster is replaced with a simpler
"cluster clock". Also QueueCleaner is executed in the cluster timer,
and modified to be deterministic in a cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136170 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateExchange.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.cpp | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp index 11937f296f..e830459aba 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ * */ #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" #include "UpdateExchange.h" @@ -27,6 +28,8 @@ namespace cluster { using framing::MessageTransferBody; using framing::DeliveryProperties; +using framing::MessageProperties; +using framing::FieldTable; UpdateExchange::UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), @@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent) void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + // Copy exchange name to destination property. MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); assert(transfer); const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); @@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& transfer->setDestination(props->getExchange()); else transfer->clearDestinationFlag(); -} + // Copy expiration from x-property if present. + if (msg->hasProperties<MessageProperties>()) { + MessageProperties* mprops = msg->getProperties<MessageProperties>(); + if (mprops->hasApplicationHeaders()) { + FieldTable& headers = mprops->getApplicationHeaders(); + if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { + msg->setExpiration( + sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); + headers.erase(UpdateClient::X_QPID_EXPIRATION); + // Erase props/headers that were added by the UpdateClient + if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) + msg->eraseProperties<MessageProperties>(); + else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) + mprops->clearApplicationHeadersFlag(); + } + } + } +} }} // namespace qpid::cluster |