summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/TCPIOPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp136
1 files changed, 86 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index ed7cc3748d..1ef8708cd0 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -20,15 +20,17 @@
*/
#include "qpid/sys/ProtocolFactory.h"
-#include "qpid/sys/AsynchIOHandler.h"
-#include "qpid/sys/AsynchIO.h"
#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Poller.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -47,20 +49,19 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port,
- int backlog, bool nodelay,
- Timer& timer, uint32_t maxTime,
- bool shouldListen);
+ AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
@@ -93,14 +94,9 @@ static class TCPIOPlugin : public Plugin {
bool shouldListen = !sslMultiplexEnabled();
ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(
- "", boost::lexical_cast<std::string>(opts.port),
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime,
- shouldListen));
-
- if (shouldListen) {
+ new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen));
+
+ if (shouldListen && protocolt->getPort()!=0 ) {
QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
}
@@ -109,54 +105,93 @@ static class TCPIOPlugin : public Plugin {
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port,
- int backlog, bool nodelay,
- Timer& timer, uint32_t maxTime,
- bool shouldListen) :
+namespace {
+ // Expand list of Interfaces and addresses to a list of addresses
+ std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+ std::vector<std::string> addresses;
+ // If there are no specific interfaces listed use a single "" to listen on every interface
+ if (interfaces.empty()) {
+ addresses.push_back("");
+ return addresses;
+ }
+ for (unsigned i = 0; i < interfaces.size(); ++i) {
+ const std::string& interface = interfaces[i];
+ if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+ // We don't have an interface of that name -
+ // Check for IPv6 ('[' ']') brackets and remove them
+ // then pass to be looked up directly
+ if (interface[0]=='[' && interface[interface.size()-1]==']') {
+ addresses.push_back(interface.substr(1, interface.size()-2));
+ } else {
+ addresses.push_back(interface);
+ }
+ }
+ }
+ return addresses;
+ }
+}
+
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) :
brokerTimer(timer),
- maxNegotiateTime(maxTime),
- tcpNoDelay(nodelay)
+ maxNegotiateTime(opts.maxNegotiateTime),
+ tcpNoDelay(opts.tcpNoDelay)
{
if (!shouldListen) {
- listeningPort = boost::lexical_cast<uint16_t>(port);
+ listeningPort = boost::lexical_cast<uint16_t>(opts.port);
return;
}
- SocketAddress sa(host, port);
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
+ std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+ if (addresses.empty()) {
+ // We specified some interfaces, but couldn't find addresses for them
+ QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+ listeningPort = 0;
+ }
- listeningPort = lport;
+ for (unsigned i = 0; i<addresses.size(); ++i) {
+ QPID_LOG(debug, "Using interface: " << addresses[i]);
+ SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port));
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
+ // We must have at least one resolved address
QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
+ Socket* s = createSocket();
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
QPID_LOG(debug, "Listened to: " << lport);
listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = createSocket();
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
}
+}
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
}
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
- async->setClient();
AsynchIO* aio = AsynchIO::create
(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -179,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -195,6 +230,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -204,14 +240,14 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
- Socket* socket = new Socket();
+ Socket* socket = createSocket();
try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
port,
- boost::bind(&AsynchIOProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);