summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp52
1 files changed, 33 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3ce2b3f376..a375a65851 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -23,6 +23,7 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
+#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -74,28 +75,30 @@ const std::string shadowPrefix("[shadow]");
// Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& mgmtId,
const ConnectionId& id, unsigned int ssf)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+ connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
- consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+ updateIn(c.getUpdateReceiver())
{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& logId, MemberId member,
+ const std::string& mgmtId, MemberId member,
bool isCatchUp, bool isLink, unsigned int ssf
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connectionCtor(&output, cluster.getBroker(),
- isCatchUp ? shadowPrefix+logId : logId,
+ mgmtId,
ssf,
isLink,
- isCatchUp ? ++catchUpId : 0),
+ isCatchUp ? ++catchUpId : 0,
+ isCatchUp), // isCatchUp => shadow
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
- consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+ updateIn(c.getUpdateReceiver())
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -104,12 +107,14 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax);
cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+ ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId());
}
else {
- // Catch-up connections initialized immediately.
+ // Catch-up shadow connections initialized using nextShadow id.
assert(catchUp);
QPID_LOG(info, "new catch-up connection " << *this);
+ connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+ updateIn.nextShadowMgmtId.clear();
init();
}
}
@@ -127,7 +132,6 @@ void Connection::init() {
connection->setClusterOrderOutput(nullFrameHandler);
// Disable client throttling, done by active node.
connection->setClientThrottling(false);
- connection->setShadow(); // Mark the connection as a shadow.
}
if (!isCatchUp())
connection->setErrorListener(this);
@@ -138,8 +142,9 @@ void Connection::giveReadCredit(int credit) {
output.giveReadCredit(credit);
}
-void Connection::announce(uint32_t ssf) {
- assert(ssf == connectionCtor.ssf);
+void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+ QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
+ QPID_ASSERT(ssf == connectionCtor.ssf);
init();
}
@@ -296,13 +301,17 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
+void Connection::shadowPrepare(const std::string& mgmtId) {
+ updateIn.nextShadowMgmtId = mgmtId;
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.position = position;
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- consumerNumbering.add(c.shared_from_this());
+ updateIn.consumerNumbering.add(c.shared_from_this());
}
@@ -337,10 +346,15 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
OutputTask* task = &session->getSemanticState().find(name);
connection->getOutputTasks().addOutputTask(task);
}
-
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
+
+void Connection::shadowReady(
+ uint64_t memberId, uint64_t connectionId, const string& mgmtId,
+ const string& username, const string& fragment, uint32_t sendMax)
+{
+ QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
ConnectionId shadowId = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this
+ << " becomes shadow " << shadowId);
self = shadowId;
connection->setUserId(username);
// OK to use decoder here because cluster is stalled for update.
@@ -355,7 +369,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
- consumerNumbering.clear();
+ updateIn.consumerNumbering.clear();
self.second = 0; // Mark this as completed update connection.
}
@@ -503,9 +517,9 @@ void Connection::connectionError(const std::string& msg) {
}
void Connection::addQueueListener(const std::string& q, uint32_t listener) {
- if (listener >= consumerNumbering.size())
+ if (listener >= updateIn.consumerNumbering.size())
throw Exception(QPID_MSG("Invalid listener ID: " << listener));
- findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+ findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
}
void Connection::managementSchema(const std::string& data) {