summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-01-08 22:17:55 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-01-08 22:17:55 +0000
commit9a2b1d265d0816b2348ed9797ef5a3b2604a4d7e (patch)
tree9ea0dffd78d8d3ce6f80216250b1fb4a9930ea14 /qpid/cpp/src
parent390e9696e0901c5aebbfbab86771ec51bf112450 (diff)
downloadqpid-python-9a2b1d265d0816b2348ed9797ef5a3b2604a4d7e.tar.gz
QPID-4315: Changed Connection management name to be supplied by Link
code on outgoing connections so that the Link code can correlate the connection with the Link using the name. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1430573 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h1
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp48
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp4
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.h4
-rw-r--r--qpid/cpp/src/qpid/sys/ProtocolFactory.h1
-rw-r--r--qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/SslPlugin.cpp34
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp33
11 files changed, 61 insertions, 86 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index fd49b1414f..8ec98e4fbe 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1015,11 +1015,12 @@ void Broker::accept() {
}
void Broker::connect(
+ const std::string& name,
const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed)
{
boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
- if (pf) pf->connect(poller, host, port, factory.get(), failed);
+ if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index bae67ee071..842d206795 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -233,7 +233,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, const std::string& port,
+ void connect(const std::string& name,
+ const std::string& host, const std::string& port,
const std::string& transport,
boost::function2<void, int, std::string> failed);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 80c3f99176..66cb1ebf3a 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -228,7 +228,7 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, boost::lexical_cast<std::string>(port), transport,
+ broker->connect (name, host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port);
} catch(const std::exception& e) {
@@ -774,14 +774,6 @@ std::string Link::createName(const std::string& transport,
return linkName.str();
}
-
-bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
-{
- Mutex::ScopedLock mutex(lock);
- return (isConnecting() && _port == port && _host == host);
-}
-
-
const std::string Link::exchangeTypeName("qpid.LinkExchange");
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 3cca4e1bb3..0c2d412690 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -115,7 +115,6 @@ class Link : public PersistableConfig, public management::Manageable {
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
- bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index b1eb3bb41a..a162bff229 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -272,38 +272,6 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-namespace {
- void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
- {
- // Extract host and port of remote broker from connection id string.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses connection
- // management IDs. Currently sys:: protocol factories and IO plugins construct the
- // IDs and LinkRegistry parses them.
- // KAG: current connection id format assumed:
- // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
- // contained within brackets "[...]", example:
- // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
- // if this assumption changes!
- size_t separator = connId.find('-');
- assert(separator != std::string::npos);
- std::string remote = connId.substr(separator+1, std::string::npos);
- separator = remote.rfind(":");
- assert(separator != std::string::npos);
- *host = remote.substr(0, separator);
- // IPv6 - host is bracketed by "[]", strip them
- if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
- *host = host->substr(1, host->length() - 2);
- }
- try {
- *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
- } catch (const boost::bad_lexical_cast&) {
- QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
- assert(false);
- }
- }
-}
-
/** find the Link that corresponds to the given connection */
Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
{
@@ -323,19 +291,15 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
// create a mapping from connection id to link
QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
std::string host;
- uint16_t port = 0;
- extractHostPort( key, &host, &port );
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
- if (l->second->pendingConnection(host, port)) {
- link = l->second;
- pendingLinks.erase(l);
- connections[key] = link->getName();
- QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
- break;
- }
+ LinkMap::iterator l = pendingLinks.find(key);
+ if (l != pendingLinks.end()) {
+ link = l->second;
+ pendingLinks.erase(l);
+ connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
}
}
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 99e745c698..0225b11d27 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys::TimerTask {
}
};
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool nodict0) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
- isClient(false),
+ isClient(isClient0),
nodict(nodict0),
readCredit(InfiniteCredit)
{}
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
index 6e70606a04..91ddb022af 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputControl {
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool nodict);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
- QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
// Output side
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
diff --git a/qpid/cpp/src/qpid/sys/ProtocolFactory.h b/qpid/cpp/src/qpid/sys/ProtocolFactory.h
index ed573bd2f6..236398c111 100644
--- a/qpid/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/qpid/cpp/src/qpid/sys/ProtocolFactory.h
@@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index b491d28d0a..e1f4362d64 100644
--- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
@@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputControl {
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getFullName()),
+ identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
factory(f),
codec(0),
readError(false),
@@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory {
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
@@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& /*name*/,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
index 0638b55db6..a40da24eb8 100644
--- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -76,15 +77,16 @@ class SslProtocolFactory : public ProtocolFactory {
SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
@@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts
}
}
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
-
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient) {
- async->setClient();
- }
-
AsynchIO* aio = AsynchIO::create(
s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::shared_ptr poller,
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed(
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -289,8 +295,8 @@ void SslProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&SslProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&SslProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index da0bd31caa..1ef8708cd0 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
public:
AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
@@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Opt
}
}
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
+
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
- async->setClient();
AsynchIO* aio = AsynchIO::create
(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&AsynchIOProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);