diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 118 |
1 files changed, 89 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index f4784297a7..2446c12f2b 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -45,6 +45,8 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" +#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" @@ -65,6 +67,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <iterator> #include <sstream> namespace qpid { @@ -148,6 +151,7 @@ void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); session = connection.newSession(UPDATE); + session.sync(); update(); done(); } catch (const std::exception& e) { @@ -177,9 +181,9 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); // some Queue Observers need session state & msgs synced first, so sync observers now b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); @@ -189,6 +193,8 @@ void UpdateClient::update() { updateLinks(); updateManagementAgent(); + updateDtxManager(); + session.queueDelete(arg::queue=UPDATE); session.close(); @@ -318,22 +324,22 @@ class MessageUpdater { lastPos = message.position; // if the ttl > 0, we need to send the calculated expiration time to the updatee - if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) { + const DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops && dprops->getTtl() > 0) { bool hadMessageProps = message.payload->hasProperties<framing::MessageProperties>(); - framing::MessageProperties* mprops = + const framing::MessageProperties* mprops = message.payload->getProperties<framing::MessageProperties>(); bool hadApplicationHeaders = mprops->hasApplicationHeaders(); - FieldTable& applicationHeaders = mprops->getApplicationHeaders(); - applicationHeaders.setInt64( - UpdateClient::X_QPID_EXPIRATION, - sys::Duration(sys::EPOCH, message.payload->getExpiration())); + message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); // If message properties or application headers didn't exist // prior to us adding data, we want to remove them on the other side. if (!hadMessageProps) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); else if (!hadApplicationHeaders) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, @@ -356,7 +362,8 @@ class MessageUpdater { for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame( + *(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -402,7 +409,11 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue } void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) { - s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + if (binding.exchange.size()) + s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + //else its the default exchange and there is no need to replicate + //the binding, the creation of the queue will have done so + //automatically } void UpdateClient::updateOutputTask(const sys::OutputTask* task) { @@ -475,9 +486,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); + boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); - updateTxState(ss->getSemanticState()); // Tx transaction state. + updateTransactionState(ss->getSemanticState()); // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -496,7 +507,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -538,14 +550,17 @@ void UpdateClient::updateConsumer( << " on " << shadowSession.getId()); } -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, + client::AsyncSession& updateSession) +{ + if (!dr.isEnded() && dr.isAcquired()) { + assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(shadowSession).deliveryRecord( + ClusterConnectionProxy(updateSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -566,8 +581,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); + void operator()(const broker::DtxAck& ack) { + std::for_each(ack.getPending().begin(), ack.getPending().end(), + boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); + proxy.dtxAck(); } void operator()(const broker::RecoveredDequeue& rdeq) { @@ -584,13 +601,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } + typedef std::list<Queue::shared_ptr> QueueList; + + void copy(const QueueList& l, Array& a) { + for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) + a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + } + void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); + assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); proxy.txPublish(qarray, txPub.delivered); } @@ -600,17 +622,43 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + +void UpdateClient::updateTransactionState(broker::SemanticState& s) { ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); + if (dtx) { + updateBufferRef(dtx, false); // Current transaction. + } else if (tx) { proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); + tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } +} + +void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { + ClusterConnectionProxy proxy(session); + proxy.dtxStart( + dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); + TxOpUpdater updater(*this, session, expiry); + dtx->accept(updater); + proxy.dtxEnd(); } void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { @@ -663,5 +711,17 @@ void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, } } +void UpdateClient::updateDtxManager() { + broker::DtxManager& dtm = updaterBroker.getDtxManager(); + dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); +} + +void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { + QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); + for (size_t i = 0; i < r.size(); ++i) + updateDtxBuffer(r[i]); + ClusterConnectionProxy(session).dtxWorkRecord( + r.getXid(), r.isPrepared(), r.getTimeout()); +} }} // namespace qpid::cluster |