diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 65 |
1 files changed, 49 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ada26ab2fb..513816735d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,7 +24,11 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/RecoveredDequeue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -36,7 +40,7 @@ #include <boost/current_function.hpp> -// FIXME aconway 2008-11-03: +// TODO aconway 2008-11-03: // // Disproportionate amount of code here is dedicated to receiving a // brain-dump when joining a cluster and building initial @@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { std::string message; if (body.getMethod()) { switch (body.getMethod()->amqpClassId()) { - case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break; case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } @@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) { if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; } if (!message.empty()) - connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0); + connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0); return !message.empty(); } // Delivered from cluster. void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f); + QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f); assert(!catchUp); currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol. @@ -247,11 +250,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } + +shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { + shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); + if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); + return queue; +} + broker::QueuedMessage Connection::getDumpMessage() { - // Get a message from the DUMP queue. - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); - broker::QueuedMessage m = dumpQueue->get(); + broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); return m; } @@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); - if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); broker::QueuedMessage m; + broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) { // Message at front of dump queue - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - m = dumpQueue->get(); - } + if (acquired) // Message is on the dump queue + m = getDumpMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - - semanticState().record(dr); + semanticState().record(dr); // Part of the session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } +void Connection::txStart() { + txBuffer = make_shared_ptr(new broker::TxBuffer()); +} +void Connection::txAccept(const framing::SequenceSet& acked) { + txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked()))); +} + +void Connection::txDequeue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txEnqueue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txPublish(const framing::Array& queues, bool delivered) { + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload)); + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) + txPub->deliverTo(findQueue((*i)->get<std::string>())); + txPub->delivered = delivered; + txBuffer->enlist(txPub); +} + +void Connection::txEnd() { + semanticState().setTxBuffer(txBuffer); +} + +void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { + semanticState().setAccumulatedAck(s); +} }} // namespace qpid::cluster |