summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-27 22:20:36 +0000
committerAlan Conway <aconway@apache.org>2010-01-27 22:20:36 +0000
commitd0df2e739d5fba4bfb9f549720518e55d6fa9c9c (patch)
treea58fe9884e980d00a5407d55024f978363ec3d26 /cpp/src/qpid/cluster/Connection.cpp
parentd68f4fc71cd9b7db52779e0358b6830828834076 (diff)
downloadqpid-python-d0df2e739d5fba4bfb9f549720518e55d6fa9c9c.tar.gz
In clustered broker: move construction of broker::Connections to the cluster dispatch thread.
Constructing a connection can involve sending management information so needs to be in the cluster dispatch context. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp98
1 files changed, 63 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 89700c2d52..1c6be4e862 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -37,6 +37,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -73,42 +74,63 @@ const std::string shadowPrefix("[shadow]");
// Shadow connection
- Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
- const ConnectionId& id, unsigned int ssf)
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+ const ConnectionId& id, unsigned int ssf)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false),
+ connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+ expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId, MemberId member,
bool isCatchUp, bool isLink, unsigned int ssf
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(),
- isCatchUp ? shadowPrefix+logId : logId,
- ssf,
- isLink,
- isCatchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
+ connectionCtor(&output, cluster.getBroker(),
+ isCatchUp ? shadowPrefix+logId : logId,
+ ssf,
+ isLink,
+ isCatchUp ? ++catchUpId : 0),
+ expectProtocolHeader(isLink),
+ mcastFrameHandler(cluster.getMulticast(), self),
consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{
+ cluster.addLocalConnection(this);
+ if (isLocalClient()) {
+ // Local clients are announced to the cluster
+ // and initialized when the announce is received.
+ QPID_LOG(info, "new client connection " << *this);
+ giveReadCredit(cluster.getSettings().readMax);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+ }
+ else {
+ // Catch-up connections initialized immediately.
+ assert(catchUp);
+ QPID_LOG(info, "new catch-up connection " << *this);
+ init();
+ }
+}
void Connection::init() {
- QPID_LOG(debug, cluster << " new connection: " << *this);
+ connection = connectionCtor.construct();
+ QPID_LOG(debug, cluster << " initialized connection: " << *this
+ << " ssf=" << connection->getSSF());
if (isLocalClient()) {
- connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
- cluster.addLocalConnection(this);
- giveReadCredit(cluster.getSettings().readMax);
+ // Actively send cluster-order frames from local node
+ connection->setClusterOrderOutput(mcastFrameHandler);
}
- else { // Shadow or catch-up connection
- connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
- connection.setClientThrottling(false); // Disable client throttling, done by active node.
- connection.setShadow(); // Mark the broker connection as a shadow.
+ else { // Shadow or catch-up connection
+ // Passive, discard cluster-order frames
+ connection->setClusterOrderOutput(nullFrameHandler);
+ // Disable client throttling, done by active node.
+ connection->setClientThrottling(false);
+ connection->setShadow(); // Mark the connection as a shadow.
}
if (!isCatchUp())
- connection.setErrorListener(this);
+ connection->setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -116,8 +138,13 @@ void Connection::giveReadCredit(int credit) {
output.giveReadCredit(credit);
}
+void Connection::announce(uint32_t ssf) {
+ assert(ssf == connectionCtor.ssf);
+ init();
+}
+
Connection::~Connection() {
- connection.setErrorListener(0);
+ if (connection.get()) connection->setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -131,14 +158,15 @@ void Connection::received(framing::AMQFrame& f) {
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+
+ connection->received(f);
}
else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
if (isShadow())
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
- connection.getOutput().send(ok);
+ connection->getOutput().send(ok);
output.closeOutput();
catchUp = false;
}
@@ -155,7 +183,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
}
}
if (!message.empty())
- connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+ connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
}
@@ -177,9 +205,9 @@ void Connection::deliveredFrame(const EventFrame& f) {
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
if (f.type == DATA) // incoming data frames to broker::Connection
- connection.received(const_cast<AMQFrame&>(f.frame));
+ connection->received(const_cast<AMQFrame&>(f.frame));
else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
+ broker::SessionState* ss = connection->getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -194,7 +222,7 @@ void Connection::closed() {
}
else if (isUpdated()) {
QPID_LOG(debug, cluster << " closed update connection " << *this);
- connection.closed();
+ connection->closed();
}
else if (isLocal()) {
QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
@@ -213,13 +241,13 @@ void Connection::closed() {
// Self-delivery of close message, close the connection.
void Connection::deliverClose () {
assert(!catchUp);
- connection.closed();
+ connection->closed();
cluster.erase(self);
}
// The connection has been killed for misbehaving
void Connection::abort() {
- connection.abort();
+ if (connection.get()) connection->abort();
cluster.erase(self);
}
@@ -257,7 +285,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
broker::SessionState& Connection::sessionState() {
- return *connection.getChannel(currentChannel).getSession();
+ return *connection->getChannel(currentChannel).getSession();
}
broker::SemanticState& Connection::semanticState() {
@@ -294,26 +322,26 @@ void Connection::sessionState(
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
// The output tasks will be added later in the update process.
- connection.getOutputTasks().removeAll();
+ connection->getOutputTasks().removeAll();
}
void Connection::outputTask(uint16_t channel, const std::string& name) {
- broker::SessionState* session = connection.getChannel(channel).getSession();
+ broker::SessionState* session = connection->getChannel(channel).getSession();
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
OutputTask* task = &session->getSemanticState().find(name);
- connection.getOutputTasks().addOutputTask(task);
+ connection->getOutputTasks().addOutputTask(task);
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
- connection.setUserId(username);
+ connection->setUserId(username);
// OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
- connection.setErrorListener(this);
+ connection->setErrorListener(this);
output.setSendMax(sendMax);
}