diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 52 |
1 files changed, 35 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 604df9dde6..ada26ab2fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxPublish.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -35,6 +36,14 @@ #include <boost/current_function.hpp> +// FIXME aconway 2008-11-03: +// +// Disproportionate amount of code here is dedicated to receiving a +// brain-dump when joining a cluster and building initial +// state. Should be separated out into its own classes. +// + + namespace qpid { namespace cluster { @@ -180,10 +189,16 @@ void Connection::deliverBuffer(Buffer& buf) { delivered(mcastDecoder.frame); } +broker::SessionState& Connection::sessionState() { + return *connection.getChannel(currentChannel).getSession(); +} + +broker::SemanticState& Connection::semanticState() { + return sessionState().getSemanticState(); +} + void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - broker::SemanticState::ConsumerImpl& c = s->getConsumer(name); + broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); } @@ -197,9 +212,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - s->setState( + sessionState().setState( replayStart, sendCommandPoint, sentIncomplete, @@ -207,7 +220,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << s->getId()); + QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -234,6 +247,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } +broker::QueuedMessage Connection::getDumpMessage() { + // Get a message from the DUMP queue. + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); + if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); + broker::QueuedMessage m = dumpQueue->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); + return m; +} + void Connection::deliveryRecord(const string& qname, const SequenceNumber& position, const string& tag, @@ -245,15 +267,14 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::QueuedMessage m; broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue")); - + broker::QueuedMessage m; if (!ended) { // Has a message - if (acquired) // Message at front of dump queue + if (acquired) { // Message at front of dump queue + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); m = dumpQueue->get(); + } else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -266,10 +287,7 @@ void Connection::deliveryRecord(const string& qname, if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - assert(s); - s->record(dr); + semanticState().record(dr); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -286,7 +304,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } - + }} // namespace qpid::cluster |