diff options
Diffstat (limited to 'cpp')
24 files changed, 284 insertions, 44 deletions
diff --git a/cpp/gen/Makefile.am b/cpp/gen/Makefile.am index 3398f01330..dc63c8005e 100644 --- a/cpp/gen/Makefile.am +++ b/cpp/gen/Makefile.am @@ -32,15 +32,18 @@ DISTCLEANFILES = $(BUILT_SOURCES) timestamp gen-src.mk # if CAN_GENERATE_CODE + gentools_dir = $(srcdir)/../../gentools spec_dir = $(srcdir)/../../specs -spec = $(spec_dir)/amqp.0-8.xml +spec = $(spec_dir)/amqp.0-9.no-wip.xml gentools_srcdir = $(gentools_dir)/src/org/apache/qpid/gentools +gentools_libs = $(gentools_dir)/lib/velocity-1.4.jar:$(gentools_dir)/lib/velocity-dep-1.4.jar $(BUILT_SOURCES) timestamp: $(spec) $(java_sources) $(cxx_templates) rm -f $(generated_sources) - cd $(gentools_srcdir) && rm -f *.class && $(JAVAC) *.java - $(JAVA) -cp $(gentools_dir)/src org.apache.qpid.gentools.Main \ + cd $(gentools_srcdir) && rm -f *.class + $(JAVAC) -cp $(gentools_libs) -sourcepath $(gentools_srcdir) -d $(gentools_dir)/src $(gentools_srcdir)/*.java + $(JAVA) -cp $(gentools_dir)/src:$(gentools_libs) org.apache.qpid.gentools.Main \ -c -o . -t $(gentools_dir)/templ.cpp $(spec) touch timestamp diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index b23432e29d..d1b1d996a4 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -223,7 +223,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin if (parent->channels[channel] == 0) { parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); - parent->client->getChannel().openOk(channel); + parent->client->getChannel().openOk(channel, ""); } else { std::stringstream out; out << "Channel already open: " << channel; @@ -337,6 +337,25 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t } } + +void SessionHandlerImpl::QueueHandlerImpl::unbind(u_int16_t channel, + u_int16_t /*ticket*/, + const string& queueName, + const string& exchangeName, + const string& routingKey, + const FieldTable& arguments) +{ + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + if(exchange){ + exchange->unbind(queue, routingKey, &arguments); + }else{ + throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); + } + + parent->client->getQueue().unbindOk(channel); +} + void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); @@ -446,7 +465,11 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); - parent->client->getBasic().recoverOk(channel); +} + +void SessionHandlerImpl::BasicHandlerImpl::recoverSync(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); + parent->client->getBasic().recoverSyncOk(channel); } void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 7e631b4505..1d81b6503f 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -162,7 +162,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, u_int16_t classId, u_int16_t methodId); virtual void closeOk(u_int16_t channel); - + virtual ~ChannelHandlerImpl(){} }; @@ -177,7 +177,12 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); - + + virtual void bound( u_int16_t /*channel*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*queue*/ ) {} + virtual ~ExchangeHandlerImpl(){} }; @@ -195,6 +200,13 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, const string& exchange, const string& routingKey, bool nowait, const qpid::framing::FieldTable& arguments); + virtual void unbind(u_int16_t channel, + u_int16_t ticket, + const string& queue, + const string& exchange, + const string& routingKey, + const framing::FieldTable& arguments); + virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, bool nowait); @@ -230,6 +242,8 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); virtual void recover(u_int16_t channel, bool requeue); + + virtual void recoverSync(u_int16_t channel, bool requeue); virtual ~BasicHandlerImpl(){} }; @@ -257,10 +271,6 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } - - // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test; - // however v0.9 will not - kpvdr 2006-11-17 - inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); } }; } diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a97d79dcf9..92f8ae63ca 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -23,6 +23,7 @@ #include <ClientMessage.h> #include <QpidError.h> #include <MethodBodyInstances.h> +#include <framing/FrameList.h> using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; @@ -219,19 +220,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + std::auto_ptr<FrameList> message(new FrameList()); + + message->add(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); + message->add(new AMQFrame(version, id, body)); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); + if(data_length + message->size() < frag_size){ + message->add(new AMQFrame(version, id, new AMQContentBody(data))); + } else if(data_length < frag_size){ + out->send(message.release()); + out->send(new AMQFrame(version, id, new AMQContentBody(data))); }else{ + out->send(message.release()); u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { @@ -244,6 +251,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } } } + if (message.get()) out->send(message.release()); } void Channel::commit(){ diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index a99360b840..291ebb2107 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -78,7 +78,7 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame* frame){ +void Connector::send(AMQDataBlock* frame){ writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 44112369dc..49dcf6bb7a 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -85,7 +85,7 @@ namespace client { virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void send(qpid::framing::AMQDataBlock* frame); virtual void setReadTimeout(u_int16_t timeout); virtual void setWriteTimeout(u_int16_t timeout); }; diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index b2e2de2cf1..b584e93ca5 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -76,6 +76,7 @@ libqpidcommon_la_SOURCES = \ $(platform_src) \ $(framing)/AMQBody.cpp \ $(framing)/AMQContentBody.cpp \ + $(framing)/AMQDataBlock.cpp \ $(framing)/AMQFrame.cpp \ $(framing)/AMQHeaderBody.cpp \ $(framing)/AMQHeartbeatBody.cpp \ @@ -84,6 +85,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/BodyHandler.cpp \ $(framing)/Buffer.cpp \ $(framing)/FieldTable.cpp \ + $(framing)/FrameList.cpp \ $(framing)/FramingContent.cpp \ $(framing)/InitiationHandler.cpp \ $(framing)/ProtocolInitiation.cpp \ @@ -114,6 +116,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/BodyHandler.h \ $(framing)/Buffer.h \ $(framing)/FieldTable.h \ + $(framing)/FrameList.h \ $(framing)/FramingContent.h \ $(framing)/HeaderProperties.h \ $(framing)/InitiationHandler.h \ diff --git a/cpp/lib/common/framing/AMQDataBlock.cpp b/cpp/lib/common/framing/AMQDataBlock.cpp new file mode 100644 index 0000000000..9c4d6bee63 --- /dev/null +++ b/cpp/lib/common/framing/AMQDataBlock.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AMQDataBlock.h" + + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQDataBlock& b) +{ + b.print(out); + return out; +} + +}} diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h index ac91c52164..36de2beea5 100644 --- a/cpp/lib/common/framing/AMQDataBlock.h +++ b/cpp/lib/common/framing/AMQDataBlock.h @@ -33,10 +33,12 @@ public: virtual void encode(Buffer& buffer) = 0; virtual bool decode(Buffer& buffer) = 0; virtual u_int32_t size() const = 0; + virtual void print(std::ostream& out) const = 0; + + friend std::ostream& operator<<(std::ostream& out, const AMQDataBlock& block); }; } } - #endif diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 6fa5b9ae51..0530dc805c 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -119,14 +119,18 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) body->decode(buffer, bufSize); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +void AMQFrame::print(std::ostream& out) const { - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) + out << "Frame[channel=" << channel << "; "; + if (body.get() == 0) out << "empty"; else - out << *t.body; + out << *body; out << "]"; +} +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +{ + t.print(out); return out; } diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index d3c769087a..21642f112a 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -61,6 +61,8 @@ namespace qpid { u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); + void print(std::ostream& out) const; + friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); }; diff --git a/cpp/lib/common/framing/FrameList.cpp b/cpp/lib/common/framing/FrameList.cpp new file mode 100644 index 0000000000..f188347101 --- /dev/null +++ b/cpp/lib/common/framing/FrameList.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FrameList.h" +#include "Exception.h" + +namespace qpid { +namespace framing { + +FrameList::~FrameList() +{ + for (Frames::iterator i = frames.begin(); i != frames.end(); i++) { + delete (*i); + } +} + +void FrameList::encode(Buffer& buffer) +{ + for (Frames::iterator i = frames.begin(); i != frames.end(); i++) { + (*i)->encode(buffer); + } +} + +bool FrameList::decode(Buffer&) +{ + throw Exception("FrameList::decode() not valid!"); +} + +u_int32_t FrameList::size() const +{ + uint32_t s(0); + for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) { + s += (*i)->size(); + } + return s; +} + +void FrameList::print(std::ostream& out) const +{ + out << "Frames: "; + for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) { + (*i)->print(out); + out << "; "; + } +} + +void FrameList::add(AMQFrame* f) +{ + frames.push_back(f); +} + +}} diff --git a/cpp/lib/common/framing/FrameList.h b/cpp/lib/common/framing/FrameList.h new file mode 100644 index 0000000000..59dc385e46 --- /dev/null +++ b/cpp/lib/common/framing/FrameList.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Buffer.h" +#include "AMQDataBlock.h" +#include "AMQFrame.h" + +#include <list> + +#ifndef _FrameList_ +#define _FrameList_ + +namespace qpid { +namespace framing { + +class FrameList : public AMQDataBlock +{ + typedef std::list<AMQFrame*> Frames; + Frames frames; +public: + virtual ~FrameList(); + void encode(Buffer& buffer); + bool decode(Buffer& buffer); + u_int32_t size() const; + void add(AMQFrame* f); + void print(std::ostream& out) const; +}; + +} +} + + +#endif diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h index 2e01e34df2..16dc519738 100644 --- a/cpp/lib/common/framing/OutputHandler.h +++ b/cpp/lib/common/framing/OutputHandler.h @@ -22,6 +22,7 @@ * */ #include <boost/noncopyable.hpp> +#include <AMQDataBlock.h> #include <AMQFrame.h> namespace qpid { @@ -30,7 +31,7 @@ namespace framing { class OutputHandler : private boost::noncopyable { public: virtual ~OutputHandler() {} - virtual void send(AMQFrame* frame) = 0; + virtual void send(AMQDataBlock* frame) = 0; }; }} diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp index 471f736a7d..360178df5a 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.cpp +++ b/cpp/lib/common/framing/ProtocolInitiation.cpp @@ -19,6 +19,7 @@ * */ #include <ProtocolInitiation.h> +#include <iostream> qpid::framing::ProtocolInitiation::ProtocolInitiation(){} @@ -55,4 +56,9 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ } } +void qpid::framing::ProtocolInitiation::print(std::ostream& out) const +{ + out << "AMQP(" << getMajor() << "-" << getMinor() << ")"; +} + //TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h index 003c3bba81..03e53c75cb 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ b/cpp/lib/common/framing/ProtocolInitiation.h @@ -45,6 +45,7 @@ public: inline u_int8_t getMajor() const { return version.getMajor(); } inline u_int8_t getMinor() const { return version.getMinor(); } inline const ProtocolVersion& getVersion() const { return version; } + void print(std::ostream& out) const; }; } diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 8a7ce18136..dfe27050c4 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -96,7 +96,7 @@ void LFSessionContext::write(){ if(!framesToWrite.empty()){ out.clear(); bool encoded(false); - AMQFrame* frame = framesToWrite.front(); + AMQDataBlock* frame = framesToWrite.front(); while(frame && out.available() >= frame->size()){ encoded = true; frame->encode(out); @@ -120,7 +120,7 @@ void LFSessionContext::write(){ } } -void LFSessionContext::send(AMQFrame* frame){ +void LFSessionContext::send(AMQDataBlock* frame){ Mutex::ScopedLock l(writeLock); if(!closing){ framesToWrite.push(frame); @@ -173,9 +173,9 @@ void LFSessionContext::init(SessionHandler* _handler){ processor->add(&fd); } -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ +void LFSessionContext::log(const std::string& desc, AMQDataBlock* const block){ Mutex::ScopedLock l(logLock); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + std::cout << desc << " [" << &socket << "]: " << *block << std::endl; } Mutex LFSessionContext::logLock; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index eeb8279d9a..7862055735 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -54,7 +54,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext apr_pollfd_t fd; - std::queue<qpid::framing::AMQFrame*> framesToWrite; + std::queue<qpid::framing::AMQDataBlock*> framesToWrite; qpid::sys::Mutex writeLock; bool processing; @@ -62,7 +62,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext static qpid::sys::Mutex logLock; void log(const std::string& desc, - qpid::framing::AMQFrame* const frame); + qpid::framing::AMQDataBlock* const block); public: @@ -70,7 +70,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext LFProcessor* const processor, bool debug = false); virtual ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void send(qpid::framing::AMQDataBlock* frame); virtual void close(); void read(); void write(); diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index cc0a90bad9..637e971077 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -39,8 +39,8 @@ using std::queue; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index bd638dae66..ca27e80515 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -33,8 +33,9 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 2075a6dd3a..8d0ad65a6a 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -35,8 +35,8 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index bcf3ad8064..ee864a3883 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -30,8 +30,8 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp index a5cc64d1e4..55bf6234d1 100644 --- a/cpp/tests/client_test.cpp +++ b/cpp/tests/client_test.cpp @@ -35,6 +35,7 @@ #include <MessageListener.h> #include <sys/Monitor.h> #include <FieldTable.h> +#include <cstdlib> using namespace qpid::client; using namespace qpid::sys; @@ -52,12 +53,28 @@ public: inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} inline virtual void received(Message& msg){ - std::cout << "Received message " << msg.getData() << std::endl; + std::cout << "Received message " << msg.getData().substr(0, 5) << "..." << std::endl; monitor->notify(); } }; -int main(int argc, char**) +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + + +int main(int argc, char** argv) { try{ //Use a custom exchange @@ -109,10 +126,17 @@ int main(int argc, char**) //Now we create and publish a message to our exchange with a //routing key that will cause it to be routed to our queue Message msg; - string data("MyMessage"); - msg.setData(data); + uint size = 0; + if (argc > 1) { + size = atoi(argv[1]); + } + if (size) { + msg.setData(generateData(size)); + } else { + msg.setData("MyMessage"); + } channel.publish(msg, exchange, "MyTopic"); - std::cout << "Published message: " << data << std::endl; + std::cout << "Published message: " << msg.getData().substr(0, 5) << "..." << std::endl; { Monitor::ScopedLock l(monitor); diff --git a/cpp/tests/run-python-tests b/cpp/tests/run-python-tests index 5148f644eb..e2e45f9ea9 100755 --- a/cpp/tests/run-python-tests +++ b/cpp/tests/run-python-tests @@ -41,7 +41,7 @@ sleep 4 # Run the tests. ( cd $abs_srcdir/../../python \ - && python ./run-tests -v -I cpp_failing.txt ) || fail=1 + && python ./run-tests -v -I cpp_failing.txt -s ../specs/amqp.0-9.no-wip.xml ) || fail=1 kill $pid || { echo FAIL: process already died; cat log; fail=1; } |