summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/TCPIOPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp80
1 files changed, 25 insertions, 55 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 85d8c1db87..a6528f9ad9 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,31 +25,31 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- uint16_t listeningPort;
+ Socket listener;
+ const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
+ std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -61,49 +61,23 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(
- "", boost::lexical_cast<std::string>(opts.port),
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
- broker->registerProtocolFactory("tcp", protocolt);
+ ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
+ opts.tcpNoDelay));
+ QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
+ broker->registerProtocolFactory("tcp", protocol);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay)
-{
- SocketAddress sa(host, port);
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- }
-
-}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
+ tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
+{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -133,14 +107,16 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
+std::string AsynchIOProtocolFactory::getHost() const {
+ return listener.getSockname();
+}
+
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptors[i].start(poller);
- }
+ acceptor.reset(
+ AsynchAcceptor::create(listener,
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptor->start(poller);
}
void AsynchIOProtocolFactory::connectFailed(
@@ -154,7 +130,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, const std::string& port,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -163,8 +139,8 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
+
Socket* socket = new Socket();
- try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -174,12 +150,6 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
}
}} // namespace qpid::sys