diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ClusterSafe.cpp | 11 |
8 files changed, 109 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 619f1a1bcb..bc755e3498 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -76,8 +76,14 @@ struct ConnectionTimeoutTask : public sys::TimerTask { } }; -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, - const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) : +Connection::Connection(ConnectionOutputHandler* out_, + Broker& broker_, const + std::string& mgmtId_, + const qpid::sys::SecuritySettings& external, + bool isLink_, + uint64_t objectId_, + bool shadow_, + bool delayManagement) : ConnectionState(out_, broker_), securitySettings(external), adapter(*this, isLink_, shadow_), @@ -89,26 +95,30 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std agent(0), timer(broker_.getTimer()), errorListener(0), + objectId(objectId_), shadow(shadow_) { - Manageable* parent = broker.GetVhostObject(); - if (isLink) links.notifyConnection(mgmtId, this); + // In a cluster, allow adding the management object to be delayed. + if (!delayManagement) addManagementObject(); + if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); +} - if (parent != 0) - { - agent = broker_.getManagementAgent(); - - // TODO set last bool true if system connection +void Connection::addManagementObject() { + assert(agent == 0); + assert(mgmtObject == 0); + Manageable* parent = broker.GetVhostObject(); + if (parent != 0) { + agent = broker.getManagementAgent(); if (agent != 0) { + // TODO set last bool true if system connection mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId); } ConnectionState::setUrl(mgmtId); } - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::requestIOProcessing(boost::function0<void> callback) diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index cf199fa831..8ad78f6652 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -79,9 +79,15 @@ class Connection : public sys::ConnectionInputHandler, virtual void connectionError(const std::string&) = 0; }; - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, + Connection(sys::ConnectionOutputHandler* out, + Broker& broker, + const std::string& mgmtId, const qpid::sys::SecuritySettings&, - bool isLink = false, uint64_t objectId = 0, bool shadow=false); + bool isLink = false, + uint64_t objectId = 0, + bool shadow=false, + bool delayManagement = false); + ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ @@ -139,6 +145,9 @@ class Connection : public sys::ConnectionInputHandler, // Used by cluster to update connection status sys::AggregateOutput& getOutputTasks() { return outputTasks; } + /** Cluster delays adding management object in the constructor then calls this. */ + void addManagementObject(); + const qpid::sys::SecuritySettings& getExternalSecuritySettings() const { return securitySettings; @@ -166,6 +175,7 @@ class Connection : public sys::ConnectionInputHandler, boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; + uint64_t objectId; bool shadow; public: diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5d13c1ad8f..7eb0798914 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 904565; +const uint32_t Cluster::CLUSTER_VERSION = 956001; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastAliveCount(0), lastBroker(false), updateRetracted(false), + updateClosed(false), error(*this) { // We give ownership of the timer to the broker and keep a plain pointer. @@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { connectionSettings(settings))); } +// Called in network thread +void Cluster::updateInClosed() { + Lock l(lock); + assert(!updateClosed); + updateClosed = true; + checkUpdateIn(l); +} + // Called in update thread. void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); @@ -879,6 +888,7 @@ void Cluster::updateInRetracted() { void Cluster::checkUpdateIn(Lock& l) { if (state != UPDATEE) return; // Wait till we reach the stall point. + if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; failoverExchange->setUrls(getUrls(l)); @@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) { } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; + updateClosed = false; state = JOINER; QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 0d8b55cf01..84dee27e94 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread + void updateInClosed(); void updateInDone(const ClusterMap&); void updateInRetracted(); @@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - bool updateRetracted; + bool updateRetracted, updateClosed; ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 22e1db2036..42f800bd18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -22,7 +22,6 @@ #include "UpdateClient.h" #include "Cluster.h" #include "UpdateReceiver.h" - #include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -43,7 +42,6 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" - #include <boost/current_function.hpp> @@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, { cluster.addLocalConnection(this); if (isLocalClient()) { - // Local clients are announced to the cluster - // and initialized when the announce is received. giveReadCredit(cluster.getSettings().readMax); // Flow control - init(); + // Delay adding the connection to the management map until announce() + connectionCtor.delayManagement = true; } else { // Catch-up shadow connections initialized using nextShadow id. @@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - init(); - } - QPID_LOG(info, "incoming connection " << *this); + } + init(); + QPID_LOG(debug, cluster << " local connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -152,8 +149,11 @@ void Connection::announce( QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized. - if (isShadow()) { + // Local connections are already initialized but with management delayed. + if (isLocalClient()) { + connection->addManagementObject(); + } + else if (isShadow()) { init(); // Play initial frames into the connection. Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); @@ -162,8 +162,9 @@ void Connection::announce( connection->received(frame); connection->setUserId(username); } - // Raise the connection management event now that the connection is replicated. + // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); + QPID_LOG(debug, cluster << " replicated connection " << *this); } Connection::~Connection() { @@ -249,6 +250,7 @@ void Connection::closed() { if (isUpdated()) { QPID_LOG(debug, cluster << " update connection closed " << *this); close(); + cluster.updateInClosed(); } else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); @@ -259,7 +261,8 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self); } } catch (const std::exception& e) { @@ -268,17 +271,21 @@ void Connection::closed() { } // Self-delivery of close message, close the connection. -void Connection::deliverClose () { - assert(!catchUp); - close(); +void Connection::deliverClose (bool aborted) { + QPID_LOG(debug, cluster << " replicated close of " << *this); + if (connection.get()) { + if (aborted) connection->abort(); + else connection->closed(); + connection.reset(); + } cluster.erase(self); } // Close the connection void Connection::close() { + QPID_LOG(debug, cluster << " local close of " << *this); if (connection.get()) { connection->closed(); - // Ensure we delete the broker::Connection in the deliver thread. connection.reset(); } } @@ -286,11 +293,9 @@ void Connection::close() { // The connection has been killed for misbehaving, called in connection thread. void Connection::abort() { if (connection.get()) { - connection->abort(); - // Ensure we delete the broker::Connection in the deliver thread. - connection.reset(); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self); } - cluster.erase(self); } // ConnectionCodec::decode receives read buffers from directly-connected clients. diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 45d832a5ff..72a98c12f1 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -170,7 +170,7 @@ class Connection : const std::string& initFrames); void close(); void abort(); - void deliverClose(); + void deliverClose(bool); OutputInterceptor& getOutput() { return output; } @@ -194,6 +194,7 @@ class Connection : bool isLink; uint64_t objectId; bool shadow; + bool delayManagement; ConnectionCtor( sys::ConnectionOutputHandler* out_, @@ -202,14 +203,19 @@ class Connection : const qpid::sys::SecuritySettings& external_, bool isLink_=false, uint64_t objectId_=0, - bool shadow_=false + bool shadow_=false, + bool delayManagement_=false ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), - isLink(isLink_), objectId(objectId_), shadow(shadow_) + isLink(isLink_), objectId(objectId_), shadow(shadow_), + delayManagement(delayManagement_) {} std::auto_ptr<broker::Connection> construct() { return std::auto_ptr<broker::Connection>( - new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow)); + new broker::Connection( + out, broker, mgmtId, external, isLink, objectId, + shadow, delayManagement) + ); } }; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index b1c27804db..8818a4c3ac 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -2321,6 +2321,23 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { } } +namespace { +bool isNotDeleted(const ManagementObjectMap::value_type& value) { + return !value.second->isDeleted(); +} + +size_t countNotDeleted(const ManagementObjectMap& map) { + return std::count_if(map.begin(), map.end(), isNotDeleted); +} + +void dumpMap(std::ostream& o, const ManagementObjectMap& map) { + for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) { + if (!i->second->isDeleted()) + o << endl << " " << i->second->getObjectId().getV2Key(); + } +} +} // namespace + string ManagementAgent::debugSnapshot() { ostringstream msg; msg << " management snapshot:"; @@ -2328,8 +2345,8 @@ string ManagementAgent::debugSnapshot() { i != remoteAgents.end(); ++i) msg << " " << i->second->routingKey; msg << " packages: " << packages.size(); - msg << " objects: " << managementObjects.size(); - msg << " new objects: " << newManagementObjects.size(); + msg << " objects: " << countNotDeleted(managementObjects); + msg << " new objects: " << countNotDeleted(newManagementObjects); return msg.str(); } diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp index e051591afd..6105fc96c7 100644 --- a/cpp/src/qpid/sys/ClusterSafe.cpp +++ b/cpp/src/qpid/sys/ClusterSafe.cpp @@ -43,8 +43,15 @@ void assertClusterSafe() { } } -ClusterSafeScope::ClusterSafeScope() { inContext = true; } -ClusterSafeScope::~ClusterSafeScope() { inContext = false; } +ClusterSafeScope::ClusterSafeScope() { + assert(!inContext); + inContext = true; +} + +ClusterSafeScope::~ClusterSafeScope() { + assert(inContext); + inContext = false; +} void enableClusterSafe() { inCluster = true; } |