summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-02-01 21:26:00 +0000
committerAlan Conway <aconway@apache.org>2011-02-01 21:26:00 +0000
commit8cc4082337bd6ea1be0c9f96d3383314f7fc228b (patch)
tree30762327f33233deea60aa48260e84de332465fd
parent14842b8d1b62c3c70fb135287b9daa66b08c5b63 (diff)
downloadqpid-python-8cc4082337bd6ea1be0c9f96d3383314f7fc228b.tar.gz
QPID-3007: Unique management identifier for connections.
Management was using remote socket address (host:port) to identify connections, but this is not a unique identifier. Both the local and remote addresses are needed to uniquely identify a connection - see http://www.faqs.org/rfcs/rfc793.html. This was causing management errors (multiple objects using same identifier) and cluster failures (invalid-arg exception) due to inconsistencies caused by the incorrect management map. This commit uses "localhost:localport-remotehost:remoteport" as a unique identifier. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1066220 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp11
-rw-r--r--cpp/src/qpid/broker/windows/SslProtocolFactory.cpp2
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp2
-rw-r--r--cpp/src/qpid/sys/Socket.h19
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp2
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp2
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h1
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.h5
8 files changed, 32 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 82f1f0ea24..7b1c75db74 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -255,8 +255,17 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& key)
+Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
{
+ // Convert keyOrMgmtId to a host:port key.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses
+ // connection management IDs. Currently sys:: protocol factories
+ // and IO plugins construct the IDs and LinkRegistry parses them.
+ size_t separator = keyOrMgmtId.find('-');
+ if (separator == std::string::npos) separator = 0;
+ std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+
Mutex::ScopedLock locker(lock);
LinkMap::iterator l = links.find(key);
if (l != links.end()) return l->second;
diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index 2de4a4d914..fd0e537192 100644
--- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -194,7 +194,7 @@ void SslProtocolFactory::established(sys::Poller::shared_ptr poller,
const qpid::sys::Socket& s,
sys::ConnectionCodec::Factory* f,
bool isClient) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getPeerAddress(), f);
+ sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f);
if (tcpNoDelay) {
s.setTcpNoDelay();
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 9eb2eb7b5d..d53db20598 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -84,7 +84,7 @@ class RdmaIOHandler : public OutputControl {
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getPeerName()),
+ identifier(c->getFullName()),
factory(f),
codec(0),
readError(false),
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index 7b50c42a3c..855a8029cc 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,27 +60,32 @@ public:
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
- /** Returns the "socket name" ie the address bound to
+ /** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
QPID_COMMON_EXTERN std::string getSockname() const;
- /** Returns the "peer name" ie the address bound to
+ /** Returns the "peer name" ie the address bound to
* the remote end of the socket
*/
std::string getPeername() const;
- /**
+ /**
* Returns an address (host and port) for the remote end of the
* socket
*/
QPID_COMMON_EXTERN std::string getPeerAddress() const;
- /**
+ /**
* Returns an address (host and port) for the local end of the
* socket
*/
std::string getLocalAddress() const;
+ /**
+ * Returns the full address of the connection: local and remote host and port.
+ */
+ std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+
QPID_COMMON_EXTERN uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
@@ -95,7 +100,7 @@ public:
*/
QPID_COMMON_EXTERN Socket* accept() const;
- // TODO The following are raw operations, maybe they need better wrapping?
+ // TODO The following are raw operations, maybe they need better wrapping?
QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 297787f497..b0e791d60b 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -121,7 +121,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int back
void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
ConnectionCodec::Factory* f, bool isClient) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f, nodict);
+ qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
s.setTcpNoDelay(tcpNoDelay);
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index d0c7fe4caa..a6528f9ad9 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -81,7 +81,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
if (tcpNoDelay) {
s.setTcpNoDelay();
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 59d87822d4..28bddd2165 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -274,6 +274,7 @@ namespace Rdma {
QueuePair::intrusive_ptr getQueuePair();
std::string getLocalName() const;
std::string getPeerName() const;
+ std::string getFullName() const { return getLocalName()+"-"+getPeerName(); }
};
}
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h
index e2443e31c8..102e56986f 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -91,6 +91,11 @@ public:
*/
std::string getLocalAddress() const;
+ /**
+ * Returns the full address of the connection: local and remote host and port.
+ */
+ std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+
uint16_t getLocalPort() const;
uint16_t getRemotePort() const;