summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-06-03 13:52:51 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-06-03 13:52:51 +0000
commiteaa1184eaa80a1095daf784260aa8b2434aa710a (patch)
treebbb17863d02201b998572bdc2cec6b2254a50c29 /qpid/cpp/src
parent0e4541fb7ce30f66025aa3371ad6887066f3f80d (diff)
downloadqpid-python-eaa1184eaa80a1095daf784260aa8b2434aa710a.tar.gz
Revert "QPID-1879 Don't use a thread for every new client Connection"
This reverts commit b54680d4b3341fa280a237a6d80952b9830ae3c5. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp92
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp96
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h33
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp61
-rw-r--r--qpid/cpp/src/qpid/client/SslConnector.cpp82
5 files changed, 181 insertions, 183 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index ccaa8c0b87..6639f92324 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,13 +18,7 @@
* under the License.
*
*/
-
#include "ConnectionImpl.h"
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
@@ -34,16 +28,11 @@
#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/Options.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/shared_ptr.hpp>
#include <limits>
-#include <vector>
namespace qpid {
namespace client {
@@ -53,10 +42,7 @@ using namespace qpid::framing::connection;
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-namespace {
-// Maybe should amalgamate the singletons into a single client singleton
-
-// Get timer singleton
+// Get timer singleton
Timer& theTimer() {
static Mutex timerInitLock;
ScopedLock<Mutex> l(timerInitLock);
@@ -65,73 +51,6 @@ Timer& theTimer() {
return t;
}
-struct IOThreadOptions : public qpid::Options {
- int maxIOThreads;
-
- IOThreadOptions(int c) :
- Options("IO threading options"),
- maxIOThreads(c)
- {
- addOptions()
- ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use");
- }
-};
-
-// IO threads
-class IOThread {
- int maxIOThreads;
- int ioThreads;
- int connections;
- Mutex threadLock;
- std::vector<Thread> t;
- Poller::shared_ptr poller_;
-
-public:
- void add() {
- ScopedLock<Mutex> l(threadLock);
- ++connections;
- if (ioThreads < maxIOThreads) {
- QPID_LOG(debug, "Created IO thread: " << ioThreads);
- ++ioThreads;
- t.push_back( Thread(poller_.get()) );
- }
- }
-
- void sub() {
- ScopedLock<Mutex> l(threadLock);
- --connections;
- }
-
- Poller::shared_ptr poller() const {
- return poller_;
- }
-
- // Here is where the maximum number of threads is set
- IOThread(int c) :
- ioThreads(0),
- connections(0),
- poller_(new Poller)
- {
- IOThreadOptions options(c);
- options.parse(0, 0, QPIDC_CONF_FILE, true);
- maxIOThreads = (options.maxIOThreads != -1) ?
- options.maxIOThreads : 1;
- }
-
- // We can't destroy threads one-by-one as the only
- // control we have is to shutdown the whole lot
- // and we can't do that before we're unloaded as we can't
- // restart the Poller after shutting it down
- ~IOThread() {
- poller_->shutdown();
- for (int i=0; i<ioThreads; ++i) {
- t[i].join();
- }
- }
-};
-
-static IOThread io(SystemInfo::concurrency());
-
class HeartbeatTask : public TimerTask {
TimeoutHandler& timeout;
@@ -148,8 +67,6 @@ public:
{}
};
-}
-
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -173,7 +90,6 @@ ConnectionImpl::~ConnectionImpl() {
// is running.
failover.reset();
if (connector) connector->close();
- io.sub();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
@@ -210,6 +126,7 @@ bool ConnectionImpl::isOpen() const
return handler.isOpen();
}
+
void ConnectionImpl::open()
{
const std::string& protocol = handler.protocol;
@@ -217,8 +134,7 @@ void ConnectionImpl::open()
int port = handler.port;
QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
- io.add();
- connector.reset(Connector::create(protocol, io.poller(), version, handler, this));
+ connector.reset(Connector::create(protocol, version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
connector->connect(host, port);
@@ -322,7 +238,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
{
return handler;
}
-
+
std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
}
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 946bf0138d..1558f292aa 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -27,11 +27,9 @@
#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
-#include "qpid/sys/Socket.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
@@ -53,23 +51,21 @@ using boost::str;
// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
namespace {
typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-
+
ProtocolRegistry& theProtocolRegistry() {
static ProtocolRegistry protocolRegistry;
-
+
return protocolRegistry;
}
}
-Connector* Connector::create(const std::string& proto,
- Poller::shared_ptr p,
- framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
{
ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
if (i==theProtocolRegistry().end()) {
throw Exception(QPID_MSG("Unknown protocol: " << proto));
}
- return (i->second)(p, v, s, c);
+ return (i->second)(v, s, c);
}
void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
@@ -85,7 +81,7 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
-class TCPConnector : public Connector, public sys::Codec
+class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
{
typedef std::deque<framing::AMQFrame> Frames;
struct Buff;
@@ -97,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
Bounds* bounds;
-
+
framing::ProtocolVersion version;
bool initiated;
bool closed;
@@ -108,25 +104,28 @@ class TCPConnector : public Connector, public sys::Codec
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
+ sys::Thread receiver;
+
sys::Socket socket;
sys::AsynchIO* aio;
std::string identifier;
- Poller::shared_ptr poller;
+ boost::shared_ptr<sys::Poller> poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~TCPConnector();
+ void run();
void handleClosed();
bool closeInternal();
-
+
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
boost::weak_ptr<ConnectionImpl> impl;
-
+
void connect(const std::string& host, int port);
void init();
void close();
@@ -143,23 +142,18 @@ class TCPConnector : public Connector, public sys::Codec
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool canEncode();
+
public:
- TCPConnector(Poller::shared_ptr,
- framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
+ TCPConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
ConnectionImpl*);
};
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
- Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(p, v, s, c);
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(v, s, c);
}
struct StaticInit {
@@ -169,21 +163,19 @@ namespace {
} init;
}
-TCPConnector::TCPConnector(Poller::shared_ptr p,
- ProtocolVersion ver,
+TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
lastEof(0),
currentSize(0),
bounds(cimpl),
- version(ver),
+ version(ver),
initiated(false),
closed(true),
joined(true),
shutdownHandler(0),
aio(0),
- poller(p),
impl(cimpl->shared_from_this())
{
QPID_LOG(debug, "TCPConnector created for " << version.toString());
@@ -205,6 +197,7 @@ void TCPConnector::connect(const std::string& host, int port){
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ poller = Poller::shared_ptr(new Poller);
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -221,24 +214,28 @@ void TCPConnector::init(){
ProtocolInitiation init(version);
writeDataBlock(init);
joined = false;
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- aio->start(poller);
+ receiver = Thread(this);
}
bool TCPConnector::closeInternal() {
+ bool ret;
+ {
Mutex::ScopedLock l(lock);
- bool ret = !closed;
+ ret = !closed;
if (!closed) {
closed = true;
aio->queueForDeletion();
- socket.close();
+ poller->shutdown();
+ }
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
+ }
+ joined = true;
}
+ receiver.join();
return ret;
}
-
+
void TCPConnector::close() {
closeInternal();
}
@@ -288,13 +285,18 @@ void TCPConnector::handleClosed() {
shutdownHandler->shutdown();
}
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
if (codec->canEncode()) {
std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
+
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
@@ -380,6 +382,28 @@ void TCPConnector::eof(AsynchIO&) {
handleClosed();
}
+void TCPConnector::run() {
+ // Keep the connection impl in memory until run() completes.
+ boost::shared_ptr<ConnectionImpl> protect = impl.lock();
+ assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+
+ aio->start(poller);
+ d.run();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
+ handleClosed();
+ }
+ try {
+ socket.close();
+ } catch (const std::exception&) {}
+}
+
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 880c81affe..78ddaa33cd 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -22,24 +22,27 @@
#define _Connector_
+#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
-
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Time.h"
+
+#include <queue>
+#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
-#include <string>
-
namespace qpid {
namespace sys {
-class ShutdownHandler;
class SecurityLayer;
-class Poller;
-}
-
-namespace framing {
-class InputHandler;
-class AMQFrame;
}
namespace client {
@@ -49,14 +52,11 @@ class ConnectionImpl;
///@internal
class Connector : public framing::OutputHandler
-{
+{
public:
// Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future)
- typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>,
- framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
- static Connector* create(const std::string& proto,
- boost::shared_ptr<qpid::sys::Poller>,
- framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
@@ -73,6 +73,7 @@ class Connector : public framing::OutputHandler
virtual const std::string& getIdentifier() const = 0;
virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
};
}}
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index f6bedf63f5..ad85104f3a 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -26,7 +26,6 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
@@ -49,7 +48,7 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
- class RdmaConnector : public Connector, public sys::Codec
+ class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable
{
struct Buff;
@@ -61,12 +60,13 @@ using boost::str;
Frames frames;
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
- Bounds* bounds;
-
+ Bounds* bounds;
+
+
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex pollingLock;
+ sys::Mutex pollingLock;
bool polling;
bool joined;
@@ -75,12 +75,15 @@ using boost::str;
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
+ sys::Thread receiver;
+
Rdma::AsynchIO* aio;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
+ void run();
void handleClosed();
bool closeInternal();
@@ -98,7 +101,7 @@ using boost::str;
std::string identifier;
ConnectionImpl* impl;
-
+
void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
@@ -116,16 +119,15 @@ using boost::str;
bool canEncode();
public:
- RdmaConnector(Poller::shared_ptr,
- framing::ProtocolVersion pVersion,
+ RdmaConnector(framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
// Static constructor which registers connector here
namespace {
- Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new RdmaConnector(p, v, s, c);
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new RdmaConnector(v, s, c);
}
struct StaticInit {
@@ -137,8 +139,7 @@ namespace {
}
-RdmaConnector::RdmaConnector(Poller::shared_ptr p,
- ProtocolVersion ver,
+RdmaConnector::RdmaConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
@@ -151,7 +152,6 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
joined(true),
shutdownHandler(0),
aio(0),
- poller(p),
impl(cimpl)
{
QPID_LOG(debug, "RdmaConnector created for " << version);
@@ -165,6 +165,7 @@ void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
assert(joined);
+ poller = Poller::shared_ptr(new Poller);
// This stuff needs to abstracted out of here to a platform specific file
::addrinfo *res;
@@ -189,6 +190,7 @@ void RdmaConnector::connect(const std::string& host, int port){
polling = true;
joined = false;
+ receiver = Thread(this);
}
// The following only gets run when connected
@@ -224,14 +226,23 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv
bool RdmaConnector::closeInternal() {
bool ret;
+ {
Mutex::ScopedLock l(pollingLock);
ret = polling;
if (polling) {
polling = false;
+ poller->shutdown();
}
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
+ }
+ joined = true;
+ }
+
+ receiver.join();
return ret;
}
-
+
void RdmaConnector::close() {
closeInternal();
}
@@ -355,6 +366,28 @@ void RdmaConnector::eof(Rdma::AsynchIO&) {
handleClosed();
}
+void RdmaConnector::run(){
+ // Keep the connection impl in memory until run() completes.
+ //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
+ //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ //assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ //aio->start(poller);
+ d.run();
+ //aio->queueForDeletion();
+ } catch (const std::exception& e) {
+ {
+ // We're no longer polling
+ Mutex::ScopedLock l(pollingLock);
+ polling = false;
+ }
+ QPID_LOG(error, e.what());
+ handleClosed();
+ }
+}
+
void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;
diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp
index 8194371b8a..7b0bcc6f1e 100644
--- a/qpid/cpp/src/qpid/client/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/SslConnector.cpp
@@ -28,7 +28,6 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
@@ -51,7 +50,7 @@ using boost::format;
using boost::str;
-class SslConnector : public Connector
+class SslConnector : public Connector, private sys::Runnable
{
struct Buff;
@@ -69,25 +68,25 @@ class SslConnector : public Connector
framing::Buffer encode;
size_t framesEncoded;
std::string identifier;
- Bounds* bounds;
-
+ Bounds* bounds;
+
void writeOne();
void newBuffer();
public:
-
+
Writer(uint16_t maxFrameSize, Bounds*);
~Writer();
void init(std::string id, sys::ssl::SslIO*);
void handle(framing::AMQFrame&);
void write(sys::ssl::SslIO&);
};
-
+
const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex closedLock;
+ sys::Mutex closedLock;
bool closed;
bool joined;
@@ -97,17 +96,20 @@ class SslConnector : public Connector
framing::OutputHandler* output;
Writer writer;
+
+ sys::Thread receiver;
sys::ssl::SslSocket socket;
sys::ssl::SslIO* aio;
- Poller::shared_ptr poller;
+ boost::shared_ptr<sys::Poller> poller;
~SslConnector();
+ void run();
void handleClosed();
bool closeInternal();
-
+
void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
void writebuff(qpid::sys::ssl::SslIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
@@ -116,7 +118,7 @@ class SslConnector : public Connector
std::string identifier;
ConnectionImpl* impl;
-
+
void connect(const std::string& host, int port);
void init();
void close();
@@ -130,20 +132,15 @@ class SslConnector : public Connector
const std::string& getIdentifier() const;
public:
- SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
+ SslConnector(framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
-struct SslConnector::Buff : public SslIO::BufferBase {
- Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
- Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new SslConnector(p, v, s, c);
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new SslConnector(v, s, c);
}
struct StaticInit {
@@ -152,9 +149,9 @@ namespace {
SslOptions options;
options.parse (0, 0, QPIDC_CONF_FILE, true);
if (options.certDbPath.empty()) {
- QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
+ QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
} else {
- initNSS(options);
+ initNSS(options);
Connector::registerFactory("ssl", &create);
}
} catch (const std::exception& e) {
@@ -166,8 +163,7 @@ namespace {
} init;
}
-SslConnector::SslConnector(Poller::shared_ptr p,
- ProtocolVersion ver,
+SslConnector::SslConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
@@ -178,7 +174,6 @@ SslConnector::SslConnector(Poller::shared_ptr p,
shutdownHandler(0),
writer(maxFrameSize, cimpl),
aio(0),
- poller(p),
impl(cimpl)
{
QPID_LOG(debug, "SslConnector created for " << version.toString());
@@ -202,6 +197,7 @@ void SslConnector::connect(const std::string& host, int port){
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
+ poller = Poller::shared_ptr(new Poller);
aio = new SslIO(socket,
boost::bind(&SslConnector::readbuff, this, _1, _2),
boost::bind(&SslConnector::eof, this, _1),
@@ -218,10 +214,7 @@ void SslConnector::init(){
ProtocolInitiation init(version);
writeDataBlock(init);
joined = false;
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
- aio->start(poller);
+ receiver = Thread(this);
}
bool SslConnector::closeInternal() {
@@ -230,11 +223,16 @@ bool SslConnector::closeInternal() {
if (!closed) {
closed = true;
aio->queueForDeletion();
- socket.close();
+ poller->shutdown();
+ }
+ if (!joined && receiver.id() != Thread::current().id()) {
+ joined = true;
+ Mutex::ScopedUnlock u(closedLock);
+ receiver.join();
}
return ret;
}
-
+
void SslConnector::close() {
closeInternal();
}
@@ -268,6 +266,11 @@ void SslConnector::handleClosed() {
shutdownHandler->shutdown();
}
+struct SslConnector::Buff : public SslIO::BufferBase {
+ Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -372,4 +375,25 @@ void SslConnector::eof(SslIO&) {
handleClosed();
}
+void SslConnector::run(){
+ // Keep the connection impl in memory until run() completes.
+ boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+
+ aio->start(poller);
+ d.run();
+ socket.close();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
+ handleClosed();
+ }
+}
+
+
}} // namespace qpid::client