summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-08-31 18:20:29 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-08-31 18:20:29 +0000
commit0bc9a47a7c35f8cf67ef0e92cc53c91e66a6deec (patch)
treeca13237c15fbfc83e460cb5c5685d3dfd4dcbc1f /qpid/cpp
parentf9236f2f81a1df20a4a95d2e8dc8538b33fb4746 (diff)
downloadqpid-python-0bc9a47a7c35f8cf67ef0e92cc53c91e66a6deec.tar.gz
* Changes to make C++ client code use the asynchronous network IO
* Fixed up the test for buffer changes * Removed unused buffer operations git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@571529 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp13
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp170
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h22
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.cpp48
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.h12
-rw-r--r--qpid/cpp/src/qpid/framing/StructHelper.h26
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h22
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp10
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp14
-rw-r--r--qpid/cpp/src/tests/FieldTableTest.cpp20
-rw-r--r--qpid/cpp/src/tests/FramingTest.cpp90
-rw-r--r--qpid/cpp/src/tests/HeaderTest.cpp29
-rw-r--r--qpid/cpp/src/tests/Makefile.am2
-rw-r--r--qpid/cpp/src/tests/MessageTest.cpp14
-rw-r--r--qpid/cpp/src/tests/Uuid.cpp10
15 files changed, 309 insertions, 193 deletions
diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 5a69ff0d65..e6593c30ca 100644
--- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -149,14 +149,17 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
size += i->size() + 1/*shortstr size*/;
}
- Buffer buffer(size + 4/*longstr size*/);
- buffer.putLong(size);
+
+ char* bytes = static_cast<char*>(::alloca(size + 4/*longstr size*/));
+ Buffer wbuffer(bytes, size + 4/*longstr size*/);
+ wbuffer.putLong(size);
for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
- buffer.putShortString(*i);
+ wbuffer.putShortString(*i);
}
- buffer.flip();
+
+ Buffer rbuffer(bytes, size + 4/*longstr size*/);
string data;
- buffer.getLongString(data);
+ rbuffer.getLongString(data);
FieldTable response;
response.setString("xids", data);
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 6e12a9c84f..b1ec580605 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -25,6 +25,12 @@
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+
+#include <boost/bind.hpp>
+
namespace qpid {
namespace client {
@@ -43,9 +49,9 @@ Connector::Connector(
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size)
-{ }
+ aio(0)
+{
+}
Connector::~Connector(){
if (receiver.id()) {
@@ -56,19 +62,28 @@ Connector::~Connector(){
void Connector::connect(const std::string& host, int port){
socket.connect(host, port);
closed = false;
- receiver = Thread(this);
+ poller = Poller::shared_ptr(new Poller);
+ aio = new AsynchIO(socket,
+ boost::bind(&Connector::readbuff, this, _1, _2),
+ boost::bind(&Connector::eof, this, _1),
+ boost::bind(&Connector::eof, this, _1),
+ 0, // closed
+ 0, // nobuffs
+ boost::bind(&Connector::writebuff, this, _1));
}
void Connector::init(){
ProtocolInitiation init(version);
- writeBlock(&init);
+
+ writeDataBlock(init);
+ receiver = Thread(this);
}
// Call with closedLock held
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
- socket.close();
+ poller->shutdown();
closed = true;
return true;
}
@@ -92,28 +107,11 @@ OutputHandler* Connector::getOutputHandler(){
}
void Connector::send(AMQFrame& frame){
- writeBlock(&frame);
- QPID_LOG(trace, "SENT: " << frame);
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
Mutex::ScopedLock l(writeLock);
- data->encode(outbuf);
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- size_t written = 0;
- while(written < available && !closed){
- ssize_t sent = socket.send(data + written, available-written);
- if(sent > 0) {
- lastOut = now();
- written += sent;
- }
- }
+ writeFrameQueue.push(frame);
+ aio->queueWrite();
+
+ QPID_LOG(trace, "SENT: " << frame);
}
void Connector::handleClosed() {
@@ -121,6 +119,10 @@ void Connector::handleClosed() {
shutdownHandler->shutdown();
}
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// can never be called. The timeut processing needs to be added into the underlying Dispatcher code
+//
+// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
AbsTime t = now();
@@ -166,33 +168,103 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-void Connector::run(){
- try{
- while(!closed){
- ssize_t available = inbuf.available();
- if(available < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- ssize_t received = socket.recv(inbuf.start(), available);
- checkIdle(received);
-
- if(!closed && received > 0){
- inbuf.move(received);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame;
- while(frame.decode(inbuf)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
+
+// Buffer definition
+struct Buff : public AsynchIO::BufferBase {
+ Buff() :
+ AsynchIO::BufferBase(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV: " << frame);
+ input->received(frame);
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+void Connector::writebuff(AsynchIO& aio) {
+ Mutex::ScopedLock l(writeLock);
+
+ if (writeFrameQueue.empty()) {
+ return;
+ }
+
+ do {
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ int buffUsed = 0;
+
+ framing::AMQFrame frame = writeFrameQueue.front();
+ int frameSize = frame.size();
+ while (frameSize <= int(out.available())) {
+
+ // Encode output frame
+ frame.encode(out);
+ buffUsed += frameSize;
+
+ writeFrameQueue.pop();
+ if (writeFrameQueue.empty())
+ break;
+ frame = writeFrameQueue.front();
+ frameSize = frame.size();
}
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
+
+ buff->dataCount = buffUsed;
+ aio.queueWrite(buff);
+ } while (!writeFrameQueue.empty());
+}
+
+void Connector::writeDataBlock(const AMQDataBlock& data) {
+ AsynchIO::BufferBase* buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
+void Connector::eof(AsynchIO&) {
+ handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+void Connector::run(){
+ try {
+ Dispatcher d(poller);
+
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff);
}
- }
- } catch (const std::exception& e) {
+
+ aio->start(poller);
+ d.run();
+ aio->queueForDeletion();
+ socket.close();
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}
}
+
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 1577564d57..8aaaea247a 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -34,9 +34,12 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/AsynchIO.h"
-namespace qpid {
+#include <queue>
+namespace qpid {
+
namespace client {
class Connector : public framing::OutputHandler,
@@ -61,24 +64,29 @@ class Connector : public framing::OutputHandler,
framing::InputHandler* input;
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
-
- framing::Buffer inbuf;
- framing::Buffer outbuf;
sys::Mutex writeLock;
+ std::queue<framing::AMQFrame> writeFrameQueue;
+
sys::Thread receiver;
sys::Socket socket;
+ sys::AsynchIO* aio;
+ sys::Poller::shared_ptr poller;
+
void checkIdle(ssize_t status);
- void writeBlock(framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
void setSocketTimeout();
void run();
void handleClosed();
bool closeInternal();
-
+
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
+ void writebuff(qpid::sys::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(qpid::sys::AsynchIO&);
+
friend class Channel;
public:
Connector(framing::ProtocolVersion pVersion,
diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp
index 6c6b2661bd..215102807e 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.cpp
+++ b/qpid/cpp/src/qpid/framing/Buffer.cpp
@@ -22,9 +22,9 @@
#include "FramingContent.h"
#include "FieldTable.h"
-qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), position(0), limit(_size){
- data = new char[size];
-}
+//qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), position(0), limit(_size){
+// data = new char[size];
+//}
qpid::framing::Buffer::Buffer(char* _data, uint32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){
}
@@ -33,23 +33,23 @@ qpid::framing::Buffer::~Buffer(){
if(owner) delete[] data;
}
-void qpid::framing::Buffer::flip(){
- limit = position;
- position = 0;
-}
+//void qpid::framing::Buffer::flip(){
+// limit = position;
+// position = 0;
+//}
-void qpid::framing::Buffer::clear(){
- limit = size;
- position = 0;
-}
+//void qpid::framing::Buffer::clear(){
+// limit = size;
+// position = 0;
+//}
-void qpid::framing::Buffer::compact(){
- uint32_t p = limit - position;
- //copy p chars from position to 0
- memmove(data, data + position, p);
- limit = size;
- position = p;
-}
+//void qpid::framing::Buffer::compact(){
+// uint32_t p = limit - position;
+// //copy p chars from position to 0
+// memmove(data, data + position, p);
+// limit = size;
+// position = p;
+//}
void qpid::framing::Buffer::record(){
r_position = position;
@@ -65,13 +65,13 @@ uint32_t qpid::framing::Buffer::available(){
return limit - position;
}
-char* qpid::framing::Buffer::start(){
- return data + position;
-}
+//char* qpid::framing::Buffer::start(){
+// return data + position;
+//}
-void qpid::framing::Buffer::move(uint32_t bytes){
- position += bytes;
-}
+//void qpid::framing::Buffer::move(uint32_t bytes){
+// position += bytes;
+//}
void qpid::framing::Buffer::putOctet(uint8_t i){
data[position++] = i;
diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h
index 04acb65e91..d1eb58f14e 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.h
+++ b/qpid/cpp/src/qpid/framing/Buffer.h
@@ -41,18 +41,18 @@ class Buffer
public:
- Buffer(uint32_t size);
+ //Buffer(uint32_t size);
Buffer(char* data, uint32_t size);
~Buffer();
- void flip();
- void clear();
- void compact();
+ //void flip();
+ //void clear();
+ //void compact();
void record();
void restore();
uint32_t available();
- char* start();
- void move(uint32_t bytes);
+ //char* start();
+ //void move(uint32_t bytes);
void putOctet(uint8_t i);
void putShort(uint16_t i);
diff --git a/qpid/cpp/src/qpid/framing/StructHelper.h b/qpid/cpp/src/qpid/framing/StructHelper.h
index dc23a30d58..753a593523 100644
--- a/qpid/cpp/src/qpid/framing/StructHelper.h
+++ b/qpid/cpp/src/qpid/framing/StructHelper.h
@@ -24,6 +24,8 @@
#include "qpid/Exception.h"
#include "Buffer.h"
+#include <stdlib.h> // For alloca
+
namespace qpid {
namespace framing {
@@ -33,20 +35,24 @@ public:
template <class T> void encode(const T t, std::string& data) {
uint32_t size = t.size() + 2/*type*/;
- Buffer buffer(size);
- buffer.putShort(T::TYPE);
- t.encode(buffer);
- buffer.flip();
- buffer.getRawData(data, size);
+ char* bytes = static_cast<char*>(::alloca(size));
+ Buffer wbuffer(bytes, size);
+ wbuffer.putShort(T::TYPE);
+ t.encode(wbuffer);
+
+ Buffer rbuffer(bytes, size);
+ rbuffer.getRawData(data, size);
}
template <class T> void decode(T t, std::string& data) {
- Buffer buffer(data.length());
- buffer.putRawData(data);
- buffer.flip();
- uint16_t type = buffer.getShort();
+ char* bytes = static_cast<char*>(::alloca(data.length()));
+ Buffer wbuffer(bytes, data.length());
+ wbuffer.putRawData(data);
+
+ Buffer rbuffer(bytes, data.length());
+ uint16_t type = rbuffer.getShort();
if (type == T::TYPE) {
- t.decode(buffer);
+ t.decode(rbuffer);
} else {
throw Exception("Type code does not match");
}
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index 7cc7995ee2..ea2badf456 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -63,26 +63,24 @@ private:
*/
class AsynchIO : private DispatchHandle {
public:
- struct Buffer {
- typedef boost::function1<void, const Buffer&> RecycleStorage;
-
+ struct BufferBase {
char* const bytes;
const int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
- Buffer(char* const b, const int32_t s) :
+ BufferBase(char* const b, const int32_t s) :
bytes(b),
byteCount(s),
dataStart(0),
dataCount(0)
{}
- virtual ~Buffer()
+ virtual ~BufferBase()
{}
};
- typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+ typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
@@ -96,8 +94,8 @@ private:
ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
- std::deque<Buffer*> bufferQueue;
- std::deque<Buffer*> writeQueue;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
bool queuedClose;
public:
@@ -107,11 +105,11 @@ public:
void queueForDeletion();
void start(Poller::shared_ptr poller);
- void queueReadBuffer(Buffer* buff);
- void queueWrite(Buffer* buff = 0);
- void unread(Buffer* buff);
+ void queueReadBuffer(BufferBase* buff);
+ void queueWrite(BufferBase* buff = 0);
+ void unread(BufferBase* buff);
void queueWriteClose();
- Buffer* getQueuedBuffer();
+ BufferBase* getQueuedBuffer();
const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 6cd43dc4f3..0700fff8af 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -74,9 +74,9 @@ AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
{}
// Buffer definition
-struct Buff : public AsynchIO::Buffer {
+struct Buff : public AsynchIO::BufferBase {
Buff() :
- AsynchIO::Buffer(new char[65536], 65536)
+ AsynchIO::BufferBase(new char[65536], 65536)
{}
~Buff()
{ delete [] bytes;}
@@ -113,7 +113,7 @@ public:
void close();
// Input side
- void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+ void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
void eof(AsynchIO& aio);
void disconnect(AsynchIO& aio);
@@ -200,7 +200,7 @@ void AsynchIOHandler::close() {
}
// Input side
-void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
if(initiated){
framing::AMQFrame frame;
@@ -264,7 +264,7 @@ void AsynchIOHandler::idle(AsynchIO&){
do {
// Try and get a queued buffer if not then construct new one
- AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (!buff)
buff = new Buff;
framing::Buffer out(buff->bytes, buff->byteCount);
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 2b462cbd7a..3512363d46 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -121,7 +121,7 @@ void AsynchIO::start(Poller::shared_ptr poller) {
DispatchHandle::startWatch(poller);
}
-void AsynchIO::queueReadBuffer(Buffer* buff) {
+void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
@@ -129,7 +129,7 @@ void AsynchIO::queueReadBuffer(Buffer* buff) {
DispatchHandle::rewatchRead();
}
-void AsynchIO::unread(Buffer* buff) {
+void AsynchIO::unread(BufferBase* buff) {
assert(buff);
if (buff->dataStart != 0) {
memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
@@ -141,7 +141,7 @@ void AsynchIO::unread(Buffer* buff) {
// Either queue for writing or announce that there is something to write
// and we should ask for it
-void AsynchIO::queueWrite(Buffer* buff) {
+void AsynchIO::queueWrite(BufferBase* buff) {
// If no buffer then don't queue anything
// (but still wake up for writing)
if (buff) {
@@ -163,11 +163,11 @@ void AsynchIO::queueWriteClose() {
/** Return a queued buffer if there are enough
* to spare
*/
-AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
// Always keep at least one buffer (it might have data that was "unread" in it)
if (bufferQueue.size()<=1)
return 0;
- Buffer* buff = bufferQueue.back();
+ BufferBase* buff = bufferQueue.back();
buff->dataStart = 0;
buff->dataCount = 0;
bufferQueue.pop_back();
@@ -183,7 +183,7 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer* buff = bufferQueue.front();
+ BufferBase* buff = bufferQueue.front();
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
@@ -239,7 +239,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
// See if we've got something to write
if (!writeQueue.empty()) {
// Write buffer
- Buffer* buff = writeQueue.back();
+ BufferBase* buff = writeQueue.back();
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
diff --git a/qpid/cpp/src/tests/FieldTableTest.cpp b/qpid/cpp/src/tests/FieldTableTest.cpp
index dcab96fb08..deb3655619 100644
--- a/qpid/cpp/src/tests/FieldTableTest.cpp
+++ b/qpid/cpp/src/tests/FieldTableTest.cpp
@@ -39,11 +39,13 @@ class FieldTableTest : public CppUnit::TestCase
ft.setString("A", "BCDE");
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A"));
- Buffer buffer(100);
- buffer.putFieldTable(ft);
- buffer.flip();
+ char buff[100];
+ Buffer wbuffer(buff, 100);
+ wbuffer.putFieldTable(ft);
+
+ Buffer rbuffer(buff, 100);
FieldTable ft2;
- buffer.getFieldTable(ft2);
+ rbuffer.getFieldTable(ft2);
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A"));
}
@@ -68,10 +70,12 @@ class FieldTableTest : public CppUnit::TestCase
FieldTable c;
c = a;
- Buffer buffer(c.size());
- buffer.putFieldTable(c);
- buffer.flip();
- buffer.getFieldTable(d);
+ char* buff = static_cast<char*>(::alloca(c.size()));
+ Buffer wbuffer(buff, c.size());
+ wbuffer.putFieldTable(c);
+
+ Buffer rbuffer(buff, c.size());
+ rbuffer.getFieldTable(d);
CPPUNIT_ASSERT_EQUAL(c, d);
CPPUNIT_ASSERT_EQUAL(std::string("CCCC"), c.getString("A"));
CPPUNIT_ASSERT_EQUAL(1234, c.getInt("B"));
diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp
index 1b843defc1..79df8eade2 100644
--- a/qpid/cpp/src/tests/FramingTest.cpp
+++ b/qpid/cpp/src/tests/FramingTest.cpp
@@ -68,114 +68,130 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE_END();
private:
- Buffer buffer;
+ char buffer[1024];
ProtocolVersion version;
public:
- FramingTest() : buffer(1024), version(highestProtocolVersion) {}
+ FramingTest() : version(highestProtocolVersion) {}
void testBasicQosBody()
{
+ Buffer wbuff(buffer, sizeof(buffer));
BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
BasicQosBody out(version);
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testConnectionSecureBody()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string s = "security credential";
ConnectionSecureBody in(version, s);
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
ConnectionSecureBody out(version);
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testConnectionRedirectBody()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
ConnectionRedirectBody in(version, a, b);
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
ConnectionRedirectBody out(version);
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testAccessRequestBody()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string s = "text";
AccessRequestBody in(version, s, true, false, true, false, true);
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
AccessRequestBody out(version);
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testBasicConsumeBody()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string q = "queue";
std::string t = "tag";
BasicConsumeBody in(version, 0, q, t, false, true, false, false,
FieldTable());
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
BasicConsumeBody out(version);
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testConnectionRedirectBodyFrame()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(999, ConnectionRedirectBody(version, a, b));
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
AMQFrame out;
- out.decode(buffer);
+ out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testBasicConsumeOkBodyFrame()
{
+ Buffer wbuff(buffer, sizeof(buffer));
std::string s = "hostA";
AMQFrame in(999, BasicConsumeOkBody(version, s));
- in.encode(buffer);
- buffer.flip();
+ in.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
AMQFrame out;
- for(int i = 0; i < 5; i++){
- out.decode(buffer);
- CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
- }
+ out.decode(rbuff);
+ CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
void testInlineContent() {
+ Buffer wbuff(buffer, sizeof(buffer));
Content content(INLINE, "MyData");
CPPUNIT_ASSERT(content.isInline());
- content.encode(buffer);
- buffer.flip();
+ content.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
Content recovered;
- recovered.decode(buffer);
+ recovered.decode(rbuff);
CPPUNIT_ASSERT(recovered.isInline());
CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
}
void testContentReference() {
+ Buffer wbuff(buffer, sizeof(buffer));
Content content(REFERENCE, "MyRef");
CPPUNIT_ASSERT(content.isReference());
- content.encode(buffer);
- buffer.flip();
+ content.encode(wbuff);
+
+ Buffer rbuff(buffer, sizeof(buffer));
Content recovered;
- recovered.decode(buffer);
+ recovered.decode(rbuff);
CPPUNIT_ASSERT(recovered.isReference());
CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
}
@@ -198,11 +214,13 @@ class FramingTest : public CppUnit::TestCase
}
try {
- buffer.putOctet(2);
- buffer.putLongString("blah, blah");
- buffer.flip();
+ Buffer wbuff(buffer, sizeof(buffer));
+ wbuff.putOctet(2);
+ wbuff.putLongString("blah, blah");
+
+ Buffer rbuff(buffer, sizeof(buffer));
Content content;
- content.decode(buffer);
+ content.decode(rbuff);
CPPUNIT_ASSERT(false);//fail, expected exception
} catch (QpidError& e) {
CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code);
diff --git a/qpid/cpp/src/tests/HeaderTest.cpp b/qpid/cpp/src/tests/HeaderTest.cpp
index df2230342c..a883ccf300 100644
--- a/qpid/cpp/src/tests/HeaderTest.cpp
+++ b/qpid/cpp/src/tests/HeaderTest.cpp
@@ -38,12 +38,13 @@ public:
{
AMQHeaderBody body;
body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE");
- Buffer buffer(100);
+ char buff[100];
+ Buffer wbuffer(buff, 100);
+ body.encode(wbuffer);
- body.encode(buffer);
- buffer.flip();
+ Buffer rbuffer(buff, 100);
AMQHeaderBody body2;
- body2.decode(buffer, body.size());
+ body2.decode(rbuffer, body.size());
BasicHeaderProperties* props =
body2.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
@@ -84,11 +85,13 @@ public:
properties->setClusterId(clusterId);
properties->setContentLength(contentLength);
- Buffer buffer(10000);
- out.encode(buffer);
- buffer.flip();
+ char buff[10000];
+ Buffer wbuffer(buff, 10000);
+ out.encode(wbuffer);
+
+ Buffer rbuffer(buff, 10000);
AMQFrame in;
- in.decode(buffer);
+ in.decode(rbuffer);
properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
@@ -123,11 +126,13 @@ public:
properties->setExpiration(expiration);
properties->setTimestamp(timestamp);
- Buffer buffer(100);
- body.encode(buffer);
- buffer.flip();
+ char buff[100];
+ Buffer wbuffer(buff, 100);
+ body.encode(wbuffer);
+
+ Buffer rbuffer(buff, 100);
AMQHeaderBody temp;
- temp.decode(buffer, body.size());
+ temp.decode(rbuffer, body.size());
properties = temp.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 545eb965c4..611b498524 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -84,7 +84,6 @@ broker_unit_tests = \
DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \
- MessageBuilderTest \
MessageTest \
QueueRegistryTest \
QueueTest \
@@ -96,6 +95,7 @@ broker_unit_tests = \
TxPublishTest \
ValueTest \
MessageHandlerTest
+# MessageBuilderTest
#client_unit_tests = \
ClientChannelTest
diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp
index 3d080ef3dc..775d251349 100644
--- a/qpid/cpp/src/tests/MessageTest.cpp
+++ b/qpid/cpp/src/tests/MessageTest.cpp
@@ -68,14 +68,14 @@ class MessageTest : public CppUnit::TestCase
dProps->setDeliveryMode(PERSISTENT);
CPPUNIT_ASSERT(msg->isPersistent());
-
- Buffer buffer(msg->encodedSize());
- msg->encode(buffer);
- buffer.flip();
+ char* buff = static_cast<char*>(::alloca(msg->encodedSize()));
+ Buffer wbuffer(buff, msg->encodedSize());
+ msg->encode(wbuffer);
+
+ Buffer rbuffer(buff, msg->encodedSize());
msg.reset(new Message());
- msg->decodeHeader(buffer);
- msg->decodeContent(buffer);
-
+ msg->decodeHeader(rbuffer);
+ msg->decodeContent(rbuffer);
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
diff --git a/qpid/cpp/src/tests/Uuid.cpp b/qpid/cpp/src/tests/Uuid.cpp
index da8c94aeae..db9a012a3d 100644
--- a/qpid/cpp/src/tests/Uuid.cpp
+++ b/qpid/cpp/src/tests/Uuid.cpp
@@ -62,12 +62,14 @@ BOOST_AUTO_TEST_CASE(testUuidOstream) {
}
BOOST_AUTO_TEST_CASE(testUuidEncodeDecode) {
- Buffer buf(Uuid::size());
+ char* buff = static_cast<char*>(::alloca(Uuid::size()));
+ Buffer wbuf(buff, Uuid::size());
Uuid uuid(sample.c_array());
- uuid.encode(buf);
- buf.flip();
+ uuid.encode(wbuf);
+
+ Buffer rbuf(buff, Uuid::size());
Uuid decoded;
- decoded.decode(buf);
+ decoded.decode(rbuf);
BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()),
string(decoded.begin(), decoded.end()));
}