summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp46
1 files changed, 23 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 839a0e67b9..4b3e6da3fb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -19,7 +19,7 @@
*
*/
#include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
@@ -45,8 +45,8 @@
// TODO aconway 2008-11-03:
//
-// Disproportionate amount of code here is dedicated to receiving a
-// brain-dump when joining a cluster and building initial
+// Disproportionate amount of code here is dedicated to receiving an
+// update when joining a cluster and building initial
// state. Should be separated out into its own classes.
//
@@ -104,7 +104,7 @@ void Connection::received(framing::AMQFrame& f) {
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
- else { // Shadow or dumped ex catch-up connection.
+ else { // Shadow or updated ex catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
if (isShadow()) {
QPID_LOG(debug, cluster << " inserting connection " << *this);
@@ -155,7 +155,7 @@ void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
// Delivered from cluster.
void Connection::deliveredFrame(const EventFrame& f) {
- QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
+ QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame);
assert(!catchUp);
currentChannel = f.frame.getChannel();
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
@@ -174,8 +174,8 @@ void Connection::closed() {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
cluster.leave();
}
- else if (isDumped()) {
- QPID_LOG(debug, cluster << " closed dump connection " << *this);
+ else if (isUpdated()) {
+ QPID_LOG(debug, cluster << " closed update connection " << *this);
connection.closed();
}
else if (isLocal()) {
@@ -268,7 +268,7 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
+ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -277,10 +277,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::membership(const FieldTable& newbies, const FieldTable& members) {
- QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this);
- cluster.dumpInDone(ClusterMap(newbies, members));
- self.second = 0; // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+ QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
+ cluster.updateInDone(ClusterMap(joiners, members));
+ self.second = 0; // Mark this as completed update connection.
}
bool Connection::isLocal() const {
@@ -291,7 +291,7 @@ bool Connection::isShadow() const {
return self.first != cluster.getId();
}
-bool Connection::isDumped() const {
+bool Connection::isUpdated() const {
return self.first == cluster.getId() && self.second == 0;
}
@@ -302,9 +302,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
return queue;
}
-broker::QueuedMessage Connection::getDumpMessage() {
- broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
- if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+broker::QueuedMessage Connection::getUpdateMessage() {
+ broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -323,12 +323,12 @@ void Connection::deliveryRecord(const string& qname,
broker::QueuedMessage m;
broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) // Message is on the dump queue
- m = getDumpMessage();
+ if (acquired) // Message is on the update queue
+ m = getUpdateMessage();
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no dump message"));
+ throw Exception(QPID_MSG("deliveryRecord no update message"));
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -349,7 +349,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
else if (c.isShadow()) type = "shadow";
- else if (c.isDumped()) type = "dumped";
+ else if (c.isUpdated()) type = "updated";
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
@@ -361,15 +361,15 @@ void Connection::txAccept(const framing::SequenceSet& acked) {
}
void Connection::txDequeue(const std::string& queue) {
- txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
}
void Connection::txEnqueue(const std::string& queue) {
- txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;