diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 7 |
4 files changed, 78 insertions, 31 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 604df9dde6..ada26ab2fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxPublish.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -35,6 +36,14 @@ #include <boost/current_function.hpp> +// FIXME aconway 2008-11-03: +// +// Disproportionate amount of code here is dedicated to receiving a +// brain-dump when joining a cluster and building initial +// state. Should be separated out into its own classes. +// + + namespace qpid { namespace cluster { @@ -180,10 +189,16 @@ void Connection::deliverBuffer(Buffer& buf) { delivered(mcastDecoder.frame); } +broker::SessionState& Connection::sessionState() { + return *connection.getChannel(currentChannel).getSession(); +} + +broker::SemanticState& Connection::semanticState() { + return sessionState().getSemanticState(); +} + void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - broker::SemanticState::ConsumerImpl& c = s->getConsumer(name); + broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); } @@ -197,9 +212,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - s->setState( + sessionState().setState( replayStart, sendCommandPoint, sentIncomplete, @@ -207,7 +220,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << s->getId()); + QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -234,6 +247,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } +broker::QueuedMessage Connection::getDumpMessage() { + // Get a message from the DUMP queue. + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); + if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); + broker::QueuedMessage m = dumpQueue->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); + return m; +} + void Connection::deliveryRecord(const string& qname, const SequenceNumber& position, const string& tag, @@ -245,15 +267,14 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::QueuedMessage m; broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue")); - + broker::QueuedMessage m; if (!ended) { // Has a message - if (acquired) // Message at front of dump queue + if (acquired) { // Message at front of dump queue + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); m = dumpQueue->get(); + } else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -266,10 +287,7 @@ void Connection::deliveryRecord(const string& qname, if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - assert(s); - s->record(dr); + semanticState().record(dr); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -286,7 +304,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } - + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 9f75d3dae3..331ac33ab0 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,6 +40,13 @@ namespace qpid { namespace framing { class AMQFrame; } +namespace broker { +class SemanticState; +class QueuedMessage; +class TxBuffer; +class TxAccept; +} + namespace cluster { class Cluster; @@ -117,15 +124,17 @@ class Connection : bool windowing); void queuePosition(const std::string&, const framing::SequenceNumber&); - - private: - bool catcUp; + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); + broker::SessionState& sessionState(); + broker::SemanticState& semanticState(); + broker::QueuedMessage getDumpMessage(); + static NoOpConnectionOutputHandler discardHandler; Cluster& cluster; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 40852a0411..a2860f6f32 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -39,10 +39,12 @@ #include "qpid/framing/ClusterConnectionConsumerStateBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> + namespace qpid { namespace cluster { @@ -103,7 +105,7 @@ void DumpClient::dump() { // Dump exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); -// Dump queue is used to transfer acquired messages that are no longer on their original queue. + // Dump queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); session.sync(); session.close(); @@ -154,7 +156,7 @@ class MessageDumper { session.exchangeUnbind(queue, DumpClient::DUMP); } - void dump(const broker::QueuedMessage& message) { + void dumpQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); haveLastPos = true; @@ -165,6 +167,10 @@ class MessageDumper { framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames()); } + + void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) { + dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); + } }; @@ -178,7 +184,7 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { arg::autoDelete=q->isAutoDelete(), arg::arguments=q->getSettings()); MessageDumper dumper(q->getName(), session); - q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1)); + q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); } @@ -217,11 +223,14 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. - ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + QPID_LOG(debug, dumperId << " dumping consumers."); + ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); + + QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); + ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); - // Adjust for message in progress, will be sent after state update. SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; @@ -274,14 +283,22 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { } void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - assert(dr.isEnded() || dr.getMessage().payload); + dumpDeliveryRecordMessage(dr); + dumpDeliveryRecord(dr); +} - if (!dr.isEnded() && dr.isAcquired()) { +void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) { + // Dump the message associated with a dr if need be. + if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // dumpees queue, put it on the dump queue for dumpee to pick up. // - MessageDumper(DUMP, shadowSession).dump(dr.getMessage()); + MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); } +} + +void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { + // Assumes the associated message has already been dumped (if needed) ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index bb349a39ee..716e7dcc3a 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -44,6 +44,8 @@ class QueueBinding; class QueuedMessage; class SessionHandler; class DeliveryRecord; +class SessionState; +class SemanticState; } // namespace broker @@ -79,8 +81,9 @@ class DumpClient : public sys::Runnable { void dumpSession(broker::SessionHandler& s); void dumpConsumer(const broker::SemanticState::ConsumerImpl*); void dumpUnacked(const broker::DeliveryRecord&); - - private: + void dumpDeliveryRecord(const broker::DeliveryRecord&); + void dumpDeliveryRecordMessage(const broker::DeliveryRecord&); + MemberId dumperId; MemberId dumpeeId; Url dumpeeUrl; |