summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp55
1 files changed, 44 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 34338ce434..85d8c1db87 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,21 +25,22 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
@@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin {
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
-{}
+ tcpNoDelay(nodelay)
+{
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
@@ -138,6 +164,7 @@ void AsynchIOProtocolFactory::connect(
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
Socket* socket = new Socket();
+ try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -147,6 +174,12 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys