summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-25 18:48:18 +0000
committerAlan Conway <aconway@apache.org>2012-01-25 18:48:18 +0000
commit4208d1a90f004a5ab72463e9c6d9f97a3a17d16e (patch)
treef8cc1814d61d7ed2d170b45349a2be6c7d73727f
parent54d9e7a6228bb1bd291a9aa4c156126359a6dbf0 (diff)
downloadqpid-python-4208d1a90f004a5ab72463e9c6d9f97a3a17d16e.tar.gz
QPID-3603: Refactor LinkRegistry to use a ConnectionObserver.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235868 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObserver.h15
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h22
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h4
6 files changed, 45 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 5d4cc99e66..fbe653bf5d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -179,6 +179,7 @@ public:
std::auto_ptr<MessageStore> store;
AclModule* acl;
DataDir dataDir;
+ ConnectionObservers connectionObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -201,7 +202,6 @@ public:
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
- ConnectionObservers connectionObservers;
public:
virtual ~Broker();
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 97e8e8ca13..1e6aab217c 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -104,8 +104,7 @@ Connection::Connection(ConnectionOutputHandler* out_,
outboundTracker(*this)
{
outboundTracker.wrap(out);
- if (link)
- links.notifyConnection(mgmtId, this);
+ broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
@@ -143,8 +142,7 @@ Connection::~Connection()
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
}
- if (link)
- links.notifyClosed(mgmtId);
+ broker.getConnectionObservers().closed(*this);
if (heartbeatTimer)
heartbeatTimer->cancel();
@@ -165,8 +163,7 @@ void Connection::received(framing::AMQFrame& frame) {
recordFromClient(frame);
if (!wasOpen && isOpen()) {
doIoCallbacks(); // Do any callbacks registered before we opened.
- // FIXME aconway 2012-01-18: generic observer points.
- broker.getConnectionObservers().connect(*this);
+ broker.getConnectionObservers().opened(*this);
}
}
@@ -267,8 +264,7 @@ string Connection::getAuthCredentials()
void Connection::notifyConnectionForced(const string& text)
{
- if (link)
- links.notifyConnectionForced(mgmtId, text);
+ broker.getConnectionObservers().forced(*this, text);
}
void Connection::setUserId(const string& userId)
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObserver.h b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
index e59ec261bc..12aa8549fd 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObserver.h
@@ -37,14 +37,19 @@ class ConnectionObserver
public:
virtual ~ConnectionObserver() {}
- /** Called when a connection is opened and authentication has been
- * performed.
+ /** Called when a connection is first established. */
+ virtual void connection(Connection&) {}
+
+ /** Called when the opening negotiation is done and the connection is authenticated.
* @exception Throwing an exception will abort the connection.
*/
- virtual void connect(Connection& connection) = 0;
+ virtual void opened(Connection&) {}
+
+ /** Called when a connection is closed. */
+ virtual void closed(Connection&) {}
- /** Called when a connection is torn down. */
- virtual void disconnect(Connection& connection) = 0;
+ /** Called when a connection is forced closed. */
+ virtual void forced(Connection&, const std::string& /*message*/) {}
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
index bef40e26b4..b36097cdbb 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -38,17 +38,29 @@ class ConnectionObservers : public ConnectionObserver {
observers.push_back(observer);
}
- // implementation of ConnectionObserver interface
- void connect(Connection& c) {
- std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::connect, _1, boost::ref(c)));
+ void connection(Connection& c) {
+ each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
}
- void disconnect(Connection& c) {
- std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c)));
+
+ void opened(Connection& c) {
+ each(boost::bind(&ConnectionObserver::opened, _1, boost::ref(c)));
+ }
+
+ void closed(Connection& c) {
+ each(boost::bind(&ConnectionObserver::closed, _1, boost::ref(c)));
+ }
+
+ void forced(Connection& c, const std::string& text) {
+ each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
}
private:
typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
Observers observers;
+
+ template <class F> void each(F f) {
+ std::for_each(observers.begin(), observers.end(), f);
+ }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 31b4f1b490..1749440d7b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -48,6 +48,16 @@ LinkRegistry::LinkRegistry () :
{
}
+namespace {
+struct ConnectionObserverImpl : public ConnectionObserver {
+ LinkRegistry& links;
+ ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+ void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
+ void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
+ void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+};
+}
+
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker), timer(&broker->getTimer()),
maintenanceTask(new Periodic(*this)),
@@ -55,6 +65,8 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
realm(broker->getOptions().realm)
{
timer->add(maintenanceTask);
+ broker->getConnectionObservers().add(
+ boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
}
LinkRegistry::~LinkRegistry()
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index 4a878c81c3..2ea75efffd 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -42,7 +42,7 @@ class ConnectionExcluder : public broker::ConnectionObserver
ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
: adminUser(adminUser_), isPrimary(isPrimary_) {}
- void connect(broker::Connection& connection) {
+ void opened(broker::Connection& connection) {
if (!isPrimary() && !connection.isLink()
&& !connection.isAuthenticatedUser(adminUser))
{
@@ -59,8 +59,6 @@ class ConnectionExcluder : public broker::ConnectionObserver
}
}
- void disconnect(broker::Connection&) {}
-
private:
string adminUser;
PrimaryTest isPrimary;