diff options
author | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
commit | d46ac2955c4871c9f22067f47490095e2c5f1806 (patch) | |
tree | 7e76ef7e4ca47e4cc57c83f7950bf97c3eceb210 /cpp | |
parent | 018723f3889e9a1f63585dddba8eecff1d168501 (diff) | |
download | qpid-python-d46ac2955c4871c9f22067f47490095e2c5f1806.tar.gz |
Merged AMQP version-sensitive generated files with C++ trunk. Phase 1 of merge complete - all locations where version info is required in the framing, broker and client code, the version has been hard-coded to mahor=8, minor=0. Next step: make broker and client version-aware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
31 files changed, 369 insertions, 177 deletions
diff --git a/cpp/Makefile b/cpp/Makefile index d9ffb9919d..dd7551a648 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -22,7 +22,7 @@ include options.mk -.PHONY: test all all-nogen generate unittest pythontest doxygen +.PHONY: test all all-nogen generate unittest pythontest doxygen build-gentools test: unittest pythontest @@ -41,15 +41,25 @@ all: ## Generaged code -SPEC := $(CURDIR)/../specs/amqp-8.0.xml -XSL := code_gen.xsl framing.xsl -STYLESHEETS := $(XSL:%=$(CURDIR)/etc/stylesheets/%) -TRANSFORM := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC) -generate: $(GENDIR)/timestamp -$(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC) - rm -rf $(GENDIR) - mkdir -p $(GENDIR)/qpid/framing - ( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp +# Add all XML specs to be generated onto the following line +SPECS := $(SPEC_DIR)/amqp-8.0.xml # $(SPEC_DIR)/amqp-0.9.test.xml $(SPEC_DIR)/cluster-0.9.test.xml $(SPEC_DIR)/amqp-0.10.test.xml +GENERATE := java -cp $(GENTOOLS_DIR)/src org.apache.qpid.gentools.Main -c -o $(GENDIR)/qpid/framing -t $(GENTOOLS_DIR)/templ.cpp $(SPECS) +generate: build-gentools $(GENDIR)/timestamp + +$(GENDIR)/timestamp: $(wildcard) $(SPECS) + @echo "---------- Generating code from $(SPECS) ----------" + @rm -rf $(GENDIR) + @mkdir -p $(GENDIR)/qpid/framing + @$(GENERATE) + @touch $(GENDIR)/timestamp + @echo "---------- Code generation complete ----------" + +#Build the code generator +build-gentools: $(GENTOOLS_DIR)/src/org/apache/qpid/gentools/Main.class + +$(GENTOOLS_DIR)/src/org/apache/qpid/gentools/Main.class: + @echo "Gentools not built; building..." + @( cd $(GENTOOLS_DIR) && ./build ) # Dependencies for existing generated files. GENFILES:=$(wildcard $(GENDIR)/qpid/*/*.cpp $(GENDIR)/qpid/*/*.h) @@ -147,4 +157,5 @@ clean: # Clean all builds spotless: rm -rf build + -rm $(GENTOOLS_DIR)/src/org/apache/qpid/gentools/*.class diff --git a/cpp/options.mk b/cpp/options.mk index 9fd966ad6a..ac07467916 100644 --- a/cpp/options.mk +++ b/cpp/options.mk @@ -49,12 +49,15 @@ endif ## Build directories. BUILD :=$(PLATFORM)-$(BUILD) -GENDIR:=build/gen BINDIR:=build/$(BUILD)/bin LIBDIR:=build/$(BUILD)/lib OBJDIR:=build/$(BUILD)/obj TESTDIR:=build/$(BUILD)/test +GENDIR:=build/gen +GENTOOLS_DIR:= ../gentools +SPEC_DIR:=../specs + BUILDDIRS := $(BINDIR) $(LIBDIR) $(OBJDIR) $(TESTDIR) $(GENDIR) SRCDIRS := src $(GENDIR) diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp index f3624b4f3d..2894e294e0 100644 --- a/cpp/src/qpid/broker/Channel.cpp +++ b/cpp/src/qpid/broker/Channel.cpp @@ -54,8 +54,7 @@ bool Channel::exists(const string& consumerTag){ } void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ - if(tag.empty()) tag = tagGenerator.generate(); - + if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception @@ -109,7 +108,7 @@ void Channel::rollback(){ accumulatedAck.clear(); } -void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ Mutex::ScopedLock locker(deliveryLock); u_int64_t deliveryTag = currentDeliveryTag++; @@ -129,7 +128,7 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack) : parent(_parent), tag(_tag), diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index 6ed867284c..42f65f2c7c 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -57,13 +57,13 @@ namespace qpid { class Channel : private MessageBuilder::CompletionHandler{ class ConsumerImpl : public virtual Consumer{ Channel* parent; - string tag; + const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; const bool ackExpected; bool blocked; public: - ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); void requestDispatch(); @@ -90,7 +90,7 @@ namespace qpid { Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to virtual void complete(Message::shared_ptr& msg); - void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); + void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); void cancel(consumer_iterator consumer); bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index fe6117ce18..2713fb9482 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -30,7 +30,7 @@ DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { } -void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ +void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); @@ -40,7 +40,7 @@ void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie } } -void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ +void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); @@ -53,7 +53,7 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F } } -void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ +void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); int count(0); diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 0904facaef..09909a8383 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -42,11 +42,11 @@ namespace broker { virtual std::string getType(){ return typeName; } - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual ~DirectExchange(); }; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 2cca942290..d66c4a9e0d 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -39,9 +39,9 @@ namespace qpid { virtual ~Exchange(){} string getName() { return name; } virtual string getType() = 0; - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; }; } } diff --git a/cpp/src/qpid/broker/ExchangeBinding.cpp b/cpp/src/qpid/broker/ExchangeBinding.cpp index 38c60cad0f..375fbe165e 100644 --- a/cpp/src/qpid/broker/ExchangeBinding.cpp +++ b/cpp/src/qpid/broker/ExchangeBinding.cpp @@ -24,7 +24,7 @@ using namespace qpid::broker; using namespace qpid::framing; -ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} +ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} void ExchangeBinding::cancel(){ e->unbind(q, key, args); diff --git a/cpp/src/qpid/broker/ExchangeBinding.h b/cpp/src/qpid/broker/ExchangeBinding.h index 1133d14e51..fd1a05391a 100644 --- a/cpp/src/qpid/broker/ExchangeBinding.h +++ b/cpp/src/qpid/broker/ExchangeBinding.h @@ -34,9 +34,9 @@ namespace qpid { Exchange* e; Queue::shared_ptr q; const string key; - qpid::framing::FieldTable* args; + const qpid::framing::FieldTable* args; public: - ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args); + ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args); virtual void cancel(); virtual ~ExchangeBinding(); }; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index de0310495d..0f3223d3a6 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -28,7 +28,7 @@ using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} -void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ +void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ Mutex::ScopedLock locker(lock); // Add if not already present. Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); @@ -38,7 +38,7 @@ void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie } } -void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){ +void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ Mutex::ScopedLock locker(lock); Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { @@ -47,7 +47,7 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ Mutex::ScopedLock locker(lock); for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ msg.deliverTo(*i); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 671f93819d..0e309a0e79 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -43,11 +43,11 @@ class FanOutExchange : public virtual Exchange { virtual std::string getType(){ return typeName; } - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual ~FanOutExchange(); }; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 18676691e6..bf1a89c7e8 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -43,7 +43,7 @@ namespace { HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } -void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ +void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ Mutex::ScopedLock locker(lock); std::string what = args->getString("x-match"); if (what != all && what != any) { @@ -53,7 +53,7 @@ void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fi queue->bound(new ExchangeBinding(this, queue, routingKey, args)); } -void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){ +void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ Mutex::ScopedLock locker(lock); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); @@ -61,7 +61,7 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){ +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ Mutex::ScopedLock locker(lock);; for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (match(i->first, *args)) msg.deliverTo(i->second); diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 8669329ba0..5d1a51747f 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -46,11 +46,11 @@ class HeadersExchange : public virtual Exchange { virtual std::string getType(){ return typeName; } - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); virtual ~HeadersExchange(); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d398fd7e31..f71324f3fa 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -20,6 +20,10 @@ */ #include <qpid/broker/Message.h> #include <iostream> +// AMQP version change - kpvdr 2006-11-17 +#include <qpid/framing/ProtocolVersion.h> +#include <qpid/framing/BasicDeliverBody.h> +#include <qpid/framing/BasicGetOkBody.h> using namespace boost; using namespace qpid::broker; @@ -76,7 +80,10 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ - out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version + out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); +// out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -86,7 +93,9 @@ void Message::sendGetOk(OutputHandler* out, u_int64_t deliveryTag, u_int32_t framesize){ - out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version + out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f6b9e19209..3d0a0d358b 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -27,7 +27,6 @@ #include <qpid/framing/AMQContentBody.h> #include <qpid/framing/AMQHeaderBody.h> #include <qpid/framing/BasicHeaderProperties.h> -#include <qpid/framing/BasicPublishBody.h> #include <qpid/framing/OutputHandler.h> namespace qpid { diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index bbb5d22c8d..c8c7b440aa 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -37,7 +37,9 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, AutoDelete* _cleaner, const u_int32_t _timeout) : context(_context), - client(context), +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + client(context, 8, 0), queues(_queues), exchanges(_exchanges), cleaner(_cleaner), @@ -165,26 +167,26 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void SessionHandlerImpl::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/, - string& /*response*/, string& /*locale*/){ + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){} +void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ parent->framemax = framemax; parent->heartbeat = heartbeat; } -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){ +void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; parent->client.getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/, + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { parent->client.getConnection().closeOk(0); @@ -197,7 +199,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){ +void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); parent->client.getChannel().openOk(channel); } @@ -205,7 +207,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*o void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/, +void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ Channel* c = parent->getChannel(channel); if(c){ @@ -220,9 +222,9 @@ void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type, +void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - FieldTable& /*arguments*/){ + const FieldTable& /*arguments*/){ if(passive){ if(!parent->exchanges->get(exchange)){ @@ -244,17 +246,17 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 parent->client.getExchange().declareOk(channel); } } - + void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, bool /*ifUnused*/, bool nowait){ + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused parent->exchanges->destroy(exchange); if(!nowait) parent->client.getExchange().deleteOk(channel); } -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name, +void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, FieldTable& /*arguments*/){ + bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = parent->getQueue(name, channel); @@ -282,34 +284,37 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t throw ChannelException(405, "Cannot grant exclusive access to queue"); } if (!nowait) { - name = queue->getName(); - parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); + string queueName = queue->getName(); + parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, - string& exchangeName, string& routingKey, bool nowait, - FieldTable& arguments){ +void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); if(exchange){ - if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - exchange->bind(queue, routingKey, &arguments); +// kpvdr - cannot use this any longer as routingKey is now const +// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); +// exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client.getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } } -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){ +void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); if(!nowait) parent->client.getQueue().purgeOk(channel, count); } -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue, +void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -342,7 +347,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, - string& queueName, string& consumerTag, + const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait){ @@ -353,8 +358,9 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } try{ - channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); + string newTag = consumerTag; + channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0); + if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -365,13 +371,13 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ +void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchangeName, string& routingKey, + const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); @@ -383,7 +389,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t } } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = parent->getQueue(queueName, channelId); if(!parent->getChannel(channelId)->get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h index cec7d8ada4..2df4b10f9b 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerImpl.h @@ -117,18 +117,22 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism, - string& response, string& locale); + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, + const string& response, const string& locale); - virtual void secureOk(u_int16_t channel, string& response); + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void secureOk(u_int16_t channel, const string& response); virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist); + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); - virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, u_int16_t methodId); - + virtual void closeOk(u_int16_t channel); virtual ~ConnectionHandlerImpl(){} @@ -139,13 +143,15 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual void open(u_int16_t channel, string& outOfBand); + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& outOfBand); virtual void flow(u_int16_t channel, bool active); virtual void flowOk(u_int16_t channel, bool active); - virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, u_int16_t methodId); virtual void closeOk(u_int16_t channel); @@ -158,11 +164,13 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, bool passive, bool durable, bool autoDelete, bool internal, bool nowait, - qpid::framing::FieldTable& arguments); + const qpid::framing::FieldTable& arguments); - virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait); + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); virtual ~ExchangeHandlerImpl(){} }; @@ -173,20 +181,24 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue, + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments); + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue, - string& exchange, string& routingKey, bool nowait, - qpid::framing::FieldTable& arguments); + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, + const string& exchange, const string& routingKey, bool nowait, + const qpid::framing::FieldTable& arguments); - virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, bool nowait); - virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, bool nowait); - + virtual ~QueueHandlerImpl(){} }; @@ -197,15 +209,19 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); - virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); - virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait); - - virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey, + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck); + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); @@ -238,7 +254,11 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } - inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } + inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } + + // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test; + // however v0.9 will not - kpvdr 2006-11-17 + inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); } }; } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 2ff4a42070..34fb25781e 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -117,14 +117,14 @@ bool TopicPattern::match(const Tokens& target) const TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } -void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ +void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ Monitor::ScopedLock l(lock); TopicPattern routingPattern(routingKey); bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); } -void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ +void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ Monitor::ScopedLock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); @@ -136,7 +136,7 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi } -void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ Monitor::ScopedLock l(lock); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 96f0283fdd..05fe871114 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -83,11 +83,11 @@ class TopicExchange : public virtual Exchange{ virtual std::string getType(){ return typeName; } - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); virtual ~TopicExchange(); }; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index a6c6bfea51..6901407072 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -22,6 +22,7 @@ #include <qpid/sys/Monitor.h> #include <qpid/client/Message.h> #include <qpid/QpidError.h> +#include <qpid/client/MethodBodyInstances.h> using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; @@ -35,7 +36,10 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) : incoming(0), closed(true), prefetch(_prefetch), - transactional(_transactional) + transactional(_transactional), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) { } Channel::~Channel(){ @@ -50,9 +54,11 @@ void Channel::setPrefetch(u_int16_t _prefetch){ } void Channel::setQos(){ - sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); + sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok); } } @@ -60,9 +66,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); if(synch){ - sendAndReceive(frame, exchange_declare_ok); + sendAndReceive(frame, method_bodies.exchange_declare_ok); }else{ out->send(frame); } @@ -70,9 +76,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch)); if(synch){ - sendAndReceive(frame, exchange_delete_ok); + sendAndReceive(frame, method_bodies.exchange_delete_ok); }else{ out->send(frame); } @@ -81,11 +87,11 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, + AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false, queue.isExclusive(), queue.isAutoDelete(), !synch, args)); if(synch){ - sendAndReceive(frame, queue_declare_ok); + sendAndReceive(frame, method_bodies.queue_declare_ok); if(queue.getName().length() == 0){ QueueDeclareOkBody::shared_ptr response = dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); @@ -99,9 +105,9 @@ void Channel::declareQueue(Queue& queue, bool synch){ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); + AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); if(synch){ - sendAndReceive(frame, queue_delete_ok); + sendAndReceive(frame, method_bodies.queue_delete_ok); }else{ out->send(frame); } @@ -110,9 +116,9 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, args)); + AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args)); if(synch){ - sendAndReceive(frame, queue_bind_ok); + sendAndReceive(frame, method_bodies.queue_bind_ok); }else{ out->send(frame); } @@ -122,9 +128,9 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch){ string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); + AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); if(synch){ - sendAndReceive(frame, basic_consume_ok); + sendAndReceive(frame, method_bodies.basic_consume_ok); BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); tag = response->getConsumerTag(); }else{ @@ -140,12 +146,12 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, void Channel::cancel(std::string& tag, bool synch){ Consumer* c = consumers[tag]; if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); + AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch)); if(synch){ - sendAndReceive(frame, basic_cancel_ok); + sendAndReceive(frame, method_bodies.basic_cancel_ok); }else{ out->send(frame); } @@ -181,12 +187,12 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); + AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode)); responses.expect(); out->send(frame); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); - if(basic_get_ok.match(response.get())){ + if(method_bodies.basic_get_ok.match(response.get())){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); @@ -195,7 +201,7 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ } retrieve(msg); return true; - }if(basic_get_empty.match(response.get())){ + }if(method_bodies.basic_get_empty.match(response.get())){ return false; }else{ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); @@ -207,7 +213,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); + out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -233,38 +239,38 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); - sendAndReceive(frame, tx_commit_ok); + AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version)); + sendAndReceive(frame, method_bodies.tx_commit_ok); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); - sendAndReceive(frame, tx_rollback_ok); + AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version)); + sendAndReceive(frame, method_bodies.tx_rollback_ok); } void Channel::handleMethod(AMQMethodBody::shared_ptr body){ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(basic_deliver.match(body.get())){ + }else if(method_bodies.basic_deliver.match(body.get())){ if(incoming != 0){ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); } - }else if(basic_return.match(body.get())){ + }else if(method_bodies.basic_return.match(body.get())){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); } - }else if(channel_close.match(body.get())){ + }else if(method_bodies.channel_close.match(body.get())){ con->removeChannel(this); //need to signal application that channel has been closed through exception - }else if(channel_flow.match(body.get())){ + }else if(method_bodies.channel_flow.match(body.get())){ }else{ //signal error diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index b2e08f5756..e850c1c626 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -65,6 +65,7 @@ namespace client { u_int16_t prefetch; const bool transactional; + qpid::framing::ProtocolVersion version; void enqueue(); void retrieve(Message& msg); diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 93f170742a..de324fdab4 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -23,6 +23,7 @@ #include <qpid/client/Message.h> #include <qpid/QpidError.h> #include <iostream> +#include <qpid/client/MethodBodyInstances.h> using namespace qpid::client; using namespace qpid::framing; @@ -31,7 +32,11 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) +{ connector = new Connector(debug, _max_frame_size); } @@ -51,14 +56,14 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui ProtocolInitiation* header = new ProtocolInitiation(8, 0); responses.expect(); connector->init(header); - responses.receive(connection_start); + responses.receive(method_bodies.connection_start); FieldTable props; string mechanism("PLAIN"); string response = ((char)0) + uid + ((char)0) + pwd; string locale("en_US"); responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); /** * Assume for now that further challenges will not be required @@ -68,10 +73,10 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - responses.receive(connection_tune); + responses.receive(method_bodies.connection_tune); ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); u_int16_t heartbeat = proposal->getHeartbeat(); connector->setReadTimeout(heartbeat * 2); @@ -81,12 +86,12 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui string capabilities; string vhost = virtualhost; responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). responses.waitForResponse(); - if(responses.validate(connection_open_ok)){ + if(responses.validate(method_bodies.connection_open_ok)){ //ok - }else if(responses.validate(connection_redirect)){ + }else if(responses.validate(method_bodies.connection_redirect)){ //ignore for now ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); std::cout << "Received redirection to " << redirect->getHost() << std::endl; @@ -103,7 +108,7 @@ void Connection::close(){ u_int16_t classId(0); u_int16_t methodId(0); - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); } } @@ -115,7 +120,7 @@ void Connection::openChannel(Channel* channel){ channels[channel->id] = channel; //now send frame to open channel and wait for response string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); channel->setQos(); channel->closed = false; } @@ -133,7 +138,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_ //send frame to close channel channel->cancelAll(); channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); channel->con = 0; channel->out = 0; removeChannel(channel); @@ -171,7 +176,7 @@ void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(connection_close.match(body.get())){ + }else if(method_bodies.connection_close.match(body.get())){ //send back close ok //close socket ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); @@ -206,7 +211,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){ std::cout << " [" << methodid << ":" << classid << "]"; } std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); connector->close(); } diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 340ebe9a0f..c7b1fb8dd0 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -59,6 +59,7 @@ class Connection : public virtual qpid::framing::InputHandler, qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; + qpid::framing::ProtocolVersion version; void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h new file mode 100644 index 0000000000..a2bd9dadd9 --- /dev/null +++ b/cpp/src/qpid/client/MethodBodyInstances.h @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/framing/amqp_framing.h> + +/** + * This file replaces the auto-generated instances in the former + * amqp_methods.h file. Add additional instances as needed. + */ + +#ifndef _MethodBodyInstances_h_ +#define _MethodBodyInstances_h_ + +namespace qpid { +namespace client { + +class MethodBodyInstances +{ +private: + qpid::framing::ProtocolVersion version; +public: + const qpid::framing::BasicCancelOkBody basic_cancel_ok; + const qpid::framing::BasicConsumeOkBody basic_consume_ok; + const qpid::framing::BasicDeliverBody basic_deliver; + const qpid::framing::BasicGetEmptyBody basic_get_empty; + const qpid::framing::BasicGetOkBody basic_get_ok; + const qpid::framing::BasicQosBody basic_qos_ok; + const qpid::framing::BasicReturnBody basic_return; + const qpid::framing::ChannelCloseBody channel_close; + const qpid::framing::ChannelCloseOkBody channel_close_ok; + const qpid::framing::ChannelFlowBody channel_flow; + const qpid::framing::ChannelOpenOkBody channel_open_ok; + const qpid::framing::ConnectionCloseBody connection_close; + const qpid::framing::ConnectionCloseOkBody connection_close_ok; + const qpid::framing::ConnectionOpenOkBody connection_open_ok; + const qpid::framing::ConnectionRedirectBody connection_redirect; + const qpid::framing::ConnectionStartBody connection_start; + const qpid::framing::ConnectionTuneBody connection_tune; + const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; + const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; + const qpid::framing::QueueDeclareOkBody queue_declare_ok; + const qpid::framing::QueueDeleteOkBody queue_delete_ok; + const qpid::framing::QueueBindOkBody queue_bind_ok; + const qpid::framing::TxCommitOkBody tx_commit_ok; + const qpid::framing::TxRollbackOkBody tx_rollback_ok; + const qpid::framing::TxSelectOkBody tx_select_ok; + + MethodBodyInstances(u_int8_t major, u_int8_t minor) : + version(major, minor), + basic_cancel_ok(version), + basic_consume_ok(version), + basic_deliver(version), + basic_get_empty(version), + basic_get_ok(version), + basic_qos_ok(version), + basic_return(version), + channel_close(version), + channel_close_ok(version), + channel_flow(version), + channel_open_ok(version), + connection_close(version), + connection_close_ok(version), + connection_open_ok(version), + connection_redirect(version), + connection_start(version), + connection_tune(version), + exchange_declare_ok(version), + exchange_delete_ok(version), + queue_declare_ok(version), + queue_delete_ok(version), + queue_bind_ok(version), + tx_commit_ok(version), + tx_rollback_ok(version), + tx_select_ok(version) + {} + +}; + +static MethodBodyInstances method_bodies(8, 0); + +} // namespace client +} // namespace qpid + +#endif diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index ba6f4fe2de..b58bc93545 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -24,14 +24,23 @@ using namespace qpid::framing; -AMQFrame::AMQFrame(){} +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version +AMQFrame::AMQFrame() : versionMap(8, 0) {} -AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : channel(_channel), body(_body){} +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : +channel(_channel), body(_body), versionMap(8, 0) +{} -AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : channel(_channel), body(_body){} +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : +channel(_channel), body(_body), versionMap(8, 0) +{} -AMQFrame::~AMQFrame(){ -} +AMQFrame::~AMQFrame() {} u_int16_t AMQFrame::getChannel(){ return channel; @@ -50,10 +59,14 @@ void AMQFrame::encode(Buffer& buffer) buffer.putOctet(0xCE); } -AMQBody::shared_ptr createMethodBody(Buffer& buffer){ +AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){ u_int16_t classId = buffer.getShort(); u_int16_t methodId = buffer.getShort(); - AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId)); + // AMQP version management change - kpvdr 2006-11-16 + // TODO: Make this class version-aware and link these hard-wired numbers to that version + AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, 8, 0)); + // Origianl stmt: + // AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId)); return body; } @@ -108,10 +121,13 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) body->decode(buffer, bufSize); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){ +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +{ out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) out << "empty"; - else out << *t.body; + if (t.body.get() == 0) + out << "empty"; + else + out << *t.body; out << "]"; return out; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index bb1ecfac36..29ee1250e1 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -18,7 +18,7 @@ * under the License. * */ -#include <qpid/framing/amqp_methods.h> +/*#include <qpid/framing/amqp_methods.h>*/ #include <qpid/framing/amqp_types.h> #include <qpid/framing/AMQBody.h> #include <qpid/framing/AMQDataBlock.h> @@ -26,6 +26,7 @@ #include <qpid/framing/AMQHeaderBody.h> #include <qpid/framing/AMQContentBody.h> #include <qpid/framing/AMQHeartbeatBody.h> +#include <qpid/framing/AMQP_MethodVersionMap.h> #include <qpid/framing/Buffer.h> #ifndef _AMQFrame_ @@ -39,7 +40,9 @@ namespace qpid { u_int16_t channel; u_int8_t type;//used if the body is decoded separately from the 'head' AMQBody::shared_ptr body; - + AMQP_MethodVersionMap versionMap; + AMQBody::shared_ptr createMethodBody(Buffer& buffer); + public: AMQFrame(); AMQFrame(u_int16_t channel, AMQBody* body); diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index 3f7a668e57..e6e592761e 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -35,9 +35,12 @@ class AMQMethodBody : virtual public AMQBody public: typedef boost::shared_ptr<AMQMethodBody> shared_ptr; + ProtocolVersion version; inline u_int8_t type() const { return METHOD_BODY; } inline u_int32_t size() const { return 4 + bodySize(); } - inline virtual ~AMQMethodBody(){} + inline AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {} + inline AMQMethodBody(ProtocolVersion version) : version(version) {} + inline virtual ~AMQMethodBody() {} virtual void print(std::ostream& out) const = 0; virtual u_int16_t amqpMethodId() const = 0; virtual u_int16_t amqpClassId() const = 0; diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index 3f5f1a10af..6dbfd55a5c 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -19,8 +19,8 @@ * */ #include <qpid/framing/amqp_types.h> -#include <qpid/framing/amqp_methods.h> #include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> #include <qpid/framing/HeaderProperties.h> #ifndef _BasicHeaderProperties_ diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index e266293003..6714eddf07 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -26,9 +26,11 @@ #include <qpid/framing/AMQHeaderBody.h> #include <qpid/framing/AMQContentBody.h> #include <qpid/framing/AMQHeartbeatBody.h> -#include <qpid/framing/amqp_methods.h> +#include <qpid/framing/AMQP_MethodVersionMap.h> #include <qpid/framing/InputHandler.h> #include <qpid/framing/OutputHandler.h> #include <qpid/framing/InitiationHandler.h> #include <qpid/framing/ProtocolInitiation.h> #include <qpid/framing/BasicHeaderProperties.h> +#include <qpid/framing/ProtocolVersion.h> +#include <qpid/framing/ProtocolVersionException.h> diff --git a/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp index f73e80b36c..2758492050 100644 --- a/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp +++ b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp @@ -63,12 +63,15 @@ private: CPPUNIT_ASSERT_EQUAL(heartbeat, body.get()); } }; + ProtocolVersion v; public: + + BodyHandlerTest() : v(8, 0) {} void testMethod() { - AMQMethodBody* method = new QueueDeclareBody(); + AMQMethodBody* method = new QueueDeclareBody(v); AMQFrame frame(0, method); TestBodyHandler handler(method); handler.handleBody(frame.getBody()); diff --git a/cpp/test/unit/qpid/framing/FramingTest.cpp b/cpp/test/unit/qpid/framing/FramingTest.cpp index 3aa0901503..aa8a9a10de 100644 --- a/cpp/test/unit/qpid/framing/FramingTest.cpp +++ b/cpp/test/unit/qpid/framing/FramingTest.cpp @@ -19,6 +19,7 @@ * */ #include <qpid/framing/ConnectionRedirectBody.h> +#include <qpid/framing/ProtocolVersion.h> #include <qpid/framing/amqp_framing.h> #include <iostream> #include <qpid_test_plugin.h> @@ -49,17 +50,20 @@ class FramingTest : public CppUnit::TestCase private: Buffer buffer; + ProtocolVersion v; public: - FramingTest() : buffer(100) {} +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + FramingTest() : buffer(100), v(8, 0) {} void testBasicQosBody() { - BasicQosBody in(0xCAFEBABE, 0xABBA, true); + BasicQosBody in(v, 0xCAFEBABE, 0xABBA, true); in.encodeContent(buffer); buffer.flip(); - BasicQosBody out; + BasicQosBody out(v); out.decodeContent(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -67,10 +71,10 @@ class FramingTest : public CppUnit::TestCase void testConnectionSecureBody() { std::string s = "security credential"; - ConnectionSecureBody in(s); + ConnectionSecureBody in(v, s); in.encodeContent(buffer); buffer.flip(); - ConnectionSecureBody out; + ConnectionSecureBody out(v); out.decodeContent(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -79,10 +83,10 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(a, b); + ConnectionRedirectBody in(v, a, b); in.encodeContent(buffer); buffer.flip(); - ConnectionRedirectBody out; + ConnectionRedirectBody out(v); out.decodeContent(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -90,10 +94,10 @@ class FramingTest : public CppUnit::TestCase void testAccessRequestBody() { std::string s = "text"; - AccessRequestBody in(s, true, false, true, false, true); + AccessRequestBody in(v, s, true, false, true, false, true); in.encodeContent(buffer); buffer.flip(); - AccessRequestBody out; + AccessRequestBody out(v); out.decodeContent(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -102,10 +106,10 @@ class FramingTest : public CppUnit::TestCase { std::string q = "queue"; std::string t = "tag"; - BasicConsumeBody in(0, q, t, false, true, false, false); + BasicConsumeBody in(v, 0, q, t, false, true, false, false); in.encodeContent(buffer); buffer.flip(); - BasicConsumeBody out; + BasicConsumeBody out(v); out.decodeContent(buffer); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -115,7 +119,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(999, new ConnectionRedirectBody(a, b)); + AMQFrame in(999, new ConnectionRedirectBody(v, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -126,7 +130,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(999, new BasicConsumeOkBody(s)); + AMQFrame in(999, new BasicConsumeOkBody(v, s)); in.encode(buffer); buffer.flip(); AMQFrame out; |