summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2012-05-21 23:18:50 +0000
committerAndrew Stitcher <astitcher@apache.org>2012-05-21 23:18:50 +0000
commita7cf88ae2d7d5619cf4cb8eead6e4c9a4e1d62b1 (patch)
treee96a620b29f274b38148e9f36d46f5c49c1cc1b8 /cpp/src/qpid/sys
parenta90bd1c8d3e204e55866a44c6cf2198bbab5de76 (diff)
downloadqpid-python-a7cf88ae2d7d5619cf4cb8eead6e4c9a4e1d62b1.tar.gz
QPID-2518: Qpid C++ broker can easily be blocked by client trying to connect over SSL port
Implement timed disconnect for TCP and for SSL/TCP mux git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1341262 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp34
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h9
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp20
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp21
4 files changed, 70 insertions, 14 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 5233002850..3edc896724 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
@@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() {
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index b9867606c4..2ddd5c9a90 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -27,6 +27,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/CommonImportExport.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -38,6 +40,8 @@ namespace sys {
class AsynchIO;
struct AsynchIOBufferBase;
class Socket;
+class Timer;
+class TimerTask;
class AsynchIOHandler : public OutputControl {
std::string identifier;
@@ -49,13 +53,14 @@ class AsynchIOHandler : public OutputControl {
AtomicValue<int32_t> readCredit;
static const int32_t InfiniteCredit = -1;
Mutex creditLock;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 48baef9042..7cd5059570 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -39,6 +39,8 @@
namespace qpid {
namespace sys {
+class Timer;
+
using namespace qpid::sys::ssl;
struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public ProtocolFactory {
typedef SslAcceptorTmpl<T> SslAcceptor;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
const bool tcpNoDelay;
T listener;
const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public ProtocolFactory {
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin {
try {
ssl::initNSS(options, true);
nssInitialized = true;
-
+
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(options.multiplex ?
static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)) :
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)) :
static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)));
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)));
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP port " <<
@@ -156,7 +162,9 @@ static struct SslPlugin : public Plugin {
} sslPlugin;
template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket&
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index bd10a5555a..551440f954 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -36,14 +36,21 @@
namespace qpid {
namespace sys {
+class Timer;
+
class AsynchIOProtocolFactory : public ProtocolFactory {
- const bool tcpNoDelay;
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
uint16_t listeningPort;
+ const bool tcpNoDelay;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin {
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime,
shouldListen));
if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin {
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay)
{
if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}