diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 839a0e67b9..4b3e6da3fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,7 +19,7 @@ * */ #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "Cluster.h" #include "qpid/broker/SessionState.h" @@ -45,8 +45,8 @@ // TODO aconway 2008-11-03: // -// Disproportionate amount of code here is dedicated to receiving a -// brain-dump when joining a cluster and building initial +// Disproportionate amount of code here is dedicated to receiving an +// update when joining a cluster and building initial // state. Should be separated out into its own classes. // @@ -104,7 +104,7 @@ void Connection::received(framing::AMQFrame& f) { if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or dumped ex catch-up connection. + else { // Shadow or updated ex catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { if (isShadow()) { QPID_LOG(debug, cluster << " inserting connection " << *this); @@ -155,7 +155,7 @@ void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { // Delivered from cluster. void Connection::deliveredFrame(const EventFrame& f) { - QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame); + QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame); assert(!catchUp); currentChannel = f.frame.getChannel(); if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. @@ -174,8 +174,8 @@ void Connection::closed() { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isDumped()) { - QPID_LOG(debug, cluster << " closed dump connection " << *this); + else if (isUpdated()) { + QPID_LOG(debug, cluster << " closed update connection " << *this); connection.closed(); } else if (isLocal()) { @@ -268,7 +268,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); + QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -277,10 +277,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& newbies, const FieldTable& members) { - QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this); - cluster.dumpInDone(ClusterMap(newbies, members)); - self.second = 0; // Mark this as completed dump connection. +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { + QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); + cluster.updateInDone(ClusterMap(joiners, members)); + self.second = 0; // Mark this as completed update connection. } bool Connection::isLocal() const { @@ -291,7 +291,7 @@ bool Connection::isShadow() const { return self.first != cluster.getId(); } -bool Connection::isDumped() const { +bool Connection::isUpdated() const { return self.first == cluster.getId() && self.second == 0; } @@ -302,9 +302,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { return queue; } -broker::QueuedMessage Connection::getDumpMessage() { - broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); - if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); +broker::QueuedMessage Connection::getUpdateMessage() { + broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -323,12 +323,12 @@ void Connection::deliveryRecord(const string& qname, broker::QueuedMessage m; broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) // Message is on the dump queue - m = getDumpMessage(); + if (acquired) // Message is on the update queue + m = getUpdateMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no dump message")); + throw Exception(QPID_MSG("deliveryRecord no update message")); } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -349,7 +349,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; else if (c.isShadow()) type = "shadow"; - else if (c.isDumped()) type = "dumped"; + else if (c.isUpdated()) type = "updated"; return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } @@ -361,15 +361,15 @@ void Connection::txAccept(const framing::SequenceSet& acked) { } void Connection::txDequeue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txEnqueue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload)); + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; |