summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp223
1 files changed, 57 insertions, 166 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2446c12f2b..8f751add9b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
@@ -45,13 +45,10 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/DtxAck.h"
-#include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -67,7 +64,6 @@
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
-#include <iterator>
#include <sstream>
namespace qpid {
@@ -86,20 +82,11 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
-// Name for header used to carry expiration information.
-const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
-// Headers used to flag headers/properties added by the UpdateClient so they can be
-// removed on the other side.
-const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
-const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
-
std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
return o << "cluster(" << c.updaterId << " UPDATER)";
}
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -133,7 +120,7 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
@@ -147,11 +134,13 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
+
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
session = connection.newSession(UPDATE);
- session.sync();
update();
done();
} catch (const std::exception& e) {
@@ -165,13 +154,6 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
- if(b.getExpiryPolicy()) {
- QPID_LOG(debug, *this << "Updating updatee with cluster time");
- qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
- int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
- ClusterConnectionProxy(session).clock(time);
- }
-
updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -181,20 +163,16 @@ void UpdateClient::update() {
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
-
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
-
- // some Queue Observers need session state & msgs synced first, so sync observers now
- b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
updateLinks();
updateManagementAgent();
- updateDtxManager();
- session.queueDelete(arg::queue=UPDATE);
session.close();
@@ -206,7 +184,7 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
-
+
// TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
@@ -298,7 +276,7 @@ class MessageUpdater {
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
-
+
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -315,6 +293,7 @@ class MessageUpdater {
}
}
+
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
@@ -323,23 +302,10 @@ class MessageUpdater {
}
lastPos = message.position;
- // if the ttl > 0, we need to send the calculated expiration time to the updatee
- const DeliveryProperties* dprops =
- message.payload->getProperties<DeliveryProperties>();
- if (dprops && dprops->getTtl() > 0) {
- bool hadMessageProps =
- message.payload->hasProperties<framing::MessageProperties>();
- const framing::MessageProperties* mprops =
- message.payload->getProperties<framing::MessageProperties>();
- bool hadApplicationHeaders = mprops->hasApplicationHeaders();
- message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
- sys::Duration(sys::EPOCH, message.payload->getExpiration()));
- // If message properties or application headers didn't exist
- // prior to us adding data, we want to remove them on the other side.
- if (!hadMessageProps)
- message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
- else if (!hadApplicationHeaders)
- message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+ ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
}
// We can't send a broker::Message via the normal client API,
@@ -352,7 +318,7 @@ class MessageUpdater {
framing::MessageTransferBody transfer(
*message.payload->getFrames().as<framing::MessageTransferBody>());
transfer.setDestination(UpdateClient::UPDATE);
-
+
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
@@ -360,10 +326,9 @@ class MessageUpdater {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
- morecontent = message.payload->getContentFrame(
- *(message.queue), frame, maxContentSize, offset);
+ morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
@@ -392,8 +357,6 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
-
- ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -409,11 +372,7 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue
}
void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
- if (binding.exchange.size())
- s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
- //else its the default exchange and there is no need to replicate
- //the binding, the creation of the queue will have done so
- //automatically
+ s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
@@ -421,8 +380,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
- QPID_LOG(debug, *this << " updating output task " << ci->getTag()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
@@ -430,7 +389,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+
// Send the management ID first on the main connection.
std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -467,7 +426,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating session " << ss->getId());
- // Create a client session to update session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
simpl->disableAutoDetach();
@@ -486,19 +445,19 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
- boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
+ boost::bind(&UpdateClient::updateUnacked, this, _1));
- updateTransactionState(ss->getSemanticState());
+ updateTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
+ if (inProgress)
--received;
// Sync the session to ensure all responses from broker have been processed.
shadowSession.sync();
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -507,8 +466,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete(),
- ss->getSemanticState().getDtxSelected()
+ ss->receiverGetIncomplete()
);
// Send frames for partial message in progress.
@@ -521,13 +479,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getTag(),
+ arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -535,32 +493,29 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getTag(),
+ ci->getName(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
-
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
- client::AsyncSession& updateSession)
-{
- if (!dr.isEnded() && dr.isAcquired()) {
- assert(dr.getMessage().payload);
+
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
+ if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
- ClusterConnectionProxy(updateSession).deliveryRecord(
+ ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
dr.getTag(),
@@ -581,12 +536,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
: MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
- void operator()(const broker::DtxAck& ack) {
- std::for_each(ack.getPending().begin(), ack.getPending().end(),
- boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
- proxy.dtxAck();
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
-
+
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -601,18 +554,13 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
proxy.txAccept(txAccept.getAcked());
}
- typedef std::list<Queue::shared_ptr> QueueList;
-
- void copy(const QueueList& l, Array& a) {
- for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
- a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
- }
-
void operator()(const broker::TxPublish& txPub) {
updateMessage(txPub.getMessage());
- assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
- copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
@@ -621,44 +569,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-
-void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
-{
- ClusterConnectionProxy proxy(shadowSession);
- broker::DtxWorkRecord* record =
- updaterBroker.getDtxManager().getWork(dtx->getXid());
- proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
-
-}
-
-void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+
+void UpdateClient::updateTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
- broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
- broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
- if (dtx) {
- updateBufferRef(dtx, false); // Current transaction.
- } else if (tx) {
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
- tx->accept(updater);
+ txBuffer->accept(updater);
proxy.txEnd();
}
- for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
- i != s.getSuspendedXids().end();
- ++i)
- {
- updateBufferRef(i->second, true);
- }
-}
-
-void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
- ClusterConnectionProxy proxy(session);
- proxy.dtxStart(
- dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
- TxOpUpdater updater(*this, session, expiry);
- dtx->accept(updater);
- proxy.dtxEnd();
}
void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
@@ -693,35 +615,4 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
ClusterConnectionProxy(session).config(encode(*bridge));
}
-void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
-{
- q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
-}
-
-void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
- boost::shared_ptr<broker::QueueObserver> o)
-{
- qpid::framing::FieldTable state;
- broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (so) {
- so->getState( state );
- std::string id(so->getId());
- QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
- ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
- }
-}
-
-void UpdateClient::updateDtxManager() {
- broker::DtxManager& dtm = updaterBroker.getDtxManager();
- dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
-}
-
-void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
- QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
- for (size_t i = 0; i < r.size(); ++i)
- updateDtxBuffer(r[i]);
- ClusterConnectionProxy(session).dtxWorkRecord(
- r.getXid(), r.isPrepared(), r.getTimeout());
-}
-
}} // namespace qpid::cluster