summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp91
1 files changed, 63 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index a15c14ff48..77448789db 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
@@ -83,11 +83,20 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
+// Name for header used to carry expiration information.
+const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
+// Headers used to flag headers/properties added by the UpdateClient so they can be
+// removed on the other side.
+const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
+const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
+
std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
return o << "cluster(" << c.updaterId << " UPDATER)";
}
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -121,7 +130,7 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
@@ -135,9 +144,6 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.cluster-update");
-
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
@@ -155,6 +161,13 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
+ if(b.getExpiryPolicy()) {
+ QPID_LOG(debug, *this << "Updating updatee with cluster time");
+ qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
+ int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
+ ClusterConnectionProxy(session).clock(time);
+ }
+
updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -174,7 +187,6 @@ void UpdateClient::update() {
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
- ClusterConnectionProxy(session).expiryId(expiry.getId());
updateLinks();
updateManagementAgent();
@@ -188,7 +200,7 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
-
+
// TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
@@ -280,7 +292,7 @@ class MessageUpdater {
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
-
+
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -297,7 +309,6 @@ class MessageUpdater {
}
}
-
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
@@ -306,10 +317,23 @@ class MessageUpdater {
}
lastPos = message.position;
- // Send the expiry ID if necessary.
- if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
- boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
- ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
+ // if the ttl > 0, we need to send the calculated expiration time to the updatee
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) {
+ bool hadMessageProps =
+ message.payload->hasProperties<framing::MessageProperties>();
+ framing::MessageProperties* mprops =
+ message.payload->getProperties<framing::MessageProperties>();
+ bool hadApplicationHeaders = mprops->hasApplicationHeaders();
+ FieldTable& applicationHeaders = mprops->getApplicationHeaders();
+ applicationHeaders.setInt64(
+ UpdateClient::X_QPID_EXPIRATION,
+ sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+ // If message properties or application headers didn't exist
+ // prior to us adding data, we want to remove them on the other side.
+ if (!hadMessageProps)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+ else if (!hadApplicationHeaders)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
}
// We can't send a broker::Message via the normal client API,
@@ -322,7 +346,7 @@ class MessageUpdater {
framing::MessageTransferBody transfer(
*message.payload->getFrames().as<framing::MessageTransferBody>());
transfer.setDestination(UpdateClient::UPDATE);
-
+
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
@@ -330,12 +354,21 @@ class MessageUpdater {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
+ // If the ttl > 0, we need to send the calculated expiration time to the updatee
+ // Careful not to alter the message as a side effect e.g. by adding
+ // an empty DeliveryProperties or setting TTL when it wasn't set before.
+ uint64_t ttl = 0;
+ if (message.payload->hasProperties<DeliveryProperties>()) {
+ DeliveryProperties* dprops =
+ message.payload->getProperties<DeliveryProperties>();
+ if (dprops->hasTtl()) ttl = dprops->getTtl();
+ };
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
@@ -361,6 +394,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
+
+ ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -393,7 +428,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+
// Send the management ID first on the main connection.
std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -430,7 +465,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating session " << ss->getId());
- // Create a client session to update session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
simpl->disableAutoDetach();
@@ -456,12 +491,12 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
+ if (inProgress)
--received;
// Sync the session to ensure all responses from broker have been processed.
shadowSession.sync();
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -511,7 +546,7 @@ void UpdateClient::updateConsumer(
QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
-
+
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
@@ -543,7 +578,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
-
+
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -563,7 +598,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
typedef std::list<Queue::shared_ptr> QueueList;
const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
@@ -573,7 +608,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-
+
void UpdateClient::updateTxState(broker::SemanticState& s) {
QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);