diff options
author | Andrew Stitcher <astitcher@apache.org> | 2013-01-08 22:17:55 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2013-01-08 22:17:55 +0000 |
commit | 9a2b1d265d0816b2348ed9797ef5a3b2604a4d7e (patch) | |
tree | 9ea0dffd78d8d3ce6f80216250b1fb4a9930ea14 /qpid/cpp/src | |
parent | 390e9696e0901c5aebbfbab86771ec51bf112450 (diff) | |
download | qpid-python-9a2b1d265d0816b2348ed9797ef5a3b2604a4d7e.tar.gz |
QPID-4315: Changed Connection management name to be supplied by Link
code on outgoing connections so that the Link code can correlate the
connection with the Link using the name.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1430573 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ProtocolFactory.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/SslPlugin.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 33 |
11 files changed, 61 insertions, 86 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index fd49b1414f..8ec98e4fbe 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1015,11 +1015,12 @@ void Broker::accept() { } void Broker::connect( + const std::string& name, const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed) { boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport); - if (pf) pf->connect(poller, host, port, factory.get(), failed); + if (pf) pf->connect(poller, name, host, port, factory.get(), failed); else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index bae67ee071..842d206795 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -233,7 +233,8 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, const std::string& port, + void connect(const std::string& name, + const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 80c3f99176..66cb1ebf3a 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -228,7 +228,7 @@ void Link::startConnectionLH () // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. setStateLH(STATE_CONNECTING); - broker->connect (host, boost::lexical_cast<std::string>(port), transport, + broker->connect (name, host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port); } catch(const std::exception& e) { @@ -774,14 +774,6 @@ std::string Link::createName(const std::string& transport, return linkName.str(); } - -bool Link::pendingConnection(const std::string& _host, uint16_t _port) const -{ - Mutex::ScopedLock mutex(lock); - return (isConnecting() && _port == port && _host == host); -} - - const std::string Link::exchangeTypeName("qpid.LinkExchange"); }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 3cca4e1bb3..0c2d412690 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -115,7 +115,6 @@ class Link : public PersistableConfig, public management::Manageable { void closed(int, std::string); // Called when connection goes away void notifyConnectionForced(const std::string text); void closeConnection(const std::string& reason); - bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote? friend class LinkRegistry; // to call established, opened, closed diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index b1eb3bb41a..a162bff229 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -272,38 +272,6 @@ MessageStore* LinkRegistry::getStore() const { return store; } -namespace { - void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) - { - // Extract host and port of remote broker from connection id string. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses connection - // management IDs. Currently sys:: protocol factories and IO plugins construct the - // IDs and LinkRegistry parses them. - // KAG: current connection id format assumed: - // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are - // contained within brackets "[...]", example: - // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us - // if this assumption changes! - size_t separator = connId.find('-'); - assert(separator != std::string::npos); - std::string remote = connId.substr(separator+1, std::string::npos); - separator = remote.rfind(":"); - assert(separator != std::string::npos); - *host = remote.substr(0, separator); - // IPv6 - host is bracketed by "[]", strip them - if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { - *host = host->substr(1, host->length() - 2); - } - try { - *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); - } catch (const boost::bad_lexical_cast&) { - QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); - assert(false); - } - } -} - /** find the Link that corresponds to the given connection */ Link::shared_ptr LinkRegistry::findLink(const std::string& connId) { @@ -323,19 +291,15 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) // create a mapping from connection id to link QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); std::string host; - uint16_t port = 0; - extractHostPort( key, &host, &port ); Link::shared_ptr link; { Mutex::ScopedLock locker(lock); - for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) { - if (l->second->pendingConnection(host, port)) { - link = l->second; - pendingLinks.erase(l); - connections[key] = link->getName(); - QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); - break; - } + LinkMap::iterator l = pendingLinks.find(key); + if (l != pendingLinks.end()) { + link = l->second; + pendingLinks.erase(l); + connections[key] = link->getName(); + QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); } } diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 99e745c698..0225b11d27 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys::TimerTask { } }; -AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool nodict0) : +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) : identifier(id), aio(0), factory(f), codec(0), reads(0), readError(false), - isClient(false), + isClient(isClient0), nodict(nodict0), readCredit(InfiniteCredit) {} diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 6e70606a04..91ddb022af 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputControl { void write(const framing::ProtocolInitiation&); public: - QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool nodict); + QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict); QPID_COMMON_EXTERN ~AsynchIOHandler(); QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime); - QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } - // Output side QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); diff --git a/qpid/cpp/src/qpid/sys/ProtocolFactory.h b/qpid/cpp/src/qpid/sys/ProtocolFactory.h index ed573bd2f6..236398c111 100644 --- a/qpid/cpp/src/qpid/sys/ProtocolFactory.h +++ b/qpid/cpp/src/qpid/sys/ProtocolFactory.h @@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( boost::shared_ptr<Poller>, + const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory* codec, ConnectFailedCallback failed) = 0; diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index b491d28d0a..e1f4362d64 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/log/Statement.h" #include "qpid/sys/rdma/RdmaIO.h" @@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputControl { }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) : - identifier(c->getFullName()), + identifier(broker::QPID_NAME_PREFIX+c->getFullName()), factory(f), codec(0), readError(false), @@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory { public: RdmaIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); + void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; @@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, + const std::string& /*name*/, const std::string& host, const std::string& port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index 0638b55db6..a40da24eb8 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/log/Statement.h" #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" @@ -76,15 +77,16 @@ class SslProtocolFactory : public ProtocolFactory { SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient); + void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); + void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; @@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts } } +void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f) { + AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); + establishedCommon(async, poller, s); +} -void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - - AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict); +void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, const std::string& name) { + AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); + establishedCommon(async, poller, s); +} +void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { if (tcpNoDelay) { s.setTcpNoDelay(); QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) { - async->setClient(); - } - AsynchIO* aio = AsynchIO::create( s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), @@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::shared_ptr poller, for (unsigned i = 0; i<listeners.size(); ++i) { acceptors.push_back( AsynchAcceptor::create(listeners[i], - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact))); acceptors[i].start(poller); } } @@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed( void SslProtocolFactory::connect( Poller::shared_ptr poller, + const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) @@ -289,8 +295,8 @@ void SslProtocolFactory::connect( *socket, host, port, - boost::bind(&SslProtocolFactory::established, - this, poller, _1, fact, true), + boost::bind(&SslProtocolFactory::establishedOutgoing, + this, poller, _1, fact, name), boost::bind(&SslProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index da0bd31caa..1ef8708cd0 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/log/Statement.h" #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" @@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public ProtocolFactory { public: AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& name, + const std::string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient); + void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); + void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; @@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Opt } } -void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false); +void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f) { + AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); + establishedCommon(async, poller, s); +} + +void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, const std::string& name) { + AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); + establishedCommon(async, poller, s); +} +void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { if (tcpNoDelay) { s.setTcpNoDelay(); QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) - async->setClient(); AsynchIO* aio = AsynchIO::create (s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), @@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, for (unsigned i = 0; i<listeners.size(); ++i) { acceptors.push_back( AsynchAcceptor::create(listeners[i], - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact))); acceptors[i].start(poller); } } @@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, + const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) @@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect( *socket, host, port, - boost::bind(&AsynchIOProtocolFactory::established, - this, poller, _1, fact, true), + boost::bind(&AsynchIOProtocolFactory::establishedOutgoing, + this, poller, _1, fact, name), boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); |