summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/Connector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp82
1 files changed, 47 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index f69032b26d..fbb571d40a 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -51,10 +51,10 @@ using boost::str;
// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
namespace {
typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-
+
ProtocolRegistry& theProtocolRegistry() {
static ProtocolRegistry protocolRegistry;
-
+
return protocolRegistry;
}
}
@@ -93,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
Bounds* bounds;
-
+
framing::ProtocolVersion version;
bool initiated;
bool closed;
@@ -118,16 +118,17 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
void run();
void handleClosed();
bool closeInternal();
-
+
+ void connected(const Socket&);
+ void connectFailed(const std::string& msg);
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
boost::weak_ptr<ConnectionImpl> impl;
-
+
void connect(const std::string& host, int port);
- void init();
void close();
void send(framing::AMQFrame& frame);
void abort();
@@ -142,7 +143,6 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool canEncode();
-
public:
TCPConnector(framing::ProtocolVersion pVersion,
@@ -163,6 +163,11 @@ namespace {
} init;
}
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
@@ -189,15 +194,19 @@ TCPConnector::~TCPConnector() {
void TCPConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(lock);
assert(closed);
- try {
- socket.connect(host, port);
- } catch (const std::exception& /*e*/) {
- socket.close();
- throw;
- }
-
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ assert(joined);
poller = Poller::shared_ptr(new Poller);
+ AsynchConnector::create(socket,
+ poller,
+ host, port,
+ boost::bind(&TCPConnector::connected, this, _1),
+ boost::bind(&TCPConnector::connectFailed, this, _3));
+ closed = false;
+ joined = false;
+ receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -205,16 +214,23 @@ void TCPConnector::connect(const std::string& host, int port){
0, // closed
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
- closed = false;
-}
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+ aio->start(poller);
-void TCPConnector::init(){
- Mutex::ScopedLock l(lock);
- assert(joined);
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
ProtocolInitiation init(version);
writeDataBlock(init);
- joined = false;
- receiver = Thread(this);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+ QPID_LOG(warning, "Connecting failed: " << msg);
+ closed = true;
+ poller->shutdown();
+ closeInternal();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
}
bool TCPConnector::closeInternal() {
@@ -235,7 +251,7 @@ bool TCPConnector::closeInternal() {
receiver.join();
return ret;
}
-
+
void TCPConnector::close() {
closeInternal();
}
@@ -243,7 +259,13 @@ void TCPConnector::close() {
void TCPConnector::abort() {
// Can't abort a closed connection
if (!closed) {
- aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ } else {
+ // We're still connecting
+ connectFailed("Connection timedout");
+ }
}
}
@@ -288,18 +310,13 @@ void TCPConnector::handleClosed() {
shutdownHandler->shutdown();
}
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
if (codec->canEncode()) {
std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
+
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
@@ -395,11 +412,6 @@ void TCPConnector::run() {
try {
Dispatcher d(poller);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- aio->start(poller);
d.run();
} catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));