diff options
author | Alan Conway <aconway@apache.org> | 2010-01-27 22:20:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-27 22:20:36 +0000 |
commit | d0df2e739d5fba4bfb9f549720518e55d6fa9c9c (patch) | |
tree | a58fe9884e980d00a5407d55024f978363ec3d26 /cpp/src/qpid/cluster/Connection.cpp | |
parent | d68f4fc71cd9b7db52779e0358b6830828834076 (diff) | |
download | qpid-python-d0df2e739d5fba4bfb9f549720518e55d6fa9c9c.tar.gz |
In clustered broker: move construction of broker::Connections to the cluster dispatch thread.
Constructing a connection can involve sending management information so needs to be
in the cluster dispatch context.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903864 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 98 |
1 files changed, 63 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 89700c2d52..1c6be4e862 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -37,6 +37,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -73,42 +74,63 @@ const std::string shadowPrefix("[shadow]"); // Shadow connection - Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, - const ConnectionId& id, unsigned int ssf) +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, + const ConnectionId& id, unsigned int ssf) : cluster(c), self(id), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false), + connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf), + expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), consumerNumbering(c.getUpdateReceiver().consumerNumbering) -{ init(); } +{} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId member, bool isCatchUp, bool isLink, unsigned int ssf ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), - isCatchUp ? shadowPrefix+logId : logId, - ssf, - isLink, - isCatchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), + connectionCtor(&output, cluster.getBroker(), + isCatchUp ? shadowPrefix+logId : logId, + ssf, + isLink, + isCatchUp ? ++catchUpId : 0), + expectProtocolHeader(isLink), + mcastFrameHandler(cluster.getMulticast(), self), consumerNumbering(c.getUpdateReceiver().consumerNumbering) -{ init(); } +{ + cluster.addLocalConnection(this); + if (isLocalClient()) { + // Local clients are announced to the cluster + // and initialized when the announce is received. + QPID_LOG(info, "new client connection " << *this); + giveReadCredit(cluster.getSettings().readMax); + cluster.getMulticast().mcastControl( + ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId()); + } + else { + // Catch-up connections initialized immediately. + assert(catchUp); + QPID_LOG(info, "new catch-up connection " << *this); + init(); + } +} void Connection::init() { - QPID_LOG(debug, cluster << " new connection: " << *this); + connection = connectionCtor.construct(); + QPID_LOG(debug, cluster << " initialized connection: " << *this + << " ssf=" << connection->getSSF()); if (isLocalClient()) { - connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node - cluster.addLocalConnection(this); - giveReadCredit(cluster.getSettings().readMax); + // Actively send cluster-order frames from local node + connection->setClusterOrderOutput(mcastFrameHandler); } - else { // Shadow or catch-up connection - connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames - connection.setClientThrottling(false); // Disable client throttling, done by active node. - connection.setShadow(); // Mark the broker connection as a shadow. + else { // Shadow or catch-up connection + // Passive, discard cluster-order frames + connection->setClusterOrderOutput(nullFrameHandler); + // Disable client throttling, done by active node. + connection->setClientThrottling(false); + connection->setShadow(); // Mark the connection as a shadow. } if (!isCatchUp()) - connection.setErrorListener(this); + connection->setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -116,8 +138,13 @@ void Connection::giveReadCredit(int credit) { output.giveReadCredit(credit); } +void Connection::announce(uint32_t ssf) { + assert(ssf == connectionCtor.ssf); + init(); +} + Connection::~Connection() { - connection.setErrorListener(0); + if (connection.get()) connection->setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -131,14 +158,15 @@ void Connection::received(framing::AMQFrame& f) { if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection.received(f); + + connection->received(f); } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); - connection.getOutput().send(ok); + connection->getOutput().send(ok); output.closeOutput(); catchUp = false; } @@ -155,7 +183,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { } } if (!message.empty()) - connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); + connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); } @@ -177,9 +205,9 @@ void Connection::deliveredFrame(const EventFrame& f) { && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { if (f.type == DATA) // incoming data frames to broker::Connection - connection.received(const_cast<AMQFrame&>(f.frame)); + connection->received(const_cast<AMQFrame&>(f.frame)); else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); + broker::SessionState* ss = connection->getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -194,7 +222,7 @@ void Connection::closed() { } else if (isUpdated()) { QPID_LOG(debug, cluster << " closed update connection " << *this); - connection.closed(); + connection->closed(); } else if (isLocal()) { QPID_LOG(debug, cluster << " local close of replicated connection " << *this); @@ -213,13 +241,13 @@ void Connection::closed() { // Self-delivery of close message, close the connection. void Connection::deliverClose () { assert(!catchUp); - connection.closed(); + connection->closed(); cluster.erase(self); } // The connection has been killed for misbehaving void Connection::abort() { - connection.abort(); + if (connection.get()) connection->abort(); cluster.erase(self); } @@ -257,7 +285,7 @@ size_t Connection::decode(const char* buffer, size_t size) { } broker::SessionState& Connection::sessionState() { - return *connection.getChannel(currentChannel).getSession(); + return *connection->getChannel(currentChannel).getSession(); } broker::SemanticState& Connection::semanticState() { @@ -294,26 +322,26 @@ void Connection::sessionState( receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); // The output tasks will be added later in the update process. - connection.getOutputTasks().removeAll(); + connection->getOutputTasks().removeAll(); } void Connection::outputTask(uint16_t channel, const std::string& name) { - broker::SessionState* session = connection.getChannel(channel).getSession(); + broker::SessionState* session = connection->getChannel(channel).getSession(); if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); OutputTask* task = &session->getSemanticState().find(name); - connection.getOutputTasks().addOutputTask(task); + connection->getOutputTasks().addOutputTask(task); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; - connection.setUserId(username); + connection->setUserId(username); // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); - connection.setErrorListener(this); + connection->setErrorListener(this); output.setSendMax(sendMax); } |