summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/gen/Makefile.am9
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp27
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.h22
-rw-r--r--cpp/lib/client/ClientChannel.cpp16
-rw-r--r--cpp/lib/client/Connector.cpp2
-rw-r--r--cpp/lib/client/Connector.h2
-rw-r--r--cpp/lib/common/Makefile.am3
-rw-r--r--cpp/lib/common/framing/AMQDataBlock.cpp33
-rw-r--r--cpp/lib/common/framing/AMQDataBlock.h4
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp12
-rw-r--r--cpp/lib/common/framing/AMQFrame.h2
-rw-r--r--cpp/lib/common/framing/FrameList.cpp69
-rw-r--r--cpp/lib/common/framing/FrameList.h50
-rw-r--r--cpp/lib/common/framing/OutputHandler.h3
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.cpp6
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.h1
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp8
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h6
-rw-r--r--cpp/tests/ChannelTest.cpp4
-rw-r--r--cpp/tests/InMemoryContentTest.cpp5
-rw-r--r--cpp/tests/LazyLoadedContentTest.cpp4
-rw-r--r--cpp/tests/MessageTest.cpp4
-rw-r--r--cpp/tests/client_test.cpp34
-rwxr-xr-xcpp/tests/run-python-tests2
24 files changed, 284 insertions, 44 deletions
diff --git a/cpp/gen/Makefile.am b/cpp/gen/Makefile.am
index 3398f01330..dc63c8005e 100644
--- a/cpp/gen/Makefile.am
+++ b/cpp/gen/Makefile.am
@@ -32,15 +32,18 @@ DISTCLEANFILES = $(BUILT_SOURCES) timestamp gen-src.mk
#
if CAN_GENERATE_CODE
+
gentools_dir = $(srcdir)/../../gentools
spec_dir = $(srcdir)/../../specs
-spec = $(spec_dir)/amqp.0-8.xml
+spec = $(spec_dir)/amqp.0-9.no-wip.xml
gentools_srcdir = $(gentools_dir)/src/org/apache/qpid/gentools
+gentools_libs = $(gentools_dir)/lib/velocity-1.4.jar:$(gentools_dir)/lib/velocity-dep-1.4.jar
$(BUILT_SOURCES) timestamp: $(spec) $(java_sources) $(cxx_templates)
rm -f $(generated_sources)
- cd $(gentools_srcdir) && rm -f *.class && $(JAVAC) *.java
- $(JAVA) -cp $(gentools_dir)/src org.apache.qpid.gentools.Main \
+ cd $(gentools_srcdir) && rm -f *.class
+ $(JAVAC) -cp $(gentools_libs) -sourcepath $(gentools_srcdir) -d $(gentools_dir)/src $(gentools_srcdir)/*.java
+ $(JAVA) -cp $(gentools_dir)/src:$(gentools_libs) org.apache.qpid.gentools.Main \
-c -o . -t $(gentools_dir)/templ.cpp $(spec)
touch timestamp
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index b23432e29d..d1b1d996a4 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -223,7 +223,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
if (parent->channels[channel] == 0) {
parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax,
parent->queues->getStore(), parent->settings.stagingThreshold);
- parent->client->getChannel().openOk(channel);
+ parent->client->getChannel().openOk(channel, "");
} else {
std::stringstream out;
out << "Channel already open: " << channel;
@@ -337,6 +337,25 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
}
}
+
+void SessionHandlerImpl::QueueHandlerImpl::unbind(u_int16_t channel,
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& exchangeName,
+ const string& routingKey,
+ const FieldTable& arguments)
+{
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+ if(exchange){
+ exchange->unbind(queue, routingKey, &arguments);
+ }else{
+ throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+ }
+
+ parent->client->getQueue().unbindOk(channel);
+}
+
void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
@@ -446,7 +465,11 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64
void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
parent->getChannel(channel)->recover(requeue);
- parent->client->getBasic().recoverOk(channel);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::recoverSync(u_int16_t channel, bool requeue){
+ parent->getChannel(channel)->recover(requeue);
+ parent->client->getBasic().recoverSyncOk(channel);
}
void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h
index 7e631b4505..1d81b6503f 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/SessionHandlerImpl.h
@@ -162,7 +162,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
u_int16_t classId, u_int16_t methodId);
virtual void closeOk(u_int16_t channel);
-
+
virtual ~ChannelHandlerImpl(){}
};
@@ -177,7 +177,12 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
// Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);
-
+
+ virtual void bound( u_int16_t /*channel*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*queue*/ ) {}
+
virtual ~ExchangeHandlerImpl(){}
};
@@ -195,6 +200,13 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
const string& exchange, const string& routingKey, bool nowait,
const qpid::framing::FieldTable& arguments);
+ virtual void unbind(u_int16_t channel,
+ u_int16_t ticket,
+ const string& queue,
+ const string& exchange,
+ const string& routingKey,
+ const framing::FieldTable& arguments);
+
virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
bool nowait);
@@ -230,6 +242,8 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual void recoverSync(u_int16_t channel, bool requeue);
virtual ~BasicHandlerImpl(){}
};
@@ -257,10 +271,6 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
-
- // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test;
- // however v0.9 will not - kpvdr 2006-11-17
- inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }
};
}
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index a97d79dcf9..92f8ae63ca 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -23,6 +23,7 @@
#include <ClientMessage.h>
#include <QpidError.h>
#include <MethodBodyInstances.h>
+#include <framing/FrameList.h>
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
@@ -219,19 +220,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ std::auto_ptr<FrameList> message(new FrameList());
+
+ message->add(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(version, id, body));
+ message->add(new AMQFrame(version, id, body));
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
- if(data_length < frag_size){
- out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+ if(data_length + message->size() < frag_size){
+ message->add(new AMQFrame(version, id, new AMQContentBody(data)));
+ } else if(data_length < frag_size){
+ out->send(message.release());
+ out->send(new AMQFrame(version, id, new AMQContentBody(data)));
}else{
+ out->send(message.release());
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
@@ -244,6 +251,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
}
}
+ if (message.get()) out->send(message.release());
}
void Channel::commit(){
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index a99360b840..291ebb2107 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -78,7 +78,7 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-void Connector::send(AMQFrame* frame){
+void Connector::send(AMQDataBlock* frame){
writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index 44112369dc..49dcf6bb7a 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -85,7 +85,7 @@ namespace client {
virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void send(qpid::framing::AMQDataBlock* frame);
virtual void setReadTimeout(u_int16_t timeout);
virtual void setWriteTimeout(u_int16_t timeout);
};
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index b2e2de2cf1..b584e93ca5 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -76,6 +76,7 @@ libqpidcommon_la_SOURCES = \
$(platform_src) \
$(framing)/AMQBody.cpp \
$(framing)/AMQContentBody.cpp \
+ $(framing)/AMQDataBlock.cpp \
$(framing)/AMQFrame.cpp \
$(framing)/AMQHeaderBody.cpp \
$(framing)/AMQHeartbeatBody.cpp \
@@ -84,6 +85,7 @@ libqpidcommon_la_SOURCES = \
$(framing)/BodyHandler.cpp \
$(framing)/Buffer.cpp \
$(framing)/FieldTable.cpp \
+ $(framing)/FrameList.cpp \
$(framing)/FramingContent.cpp \
$(framing)/InitiationHandler.cpp \
$(framing)/ProtocolInitiation.cpp \
@@ -114,6 +116,7 @@ nobase_pkginclude_HEADERS = \
$(framing)/BodyHandler.h \
$(framing)/Buffer.h \
$(framing)/FieldTable.h \
+ $(framing)/FrameList.h \
$(framing)/FramingContent.h \
$(framing)/HeaderProperties.h \
$(framing)/InitiationHandler.h \
diff --git a/cpp/lib/common/framing/AMQDataBlock.cpp b/cpp/lib/common/framing/AMQDataBlock.cpp
new file mode 100644
index 0000000000..9c4d6bee63
--- /dev/null
+++ b/cpp/lib/common/framing/AMQDataBlock.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AMQDataBlock.h"
+
+
+namespace qpid {
+namespace framing {
+
+std::ostream& operator<<(std::ostream& out, const AMQDataBlock& b)
+{
+ b.print(out);
+ return out;
+}
+
+}}
diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h
index ac91c52164..36de2beea5 100644
--- a/cpp/lib/common/framing/AMQDataBlock.h
+++ b/cpp/lib/common/framing/AMQDataBlock.h
@@ -33,10 +33,12 @@ public:
virtual void encode(Buffer& buffer) = 0;
virtual bool decode(Buffer& buffer) = 0;
virtual u_int32_t size() const = 0;
+ virtual void print(std::ostream& out) const = 0;
+
+ friend std::ostream& operator<<(std::ostream& out, const AMQDataBlock& block);
};
}
}
-
#endif
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index 6fa5b9ae51..0530dc805c 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -119,14 +119,18 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
body->decode(buffer, bufSize);
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
+void AMQFrame::print(std::ostream& out) const
{
- out << "Frame[channel=" << t.channel << "; ";
- if (t.body.get() == 0)
+ out << "Frame[channel=" << channel << "; ";
+ if (body.get() == 0)
out << "empty";
else
- out << *t.body;
+ out << *body;
out << "]";
+}
+std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
+{
+ t.print(out);
return out;
}
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index d3c769087a..21642f112a 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -61,6 +61,8 @@ namespace qpid {
u_int32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
+ void print(std::ostream& out) const;
+
friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body);
};
diff --git a/cpp/lib/common/framing/FrameList.cpp b/cpp/lib/common/framing/FrameList.cpp
new file mode 100644
index 0000000000..f188347101
--- /dev/null
+++ b/cpp/lib/common/framing/FrameList.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FrameList.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace framing {
+
+FrameList::~FrameList()
+{
+ for (Frames::iterator i = frames.begin(); i != frames.end(); i++) {
+ delete (*i);
+ }
+}
+
+void FrameList::encode(Buffer& buffer)
+{
+ for (Frames::iterator i = frames.begin(); i != frames.end(); i++) {
+ (*i)->encode(buffer);
+ }
+}
+
+bool FrameList::decode(Buffer&)
+{
+ throw Exception("FrameList::decode() not valid!");
+}
+
+u_int32_t FrameList::size() const
+{
+ uint32_t s(0);
+ for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) {
+ s += (*i)->size();
+ }
+ return s;
+}
+
+void FrameList::print(std::ostream& out) const
+{
+ out << "Frames: ";
+ for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) {
+ (*i)->print(out);
+ out << "; ";
+ }
+}
+
+void FrameList::add(AMQFrame* f)
+{
+ frames.push_back(f);
+}
+
+}}
diff --git a/cpp/lib/common/framing/FrameList.h b/cpp/lib/common/framing/FrameList.h
new file mode 100644
index 0000000000..59dc385e46
--- /dev/null
+++ b/cpp/lib/common/framing/FrameList.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Buffer.h"
+#include "AMQDataBlock.h"
+#include "AMQFrame.h"
+
+#include <list>
+
+#ifndef _FrameList_
+#define _FrameList_
+
+namespace qpid {
+namespace framing {
+
+class FrameList : public AMQDataBlock
+{
+ typedef std::list<AMQFrame*> Frames;
+ Frames frames;
+public:
+ virtual ~FrameList();
+ void encode(Buffer& buffer);
+ bool decode(Buffer& buffer);
+ u_int32_t size() const;
+ void add(AMQFrame* f);
+ void print(std::ostream& out) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h
index 2e01e34df2..16dc519738 100644
--- a/cpp/lib/common/framing/OutputHandler.h
+++ b/cpp/lib/common/framing/OutputHandler.h
@@ -22,6 +22,7 @@
*
*/
#include <boost/noncopyable.hpp>
+#include <AMQDataBlock.h>
#include <AMQFrame.h>
namespace qpid {
@@ -30,7 +31,7 @@ namespace framing {
class OutputHandler : private boost::noncopyable {
public:
virtual ~OutputHandler() {}
- virtual void send(AMQFrame* frame) = 0;
+ virtual void send(AMQDataBlock* frame) = 0;
};
}}
diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp
index 471f736a7d..360178df5a 100644
--- a/cpp/lib/common/framing/ProtocolInitiation.cpp
+++ b/cpp/lib/common/framing/ProtocolInitiation.cpp
@@ -19,6 +19,7 @@
*
*/
#include <ProtocolInitiation.h>
+#include <iostream>
qpid::framing::ProtocolInitiation::ProtocolInitiation(){}
@@ -55,4 +56,9 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){
}
}
+void qpid::framing::ProtocolInitiation::print(std::ostream& out) const
+{
+ out << "AMQP(" << getMajor() << "-" << getMinor() << ")";
+}
+
//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date
diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h
index 003c3bba81..03e53c75cb 100644
--- a/cpp/lib/common/framing/ProtocolInitiation.h
+++ b/cpp/lib/common/framing/ProtocolInitiation.h
@@ -45,6 +45,7 @@ public:
inline u_int8_t getMajor() const { return version.getMajor(); }
inline u_int8_t getMinor() const { return version.getMinor(); }
inline const ProtocolVersion& getVersion() const { return version; }
+ void print(std::ostream& out) const;
};
}
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
index 8a7ce18136..dfe27050c4 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.cpp
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -96,7 +96,7 @@ void LFSessionContext::write(){
if(!framesToWrite.empty()){
out.clear();
bool encoded(false);
- AMQFrame* frame = framesToWrite.front();
+ AMQDataBlock* frame = framesToWrite.front();
while(frame && out.available() >= frame->size()){
encoded = true;
frame->encode(out);
@@ -120,7 +120,7 @@ void LFSessionContext::write(){
}
}
-void LFSessionContext::send(AMQFrame* frame){
+void LFSessionContext::send(AMQDataBlock* frame){
Mutex::ScopedLock l(writeLock);
if(!closing){
framesToWrite.push(frame);
@@ -173,9 +173,9 @@ void LFSessionContext::init(SessionHandler* _handler){
processor->add(&fd);
}
-void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+void LFSessionContext::log(const std::string& desc, AMQDataBlock* const block){
Mutex::ScopedLock l(logLock);
- std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+ std::cout << desc << " [" << &socket << "]: " << *block << std::endl;
}
Mutex LFSessionContext::logLock;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
index eeb8279d9a..7862055735 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.h
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -54,7 +54,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
apr_pollfd_t fd;
- std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ std::queue<qpid::framing::AMQDataBlock*> framesToWrite;
qpid::sys::Mutex writeLock;
bool processing;
@@ -62,7 +62,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
static qpid::sys::Mutex logLock;
void log(const std::string& desc,
- qpid::framing::AMQFrame* const frame);
+ qpid::framing::AMQDataBlock* const block);
public:
@@ -70,7 +70,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
LFProcessor* const processor,
bool debug = false);
virtual ~LFSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void send(qpid::framing::AMQDataBlock* frame);
virtual void close();
void read();
void write();
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index cc0a90bad9..637e971077 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -39,8 +39,8 @@ using std::queue;
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
+ virtual void send(AMQDataBlock* block){
+ frames.push_back(dynamic_cast<AMQFrame*>(block));
}
};
diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp
index bd638dae66..ca27e80515 100644
--- a/cpp/tests/InMemoryContentTest.cpp
+++ b/cpp/tests/InMemoryContentTest.cpp
@@ -33,8 +33,9 @@ using namespace qpid::framing;
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
+
+ virtual void send(AMQDataBlock* block){
+ frames.push_back(dynamic_cast<AMQFrame*>(block));
}
};
diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp
index 2075a6dd3a..8d0ad65a6a 100644
--- a/cpp/tests/LazyLoadedContentTest.cpp
+++ b/cpp/tests/LazyLoadedContentTest.cpp
@@ -35,8 +35,8 @@ using namespace qpid::framing;
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
+ virtual void send(AMQDataBlock* block){
+ frames.push_back(dynamic_cast<AMQFrame*>(block));
}
};
diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp
index bcf3ad8064..ee864a3883 100644
--- a/cpp/tests/MessageTest.cpp
+++ b/cpp/tests/MessageTest.cpp
@@ -30,8 +30,8 @@ using namespace qpid::framing;
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
+ virtual void send(AMQDataBlock* block){
+ frames.push_back(dynamic_cast<AMQFrame*>(block));
}
};
diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp
index a5cc64d1e4..55bf6234d1 100644
--- a/cpp/tests/client_test.cpp
+++ b/cpp/tests/client_test.cpp
@@ -35,6 +35,7 @@
#include <MessageListener.h>
#include <sys/Monitor.h>
#include <FieldTable.h>
+#include <cstdlib>
using namespace qpid::client;
using namespace qpid::sys;
@@ -52,12 +53,28 @@ public:
inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
inline virtual void received(Message& msg){
- std::cout << "Received message " << msg.getData() << std::endl;
+ std::cout << "Received message " << msg.getData().substr(0, 5) << "..." << std::endl;
monitor->notify();
}
};
-int main(int argc, char**)
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+std::string generateData(uint size)
+{
+ if (size < chars.length()) {
+ return chars.substr(0, size);
+ }
+ std::string data;
+ for (uint i = 0; i < (size / chars.length()); i++) {
+ data += chars;
+ }
+ data += chars.substr(0, size % chars.length());
+ return data;
+}
+
+
+int main(int argc, char** argv)
{
try{
//Use a custom exchange
@@ -109,10 +126,17 @@ int main(int argc, char**)
//Now we create and publish a message to our exchange with a
//routing key that will cause it to be routed to our queue
Message msg;
- string data("MyMessage");
- msg.setData(data);
+ uint size = 0;
+ if (argc > 1) {
+ size = atoi(argv[1]);
+ }
+ if (size) {
+ msg.setData(generateData(size));
+ } else {
+ msg.setData("MyMessage");
+ }
channel.publish(msg, exchange, "MyTopic");
- std::cout << "Published message: " << data << std::endl;
+ std::cout << "Published message: " << msg.getData().substr(0, 5) << "..." << std::endl;
{
Monitor::ScopedLock l(monitor);
diff --git a/cpp/tests/run-python-tests b/cpp/tests/run-python-tests
index 5148f644eb..e2e45f9ea9 100755
--- a/cpp/tests/run-python-tests
+++ b/cpp/tests/run-python-tests
@@ -41,7 +41,7 @@ sleep 4
# Run the tests.
( cd $abs_srcdir/../../python \
- && python ./run-tests -v -I cpp_failing.txt ) || fail=1
+ && python ./run-tests -v -I cpp_failing.txt -s ../specs/amqp.0-9.no-wip.xml ) || fail=1
kill $pid || { echo FAIL: process already died; cat log; fail=1; }