diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 98 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 38 |
3 files changed, 101 insertions, 57 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d398f30a86..779c162f9a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -294,14 +294,8 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(info, *this << " new local connection " << c->getId()); - localConnections.insert(c); assert(c->getId().getMember() == self); - // Announce the connection to the cluster. - if (c->isLocalClient()) - mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(), - c->getBrokerConnection().getSSF() ), - c->getId()); + localConnections.insert(c); } // Called in connection thread to insert an updated shadow connection. @@ -497,22 +491,18 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { if (i != connections.end()) return i->second; ConnectionPtr cp; // If the frame is an announcement for a new connection, add it. - if (e.frame.getBody() && e.frame.getMethod() && - e.frame.getMethod()->isA<ClusterConnectionAnnounceBody>()) + const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); + if (e.frame.getBody() && e.frame.getMethod() && announce) { if (id.getMember() == self) { // Announces one of my own cp = localConnections.getErase(id); - assert(cp); + assert(cp); } else { // New remote connection, create a shadow. std::ostringstream mgmtId; - unsigned int ssf; - const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); - + unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; mgmtId << id; - ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; - QPID_LOG(debug, *this << "new connection's ssf =" << ssf ); - cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf ); + cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf); } connections.insert(ConnectionMap::value_type(id, cp)); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 89700c2d52..1c6be4e862 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -37,6 +37,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -73,42 +74,63 @@ const std::string shadowPrefix("[shadow]"); // Shadow connection - Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, - const ConnectionId& id, unsigned int ssf) +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, + const ConnectionId& id, unsigned int ssf) : cluster(c), self(id), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false), + connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf), + expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), consumerNumbering(c.getUpdateReceiver().consumerNumbering) -{ init(); } +{} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId member, bool isCatchUp, bool isLink, unsigned int ssf ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), - isCatchUp ? shadowPrefix+logId : logId, - ssf, - isLink, - isCatchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), + connectionCtor(&output, cluster.getBroker(), + isCatchUp ? shadowPrefix+logId : logId, + ssf, + isLink, + isCatchUp ? ++catchUpId : 0), + expectProtocolHeader(isLink), + mcastFrameHandler(cluster.getMulticast(), self), consumerNumbering(c.getUpdateReceiver().consumerNumbering) -{ init(); } +{ + cluster.addLocalConnection(this); + if (isLocalClient()) { + // Local clients are announced to the cluster + // and initialized when the announce is received. + QPID_LOG(info, "new client connection " << *this); + giveReadCredit(cluster.getSettings().readMax); + cluster.getMulticast().mcastControl( + ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId()); + } + else { + // Catch-up connections initialized immediately. + assert(catchUp); + QPID_LOG(info, "new catch-up connection " << *this); + init(); + } +} void Connection::init() { - QPID_LOG(debug, cluster << " new connection: " << *this); + connection = connectionCtor.construct(); + QPID_LOG(debug, cluster << " initialized connection: " << *this + << " ssf=" << connection->getSSF()); if (isLocalClient()) { - connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node - cluster.addLocalConnection(this); - giveReadCredit(cluster.getSettings().readMax); + // Actively send cluster-order frames from local node + connection->setClusterOrderOutput(mcastFrameHandler); } - else { // Shadow or catch-up connection - connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames - connection.setClientThrottling(false); // Disable client throttling, done by active node. - connection.setShadow(); // Mark the broker connection as a shadow. + else { // Shadow or catch-up connection + // Passive, discard cluster-order frames + connection->setClusterOrderOutput(nullFrameHandler); + // Disable client throttling, done by active node. + connection->setClientThrottling(false); + connection->setShadow(); // Mark the connection as a shadow. } if (!isCatchUp()) - connection.setErrorListener(this); + connection->setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -116,8 +138,13 @@ void Connection::giveReadCredit(int credit) { output.giveReadCredit(credit); } +void Connection::announce(uint32_t ssf) { + assert(ssf == connectionCtor.ssf); + init(); +} + Connection::~Connection() { - connection.setErrorListener(0); + if (connection.get()) connection->setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -131,14 +158,15 @@ void Connection::received(framing::AMQFrame& f) { if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection.received(f); + + connection->received(f); } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); - connection.getOutput().send(ok); + connection->getOutput().send(ok); output.closeOutput(); catchUp = false; } @@ -155,7 +183,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { } } if (!message.empty()) - connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); + connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); } @@ -177,9 +205,9 @@ void Connection::deliveredFrame(const EventFrame& f) { && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { if (f.type == DATA) // incoming data frames to broker::Connection - connection.received(const_cast<AMQFrame&>(f.frame)); + connection->received(const_cast<AMQFrame&>(f.frame)); else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); + broker::SessionState* ss = connection->getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -194,7 +222,7 @@ void Connection::closed() { } else if (isUpdated()) { QPID_LOG(debug, cluster << " closed update connection " << *this); - connection.closed(); + connection->closed(); } else if (isLocal()) { QPID_LOG(debug, cluster << " local close of replicated connection " << *this); @@ -213,13 +241,13 @@ void Connection::closed() { // Self-delivery of close message, close the connection. void Connection::deliverClose () { assert(!catchUp); - connection.closed(); + connection->closed(); cluster.erase(self); } // The connection has been killed for misbehaving void Connection::abort() { - connection.abort(); + if (connection.get()) connection->abort(); cluster.erase(self); } @@ -257,7 +285,7 @@ size_t Connection::decode(const char* buffer, size_t size) { } broker::SessionState& Connection::sessionState() { - return *connection.getChannel(currentChannel).getSession(); + return *connection->getChannel(currentChannel).getSession(); } broker::SemanticState& Connection::semanticState() { @@ -294,26 +322,26 @@ void Connection::sessionState( receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); // The output tasks will be added later in the update process. - connection.getOutputTasks().removeAll(); + connection->getOutputTasks().removeAll(); } void Connection::outputTask(uint16_t channel, const std::string& name) { - broker::SessionState* session = connection.getChannel(channel).getSession(); + broker::SessionState* session = connection->getChannel(channel).getSession(); if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); OutputTask* task = &session->getSemanticState().find(name); - connection.getOutputTasks().addOutputTask(task); + connection->getOutputTasks().addOutputTask(task); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; - connection.setUserId(username); + connection->setUserId(username); // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); - connection.setErrorListener(this); + connection->setErrorListener(this); output.setSendMax(sendMax); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index f593aeb652..ca9b27ef3f 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -73,7 +73,7 @@ class Connection : ~Connection(); ConnectionId getId() const { return self; } - broker::Connection& getBrokerConnection() { return connection; } + broker::Connection& getBrokerConnection() { return *connection; } /** Local connections may be clients or catch-up connections */ bool isLocal() const; @@ -95,8 +95,8 @@ class Connection : void received(framing::AMQFrame&); void closed(); bool doOutput(); - void idleOut() { connection.idleOut(); } - void idleIn() { connection.idleIn(); } + void idleOut() { if (connection.get()) connection->idleOut(); } + void idleIn() { if (connection.get()) connection->idleIn(); } // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); @@ -156,7 +156,7 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - void announce(uint32_t) {} // handled by Cluster. + void announce(uint32_t ssf); void abort(); void deliverClose(); @@ -165,11 +165,36 @@ class Connection : void addQueueListener(const std::string& queue, uint32_t listener); void managementSchema(const std::string& data); + uint32_t getSsf() const { return connectionCtor.ssf; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} }; - + + // Arguments to construct a broker::Connection + struct ConnectionCtor { + sys::ConnectionOutputHandler* out; + broker::Broker& broker; + std::string mgmtId; + unsigned int ssf; + bool isLink; + uint64_t objectId; + + ConnectionCtor( + sys::ConnectionOutputHandler* out_, + broker::Broker& broker_, + const std::string& mgmtId_, + unsigned int ssf_, + bool isLink_=false, + uint64_t objectId_=0 + ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {} + + std::auto_ptr<broker::Connection> construct() { + return std::auto_ptr<broker::Connection>( + new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId)); + } + }; static NullFrameHandler nullFrameHandler; @@ -191,7 +216,8 @@ class Connection : bool catchUp; OutputInterceptor output; framing::FrameDecoder localDecoder; - broker::Connection connection; + ConnectionCtor connectionCtor; + std::auto_ptr<broker::Connection> connection; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; |