summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp30
-rw-r--r--cpp/src/qpid/broker/Connection.h14
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp13
-rw-r--r--cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp45
-rw-r--r--cpp/src/qpid/cluster/Connection.h14
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp21
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.cpp11
8 files changed, 109 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 619f1a1bcb..bc755e3498 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -76,8 +76,14 @@ struct ConnectionTimeoutTask : public sys::TimerTask {
}
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_,
- const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
+Connection::Connection(ConnectionOutputHandler* out_,
+ Broker& broker_, const
+ std::string& mgmtId_,
+ const qpid::sys::SecuritySettings& external,
+ bool isLink_,
+ uint64_t objectId_,
+ bool shadow_,
+ bool delayManagement) :
ConnectionState(out_, broker_),
securitySettings(external),
adapter(*this, isLink_, shadow_),
@@ -89,26 +95,30 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
agent(0),
timer(broker_.getTimer()),
errorListener(0),
+ objectId(objectId_),
shadow(shadow_)
{
- Manageable* parent = broker.GetVhostObject();
-
if (isLink)
links.notifyConnection(mgmtId, this);
+ // In a cluster, allow adding the management object to be delayed.
+ if (!delayManagement) addManagementObject();
+ if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
+}
- if (parent != 0)
- {
- agent = broker_.getManagementAgent();
-
- // TODO set last bool true if system connection
+void Connection::addManagementObject() {
+ assert(agent == 0);
+ assert(mgmtObject == 0);
+ Manageable* parent = broker.GetVhostObject();
+ if (parent != 0) {
+ agent = broker.getManagementAgent();
if (agent != 0) {
+ // TODO set last bool true if system connection
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
}
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::requestIOProcessing(boost::function0<void> callback)
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index cf199fa831..8ad78f6652 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -79,9 +79,15 @@ class Connection : public sys::ConnectionInputHandler,
virtual void connectionError(const std::string&) = 0;
};
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId,
+ Connection(sys::ConnectionOutputHandler* out,
+ Broker& broker,
+ const std::string& mgmtId,
const qpid::sys::SecuritySettings&,
- bool isLink = false, uint64_t objectId = 0, bool shadow=false);
+ bool isLink = false,
+ uint64_t objectId = 0,
+ bool shadow=false,
+ bool delayManagement = false);
+
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
@@ -139,6 +145,9 @@ class Connection : public sys::ConnectionInputHandler,
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
+ /** Cluster delays adding management object in the constructor then calls this. */
+ void addManagementObject();
+
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
{
return securitySettings;
@@ -166,6 +175,7 @@ class Connection : public sys::ConnectionInputHandler,
boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
+ uint64_t objectId;
bool shadow;
public:
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5d13c1ad8f..7eb0798914 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 904565;
+const uint32_t Cluster::CLUSTER_VERSION = 956001;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
+ updateClosed(false),
error(*this)
{
// We give ownership of the timer to the broker and keep a plain pointer.
@@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
connectionSettings(settings)));
}
+// Called in network thread
+void Cluster::updateInClosed() {
+ Lock l(lock);
+ assert(!updateClosed);
+ updateClosed = true;
+ checkUpdateIn(l);
+}
+
// Called in update thread.
void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
@@ -879,6 +888,7 @@ void Cluster::updateInRetracted() {
void Cluster::checkUpdateIn(Lock& l) {
if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
failoverExchange->setUrls(getUrls(l));
@@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) {
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
+ updateClosed = false;
state = JOINER;
QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 0d8b55cf01..84dee27e94 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
+ void updateInClosed();
void updateInDone(const ClusterMap&);
void updateInRetracted();
@@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
- bool updateRetracted;
+ bool updateRetracted, updateClosed;
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 22e1db2036..42f800bd18 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -22,7 +22,6 @@
#include "UpdateClient.h"
#include "Cluster.h"
#include "UpdateReceiver.h"
-
#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -43,7 +42,6 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-
#include <boost/current_function.hpp>
@@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
- // Local clients are announced to the cluster
- // and initialized when the announce is received.
giveReadCredit(cluster.getSettings().readMax); // Flow control
- init();
+ // Delay adding the connection to the management map until announce()
+ connectionCtor.delayManagement = true;
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- init();
- }
- QPID_LOG(info, "incoming connection " << *this);
+ }
+ init();
+ QPID_LOG(debug, cluster << " local connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -152,8 +149,11 @@ void Connection::announce(
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- // Local connections are already initialized.
- if (isShadow()) {
+ // Local connections are already initialized but with management delayed.
+ if (isLocalClient()) {
+ connection->addManagementObject();
+ }
+ else if (isShadow()) {
init();
// Play initial frames into the connection.
Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
@@ -162,8 +162,9 @@ void Connection::announce(
connection->received(frame);
connection->setUserId(username);
}
- // Raise the connection management event now that the connection is replicated.
+ // Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
+ QPID_LOG(debug, cluster << " replicated connection " << *this);
}
Connection::~Connection() {
@@ -249,6 +250,7 @@ void Connection::closed() {
if (isUpdated()) {
QPID_LOG(debug, cluster << " update connection closed " << *this);
close();
+ cluster.updateInClosed();
}
else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
@@ -259,7 +261,8 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self);
}
}
catch (const std::exception& e) {
@@ -268,17 +271,21 @@ void Connection::closed() {
}
// Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
- assert(!catchUp);
- close();
+void Connection::deliverClose (bool aborted) {
+ QPID_LOG(debug, cluster << " replicated close of " << *this);
+ if (connection.get()) {
+ if (aborted) connection->abort();
+ else connection->closed();
+ connection.reset();
+ }
cluster.erase(self);
}
// Close the connection
void Connection::close() {
+ QPID_LOG(debug, cluster << " local close of " << *this);
if (connection.get()) {
connection->closed();
- // Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
}
@@ -286,11 +293,9 @@ void Connection::close() {
// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
- connection->abort();
- // Ensure we delete the broker::Connection in the deliver thread.
- connection.reset();
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self);
}
- cluster.erase(self);
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 45d832a5ff..72a98c12f1 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -170,7 +170,7 @@ class Connection :
const std::string& initFrames);
void close();
void abort();
- void deliverClose();
+ void deliverClose(bool);
OutputInterceptor& getOutput() { return output; }
@@ -194,6 +194,7 @@ class Connection :
bool isLink;
uint64_t objectId;
bool shadow;
+ bool delayManagement;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
@@ -202,14 +203,19 @@ class Connection :
const qpid::sys::SecuritySettings& external_,
bool isLink_=false,
uint64_t objectId_=0,
- bool shadow_=false
+ bool shadow_=false,
+ bool delayManagement_=false
) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
- isLink(isLink_), objectId(objectId_), shadow(shadow_)
+ isLink(isLink_), objectId(objectId_), shadow(shadow_),
+ delayManagement(delayManagement_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
- new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
+ new broker::Connection(
+ out, broker, mgmtId, external, isLink, objectId,
+ shadow, delayManagement)
+ );
}
};
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index b1c27804db..8818a4c3ac 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -2321,6 +2321,23 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
}
}
+namespace {
+bool isNotDeleted(const ManagementObjectMap::value_type& value) {
+ return !value.second->isDeleted();
+}
+
+size_t countNotDeleted(const ManagementObjectMap& map) {
+ return std::count_if(map.begin(), map.end(), isNotDeleted);
+}
+
+void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
+ for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) {
+ if (!i->second->isDeleted())
+ o << endl << " " << i->second->getObjectId().getV2Key();
+ }
+}
+} // namespace
+
string ManagementAgent::debugSnapshot() {
ostringstream msg;
msg << " management snapshot:";
@@ -2328,8 +2345,8 @@ string ManagementAgent::debugSnapshot() {
i != remoteAgents.end(); ++i)
msg << " " << i->second->routingKey;
msg << " packages: " << packages.size();
- msg << " objects: " << managementObjects.size();
- msg << " new objects: " << newManagementObjects.size();
+ msg << " objects: " << countNotDeleted(managementObjects);
+ msg << " new objects: " << countNotDeleted(newManagementObjects);
return msg.str();
}
diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp
index e051591afd..6105fc96c7 100644
--- a/cpp/src/qpid/sys/ClusterSafe.cpp
+++ b/cpp/src/qpid/sys/ClusterSafe.cpp
@@ -43,8 +43,15 @@ void assertClusterSafe() {
}
}
-ClusterSafeScope::ClusterSafeScope() { inContext = true; }
-ClusterSafeScope::~ClusterSafeScope() { inContext = false; }
+ClusterSafeScope::ClusterSafeScope() {
+ assert(!inContext);
+ inContext = true;
+}
+
+ClusterSafeScope::~ClusterSafeScope() {
+ assert(inContext);
+ inContext = false;
+}
void enableClusterSafe() { inCluster = true; }