summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
commit868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch)
tree63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/client
parent2e5ff8f1b328831043e6d7e323249d62187234c6 (diff)
downloadqpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp224
-rw-r--r--cpp/src/qpid/client/SubscriptionManagerImpl.cpp10
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp23
-rw-r--r--cpp/src/qpid/client/TCPConnector.h1
4 files changed, 127 insertions, 131 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index 4c6fadd28a..c2081a88f2 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -38,7 +38,6 @@
#include "qpid/Msg.h"
#include <iostream>
-#include <map>
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -54,53 +53,29 @@ using boost::str;
class SslConnector : public Connector
{
- struct Buff;
-
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef sys::ssl::SslIOBufferBase BufferBase;
- typedef std::vector<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- sys::ssl::SslIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
-
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, sys::ssl::SslIO*);
- void handle(framing::AMQFrame&);
- void write(sys::ssl::SslIO&);
- };
+ typedef std::deque<framing::AMQFrame> Frames;
const uint16_t maxFrameSize;
+
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
framing::ProtocolVersion version;
bool initiated;
- SecuritySettings securitySettings;
-
- sys::Mutex closedLock;
bool closed;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- Writer writer;
-
sys::ssl::SslSocket socket;
sys::ssl::SslIO* aio;
+ std::string identifier;
Poller::shared_ptr poller;
+ SecuritySettings securitySettings;
~SslConnector();
@@ -110,10 +85,7 @@ class SslConnector : public Connector
void eof(qpid::sys::ssl::SslIO&);
void disconnected(qpid::sys::ssl::SslIO&);
- std::string identifier;
-
void connect(const std::string& host, const std::string& port);
- void init();
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -126,17 +98,16 @@ class SslConnector : public Connector
const SecuritySettings* getSecuritySettings();
void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
public:
SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
-struct SslConnector::Buff : public SslIO::BufferBase {
- Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::shared_ptr p,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
closed(true),
shutdownHandler(0),
input(0),
- writer(maxFrameSize, cimpl),
aio(0),
poller(p)
{
@@ -192,7 +165,7 @@ SslConnector::~SslConnector() {
}
void SslConnector::connect(const std::string& host, const std::string& port){
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
assert(closed);
try {
socket.connect(host, port);
@@ -201,7 +174,6 @@ void SslConnector::connect(const std::string& host, const std::string& port){
throw TransportFailure(e.what());
}
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
aio = new SslIO(socket,
boost::bind(&SslConnector::readbuff, this, _1, _2),
@@ -210,21 +182,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){
boost::bind(&SslConnector::socketClosed, this, _1, _2),
0, // nobuffs
boost::bind(&SslConnector::writebuff, this, _1));
- writer.init(identifier, aio);
-}
-void SslConnector::init(){
- Mutex::ScopedLock l(closedLock);
+ aio->createBuffers(maxFrameSize);
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
ProtocolInitiation init(version);
writeDataBlock(init);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
aio->start(poller);
}
void SslConnector::close() {
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
if (!closed) {
closed = true;
if (aio)
@@ -260,76 +227,110 @@ const std::string& SslConnector::getIdentifier() const {
}
void SslConnector::send(AMQFrame& frame) {
- writer.handle(frame);
-}
-
-SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
-{
-}
-
-SslConnector::Writer::~Writer() { delete buffer; }
-
-void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- newBuffer();
-}
-void SslConnector::Writer::handle(framing::AMQFrame& frame) {
+ bool notifyWrite = false;
+ {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
- if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
lastEof = frames.size();
- aio->notifyPendingWrite();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ /*
+ NOTE: Moving the following line into this mutex block
+ is a workaround for BZ 570168, in which the test
+ testConcurrentSenders causes a hang about 1.5%
+ of the time. ( To see the hang much more frequently
+ leave this line out of the mutex block, and put a
+ small usleep just before it.)
+
+ TODO mgoulish - fix the underlying cause and then
+ move this call back outside the mutex.
+ */
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
}
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
}
-void SslConnector::Writer::writeOne() {
- assert(buffer);
- framesEncoded = 0;
+void SslConnector::writebuff(SslIO& /*aio*/)
+{
+ // It's possible to be disconnected and be writable
+ if (closed)
+ return;
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
-}
+ if (!canEncode()) {
+ return;
+ }
-void SslConnector::Writer::newBuffer() {
- buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff(maxFrameSize);
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+ SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
+
+ size_t encoded = encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer);
+ }
}
// Called in IO thread.
-void SslConnector::Writer::write(sys::ssl::SslIO&) {
+bool SslConnector::canEncode()
+{
Mutex::ScopedLock l(lock);
- assert(buffer);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t SslConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
size_t bytesWritten(0);
- for (size_t i = 0; i < lastEof; ++i) {
- AMQFrame& frame = frames[i];
- uint32_t size = frame.encodedSize();
- if (size > encode.available()) writeOne();
- assert(size <= encode.available());
- frame.encode(encode);
- ++framesEncoded;
- bytesWritten += size;
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
- frames.erase(frames.begin(), frames.begin()+lastEof);
- lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne();
+ return bytesWritten;
}
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+{
+ int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+size_t SslConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
- //TODO: check the version is correct
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
}
initiated = true;
}
@@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
input->received(frame);
}
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
-}
-
-void SslConnector::writebuff(SslIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void SslConnector::writeDataBlock(const AMQDataBlock& data) {
- SslIO::BufferBase* buff = new Buff(maxFrameSize);
+ SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
index 7dead112e5..44f81776c0 100644
--- a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
@@ -28,6 +28,7 @@
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/framing/Uuid.h>
+#include <qpid/log/Statement.h>
#include <set>
#include <sstream>
@@ -167,6 +168,15 @@ void SubscriptionManagerImpl::setFlowControl(const std::string& name, uint32_t m
setFlowControl(name, FlowControl(messages, bytes, window));
}
+AutoCancel::AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+AutoCancel::~AutoCancel() {
+ try {
+ sm.cancel(tag);
+ } catch (const qpid::Exception& e) {
+ QPID_LOG(info, "Exception in AutoCancel destructor: " << e.what());
+ }
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index 1dd951d339..a5c6465bad 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -46,11 +46,6 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -118,9 +113,8 @@ void TCPConnector::connected(const Socket&) {
void TCPConnector::start(sys::AsynchIO* aio_) {
aio = aio_;
- for (int i = 0; i < 4; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
+
+ aio->createBuffers(maxFrameSize);
identifier = str(format("[%1%]") % socket.getFullAddress());
}
@@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /*aio*/)
return;
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- if (codec->canEncode()) {
- std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
- if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ if (!codec->canEncode()) {
+ return;
+ }
+
+ AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
buffer->dataCount = encoded;
- aio->queueWrite(buffer.release());
+ aio->queueWrite(buffer);
}
}
@@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char* buffer, size_t size)
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index c87d544816..c0bc26028d 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -50,7 +50,6 @@ namespace client {
class TCPConnector : public Connector, public sys::Codec
{
typedef std::deque<framing::AMQFrame> Frames;
- struct Buff;
const uint16_t maxFrameSize;