summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2006-11-22 16:57:35 +0000
committerKim van der Riet <kpvdr@apache.org>2006-11-22 16:57:35 +0000
commitd46ac2955c4871c9f22067f47490095e2c5f1806 (patch)
tree7e76ef7e4ca47e4cc57c83f7950bf97c3eceb210 /cpp
parent018723f3889e9a1f63585dddba8eecff1d168501 (diff)
downloadqpid-python-d46ac2955c4871c9f22067f47490095e2c5f1806.tar.gz
Merged AMQP version-sensitive generated files with C++ trunk. Phase 1 of merge complete - all locations where version info is required in the framing, broker and client code, the version has been hard-coded to mahor=8, minor=0. Next step: make broker and client version-aware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/Makefile31
-rw-r--r--cpp/options.mk5
-rw-r--r--cpp/src/qpid/broker/Channel.cpp7
-rw-r--r--cpp/src/qpid/broker/Channel.h6
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h6
-rw-r--r--cpp/src/qpid/broker/Exchange.h6
-rw-r--r--cpp/src/qpid/broker/ExchangeBinding.cpp2
-rw-r--r--cpp/src/qpid/broker/ExchangeBinding.h4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h6
-rw-r--r--cpp/src/qpid/broker/Message.cpp13
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp64
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.h70
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h6
-rw-r--r--cpp/src/qpid/client/Channel.cpp66
-rw-r--r--cpp/src/qpid/client/Channel.h1
-rw-r--r--cpp/src/qpid/client/Connection.cpp31
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/MethodBodyInstances.h101
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp36
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h7
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h5
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h2
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h4
-rw-r--r--cpp/test/unit/qpid/framing/BodyHandlerTest.cpp5
-rw-r--r--cpp/test/unit/qpid/framing/FramingTest.cpp30
31 files changed, 369 insertions, 177 deletions
diff --git a/cpp/Makefile b/cpp/Makefile
index d9ffb9919d..dd7551a648 100644
--- a/cpp/Makefile
+++ b/cpp/Makefile
@@ -22,7 +22,7 @@
include options.mk
-.PHONY: test all all-nogen generate unittest pythontest doxygen
+.PHONY: test all all-nogen generate unittest pythontest doxygen build-gentools
test: unittest pythontest
@@ -41,15 +41,25 @@ all:
## Generaged code
-SPEC := $(CURDIR)/../specs/amqp-8.0.xml
-XSL := code_gen.xsl framing.xsl
-STYLESHEETS := $(XSL:%=$(CURDIR)/etc/stylesheets/%)
-TRANSFORM := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC)
-generate: $(GENDIR)/timestamp
-$(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC)
- rm -rf $(GENDIR)
- mkdir -p $(GENDIR)/qpid/framing
- ( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp
+# Add all XML specs to be generated onto the following line
+SPECS := $(SPEC_DIR)/amqp-8.0.xml # $(SPEC_DIR)/amqp-0.9.test.xml $(SPEC_DIR)/cluster-0.9.test.xml $(SPEC_DIR)/amqp-0.10.test.xml
+GENERATE := java -cp $(GENTOOLS_DIR)/src org.apache.qpid.gentools.Main -c -o $(GENDIR)/qpid/framing -t $(GENTOOLS_DIR)/templ.cpp $(SPECS)
+generate: build-gentools $(GENDIR)/timestamp
+
+$(GENDIR)/timestamp: $(wildcard) $(SPECS)
+ @echo "---------- Generating code from $(SPECS) ----------"
+ @rm -rf $(GENDIR)
+ @mkdir -p $(GENDIR)/qpid/framing
+ @$(GENERATE)
+ @touch $(GENDIR)/timestamp
+ @echo "---------- Code generation complete ----------"
+
+#Build the code generator
+build-gentools: $(GENTOOLS_DIR)/src/org/apache/qpid/gentools/Main.class
+
+$(GENTOOLS_DIR)/src/org/apache/qpid/gentools/Main.class:
+ @echo "Gentools not built; building..."
+ @( cd $(GENTOOLS_DIR) && ./build )
# Dependencies for existing generated files.
GENFILES:=$(wildcard $(GENDIR)/qpid/*/*.cpp $(GENDIR)/qpid/*/*.h)
@@ -147,4 +157,5 @@ clean:
# Clean all builds
spotless:
rm -rf build
+ -rm $(GENTOOLS_DIR)/src/org/apache/qpid/gentools/*.class
diff --git a/cpp/options.mk b/cpp/options.mk
index 9fd966ad6a..ac07467916 100644
--- a/cpp/options.mk
+++ b/cpp/options.mk
@@ -49,12 +49,15 @@ endif
## Build directories.
BUILD :=$(PLATFORM)-$(BUILD)
-GENDIR:=build/gen
BINDIR:=build/$(BUILD)/bin
LIBDIR:=build/$(BUILD)/lib
OBJDIR:=build/$(BUILD)/obj
TESTDIR:=build/$(BUILD)/test
+GENDIR:=build/gen
+GENTOOLS_DIR:= ../gentools
+SPEC_DIR:=../specs
+
BUILDDIRS := $(BINDIR) $(LIBDIR) $(OBJDIR) $(TESTDIR) $(GENDIR)
SRCDIRS := src $(GENDIR)
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index f3624b4f3d..2894e294e0 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -54,8 +54,7 @@ bool Channel::exists(const string& consumerTag){
}
void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
- if(tag.empty()) tag = tagGenerator.generate();
-
+ if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
@@ -109,7 +108,7 @@ void Channel::rollback(){
accumulatedAck.clear();
}
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t deliveryTag = currentDeliveryTag++;
@@ -129,7 +128,7 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack) : parent(_parent),
tag(_tag),
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index 6ed867284c..42f65f2c7c 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -57,13 +57,13 @@ namespace qpid {
class Channel : private MessageBuilder::CompletionHandler{
class ConsumerImpl : public virtual Consumer{
Channel* parent;
- string tag;
+ const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
const bool ackExpected;
bool blocked;
public:
- ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
void requestDispatch();
@@ -90,7 +90,7 @@ namespace qpid {
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
virtual void complete(Message::shared_ptr& msg);
- void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
+ void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index fe6117ce18..2713fb9482 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -30,7 +30,7 @@ DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
}
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
@@ -40,7 +40,7 @@ void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie
}
}
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
@@ -53,7 +53,7 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F
}
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 0904facaef..09909a8383 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -42,11 +42,11 @@ namespace broker {
virtual std::string getType(){ return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual ~DirectExchange();
};
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 2cca942290..d66c4a9e0d 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -39,9 +39,9 @@ namespace qpid {
virtual ~Exchange(){}
string getName() { return name; }
virtual string getType() = 0;
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
};
}
}
diff --git a/cpp/src/qpid/broker/ExchangeBinding.cpp b/cpp/src/qpid/broker/ExchangeBinding.cpp
index 38c60cad0f..375fbe165e 100644
--- a/cpp/src/qpid/broker/ExchangeBinding.cpp
+++ b/cpp/src/qpid/broker/ExchangeBinding.cpp
@@ -24,7 +24,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
void ExchangeBinding::cancel(){
e->unbind(q, key, args);
diff --git a/cpp/src/qpid/broker/ExchangeBinding.h b/cpp/src/qpid/broker/ExchangeBinding.h
index 1133d14e51..fd1a05391a 100644
--- a/cpp/src/qpid/broker/ExchangeBinding.h
+++ b/cpp/src/qpid/broker/ExchangeBinding.h
@@ -34,9 +34,9 @@ namespace qpid {
Exchange* e;
Queue::shared_ptr q;
const string key;
- qpid::framing::FieldTable* args;
+ const qpid::framing::FieldTable* args;
public:
- ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args);
virtual void cancel();
virtual ~ExchangeBinding();
};
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index de0310495d..0f3223d3a6 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -28,7 +28,7 @@ using namespace qpid::sys;
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
Mutex::ScopedLock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
@@ -38,7 +38,7 @@ void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie
}
}
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
@@ -47,7 +47,7 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
}
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
msg.deliverTo(*i);
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 671f93819d..0e309a0e79 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -43,11 +43,11 @@ class FanOutExchange : public virtual Exchange {
virtual std::string getType(){ return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual ~FanOutExchange();
};
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 18676691e6..bf1a89c7e8 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -43,7 +43,7 @@ namespace {
HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
Mutex::ScopedLock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
@@ -53,7 +53,7 @@ void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fi
queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
@@ -61,7 +61,7 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (match(i->first, *args)) msg.deliverTo(i->second);
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 8669329ba0..5d1a51747f 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -46,11 +46,11 @@ class HeadersExchange : public virtual Exchange {
virtual std::string getType(){ return typeName; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
virtual ~HeadersExchange();
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index d398fd7e31..f71324f3fa 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -20,6 +20,10 @@
*/
#include <qpid/broker/Message.h>
#include <iostream>
+// AMQP version change - kpvdr 2006-11-17
+#include <qpid/framing/ProtocolVersion.h>
+#include <qpid/framing/BasicDeliverBody.h>
+#include <qpid/framing/BasicGetOkBody.h>
using namespace boost;
using namespace qpid::broker;
@@ -76,7 +80,10 @@ void Message::redeliver(){
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+// out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -86,7 +93,9 @@ void Message::sendGetOk(OutputHandler* out,
u_int64_t deliveryTag,
u_int32_t framesize){
- out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index f6b9e19209..3d0a0d358b 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -27,7 +27,6 @@
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/BasicHeaderProperties.h>
-#include <qpid/framing/BasicPublishBody.h>
#include <qpid/framing/OutputHandler.h>
namespace qpid {
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index bbb5d22c8d..c8c7b440aa 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -37,7 +37,9 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
AutoDelete* _cleaner,
const u_int32_t _timeout) :
context(_context),
- client(context),
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ client(context, 8, 0),
queues(_queues),
exchanges(_exchanges),
cleaner(_cleaner),
@@ -165,26 +167,26 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
- u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/,
- string& /*response*/, string& /*locale*/){
+ u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/){
parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
-void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){}
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
parent->framemax = framemax;
parent->heartbeat = heartbeat;
}
-void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
parent->client.getConnection().openOk(0, knownhosts);
}
void SessionHandlerImpl::ConnectionHandlerImpl::close(
- u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/,
+ u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
parent->client.getConnection().closeOk(0);
@@ -197,7 +199,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
-void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
parent->client.getChannel().openOk(channel);
}
@@ -205,7 +207,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*o
void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/,
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
Channel* c = parent->getChannel(channel);
if(c){
@@ -220,9 +222,9 @@ void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type,
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- FieldTable& /*arguments*/){
+ const FieldTable& /*arguments*/){
if(passive){
if(!parent->exchanges->get(exchange)){
@@ -244,17 +246,17 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
parent->client.getExchange().declareOk(channel);
}
}
-
+
void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, bool /*ifUnused*/, bool nowait){
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
parent->exchanges->destroy(exchange);
if(!nowait) parent->client.getExchange().deleteOk(channel);
}
-void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name,
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, FieldTable& /*arguments*/){
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
@@ -282,34 +284,37 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
throw ChannelException(405, "Cannot grant exclusive access to queue");
}
if (!nowait) {
- name = queue->getName();
- parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
+ string queueName = queue->getName();
+ parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName,
- string& exchangeName, string& routingKey, bool nowait,
- FieldTable& arguments){
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
if(exchange){
- if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
- exchange->bind(queue, routingKey, &arguments);
+// kpvdr - cannot use this any longer as routingKey is now const
+// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+// exchange->bind(queue, routingKey, &arguments);
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ exchange->bind(queue, exchangeRoutingKey, &arguments);
if(!nowait) parent->client.getQueue().bindOk(channel);
}else{
throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
}
}
-void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
if(!nowait) parent->client.getQueue().purgeOk(channel, count);
}
-void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue,
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -342,7 +347,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref
}
void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
- string& queueName, string& consumerTag,
+ const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait){
@@ -353,8 +358,9 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
}
try{
- channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
+ string newTag = consumerTag;
+ channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+ if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -365,13 +371,13 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
}
-void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
parent->getChannel(channel)->cancel(consumerTag);
if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchangeName, string& routingKey,
+ const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
@@ -383,7 +389,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
}
}
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
if(!parent->getChannel(channelId)->get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h
index cec7d8ada4..2df4b10f9b 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.h
@@ -117,18 +117,22 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public:
inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
- virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism,
- string& response, string& locale);
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,
+ const string& response, const string& locale);
- virtual void secureOk(u_int16_t channel, string& response);
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void secureOk(u_int16_t channel, const string& response);
virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
- virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist);
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);
- virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,
u_int16_t methodId);
-
+
virtual void closeOk(u_int16_t channel);
virtual ~ConnectionHandlerImpl(){}
@@ -139,13 +143,15 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public:
inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
- virtual void open(u_int16_t channel, string& outOfBand);
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& outOfBand);
virtual void flow(u_int16_t channel, bool active);
virtual void flowOk(u_int16_t channel, bool active);
- virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
u_int16_t classId, u_int16_t methodId);
virtual void closeOk(u_int16_t channel);
@@ -158,11 +164,13 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public:
inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
- virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
- qpid::framing::FieldTable& arguments);
+ const qpid::framing::FieldTable& arguments);
- virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait);
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);
virtual ~ExchangeHandlerImpl(){}
};
@@ -173,20 +181,24 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public:
inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
- virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue,
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments);
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);
- virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue,
- string& exchange, string& routingKey, bool nowait,
- qpid::framing::FieldTable& arguments);
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,
+ const string& exchange, const string& routingKey, bool nowait,
+ const qpid::framing::FieldTable& arguments);
- virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
bool nowait);
- virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,
bool nowait);
-
+
virtual ~QueueHandlerImpl(){}
};
@@ -197,15 +209,19 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
- virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive, bool nowait);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);
- virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait);
-
- virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey,
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
- virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck);
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);
virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
@@ -238,7 +254,11 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
- inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
+ inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
+
+ // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test;
+ // however v0.9 will not - kpvdr 2006-11-17
+ inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }
};
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 2ff4a42070..34fb25781e 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -117,14 +117,14 @@ bool TopicPattern::match(const Tokens& target) const
TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
Monitor::ScopedLock l(lock);
TopicPattern routingPattern(routingKey);
bindings[routingPattern].push_back(queue);
queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
Queue::vector& qv(bi->second);
@@ -136,7 +136,7 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 96f0283fdd..05fe871114 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -83,11 +83,11 @@ class TopicExchange : public virtual Exchange{
virtual std::string getType(){ return typeName; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
virtual ~TopicExchange();
};
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index a6c6bfea51..6901407072 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -22,6 +22,7 @@
#include <qpid/sys/Monitor.h>
#include <qpid/client/Message.h>
#include <qpid/QpidError.h>
+#include <qpid/client/MethodBodyInstances.h>
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
@@ -35,7 +36,10 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) :
incoming(0),
closed(true),
prefetch(_prefetch),
- transactional(_transactional)
+ transactional(_transactional),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
{ }
Channel::~Channel(){
@@ -50,9 +54,11 @@ void Channel::setPrefetch(u_int16_t _prefetch){
}
void Channel::setQos(){
- sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok);
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok);
+ sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
}
}
@@ -60,9 +66,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args));
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
if(synch){
- sendAndReceive(frame, exchange_declare_ok);
+ sendAndReceive(frame, method_bodies.exchange_declare_ok);
}else{
out->send(frame);
}
@@ -70,9 +76,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch));
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
if(synch){
- sendAndReceive(frame, exchange_delete_ok);
+ sendAndReceive(frame, method_bodies.exchange_delete_ok);
}else{
out->send(frame);
}
@@ -81,11 +87,11 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false,
+ AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
queue.isExclusive(),
queue.isAutoDelete(), !synch, args));
if(synch){
- sendAndReceive(frame, queue_declare_ok);
+ sendAndReceive(frame, method_bodies.queue_declare_ok);
if(queue.getName().length() == 0){
QueueDeclareOkBody::shared_ptr response =
dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
@@ -99,9 +105,9 @@ void Channel::declareQueue(Queue& queue, bool synch){
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch));
+ AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
if(synch){
- sendAndReceive(frame, queue_delete_ok);
+ sendAndReceive(frame, method_bodies.queue_delete_ok);
}else{
out->send(frame);
}
@@ -110,9 +116,9 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, args));
+ AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
if(synch){
- sendAndReceive(frame, queue_bind_ok);
+ sendAndReceive(frame, method_bodies.queue_bind_ok);
}else{
out->send(frame);
}
@@ -122,9 +128,9 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
int ackMode, bool noLocal, bool synch){
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+ AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
if(synch){
- sendAndReceive(frame, basic_consume_ok);
+ sendAndReceive(frame, method_bodies.basic_consume_ok);
BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
tag = response->getConsumerTag();
}else{
@@ -140,12 +146,12 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
void Channel::cancel(std::string& tag, bool synch){
Consumer* c = consumers[tag];
if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch));
+ AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
if(synch){
- sendAndReceive(frame, basic_cancel_ok);
+ sendAndReceive(frame, method_bodies.basic_cancel_ok);
}else{
out->send(frame);
}
@@ -181,12 +187,12 @@ void Channel::retrieve(Message& msg){
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode));
+ AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
responses.expect();
out->send(frame);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
- if(basic_get_ok.match(response.get())){
+ if(method_bodies.basic_get_ok.match(response.get())){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
@@ -195,7 +201,7 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode){
}
retrieve(msg);
return true;
- }if(basic_get_empty.match(response.get())){
+ }if(method_bodies.basic_get_empty.match(response.get())){
return false;
}else{
THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
@@ -207,7 +213,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate)));
+ out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
@@ -233,38 +239,38 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody());
- sendAndReceive(frame, tx_commit_ok);
+ AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
+ sendAndReceive(frame, method_bodies.tx_commit_ok);
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody());
- sendAndReceive(frame, tx_rollback_ok);
+ AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
+ sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
void Channel::handleMethod(AMQMethodBody::shared_ptr body){
//channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(basic_deliver.match(body.get())){
+ }else if(method_bodies.basic_deliver.match(body.get())){
if(incoming != 0){
std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
}
- }else if(basic_return.match(body.get())){
+ }else if(method_bodies.basic_return.match(body.get())){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
}
- }else if(channel_close.match(body.get())){
+ }else if(method_bodies.channel_close.match(body.get())){
con->removeChannel(this);
//need to signal application that channel has been closed through exception
- }else if(channel_flow.match(body.get())){
+ }else if(method_bodies.channel_flow.match(body.get())){
}else{
//signal error
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
index b2e08f5756..e850c1c626 100644
--- a/cpp/src/qpid/client/Channel.h
+++ b/cpp/src/qpid/client/Channel.h
@@ -65,6 +65,7 @@ namespace client {
u_int16_t prefetch;
const bool transactional;
+ qpid::framing::ProtocolVersion version;
void enqueue();
void retrieve(Message& msg);
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 93f170742a..de324fdab4 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -23,6 +23,7 @@
#include <qpid/client/Message.h>
#include <qpid/QpidError.h>
#include <iostream>
+#include <qpid/client/MethodBodyInstances.h>
using namespace qpid::client;
using namespace qpid::framing;
@@ -31,7 +32,11 @@ using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
-Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
+Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
+{
connector = new Connector(debug, _max_frame_size);
}
@@ -51,14 +56,14 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
ProtocolInitiation* header = new ProtocolInitiation(8, 0);
responses.expect();
connector->init(header);
- responses.receive(connection_start);
+ responses.receive(method_bodies.connection_start);
FieldTable props;
string mechanism("PLAIN");
string response = ((char)0) + uid + ((char)0) + pwd;
string locale("en_US");
responses.expect();
- out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale)));
+ out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -68,10 +73,10 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- responses.receive(connection_tune);
+ responses.receive(method_bodies.connection_tune);
ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+ out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
u_int16_t heartbeat = proposal->getHeartbeat();
connector->setReadTimeout(heartbeat * 2);
@@ -81,12 +86,12 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
string capabilities;
string vhost = virtualhost;
responses.expect();
- out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true)));
+ out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
responses.waitForResponse();
- if(responses.validate(connection_open_ok)){
+ if(responses.validate(method_bodies.connection_open_ok)){
//ok
- }else if(responses.validate(connection_redirect)){
+ }else if(responses.validate(method_bodies.connection_redirect)){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
std::cout << "Received redirection to " << redirect->getHost() << std::endl;
@@ -103,7 +108,7 @@ void Connection::close(){
u_int16_t classId(0);
u_int16_t methodId(0);
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok);
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
connector->close();
}
}
@@ -115,7 +120,7 @@ void Connection::openChannel(Channel* channel){
channels[channel->id] = channel;
//now send frame to open channel and wait for response
string oob;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok);
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
channel->setQos();
channel->closed = false;
}
@@ -133,7 +138,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_
//send frame to close channel
channel->cancelAll();
channel->closed = true;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok);
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
channel->con = 0;
channel->out = 0;
removeChannel(channel);
@@ -171,7 +176,7 @@ void Connection::handleMethod(AMQMethodBody::shared_ptr body){
//connection.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(connection_close.match(body.get())){
+ }else if(method_bodies.connection_close.match(body.get())){
//send back close ok
//close socket
ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
@@ -206,7 +211,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){
std::cout << " [" << methodid << ":" << classid << "]";
}
std::cout << std::endl;
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok);
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
connector->close();
}
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 340ebe9a0f..c7b1fb8dd0 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -59,6 +59,7 @@ class Connection : public virtual qpid::framing::InputHandler,
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
+ qpid::framing::ProtocolVersion version;
void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h
new file mode 100644
index 0000000000..a2bd9dadd9
--- /dev/null
+++ b/cpp/src/qpid/client/MethodBodyInstances.h
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/framing/amqp_framing.h>
+
+/**
+ * This file replaces the auto-generated instances in the former
+ * amqp_methods.h file. Add additional instances as needed.
+ */
+
+#ifndef _MethodBodyInstances_h_
+#define _MethodBodyInstances_h_
+
+namespace qpid {
+namespace client {
+
+class MethodBodyInstances
+{
+private:
+ qpid::framing::ProtocolVersion version;
+public:
+ const qpid::framing::BasicCancelOkBody basic_cancel_ok;
+ const qpid::framing::BasicConsumeOkBody basic_consume_ok;
+ const qpid::framing::BasicDeliverBody basic_deliver;
+ const qpid::framing::BasicGetEmptyBody basic_get_empty;
+ const qpid::framing::BasicGetOkBody basic_get_ok;
+ const qpid::framing::BasicQosBody basic_qos_ok;
+ const qpid::framing::BasicReturnBody basic_return;
+ const qpid::framing::ChannelCloseBody channel_close;
+ const qpid::framing::ChannelCloseOkBody channel_close_ok;
+ const qpid::framing::ChannelFlowBody channel_flow;
+ const qpid::framing::ChannelOpenOkBody channel_open_ok;
+ const qpid::framing::ConnectionCloseBody connection_close;
+ const qpid::framing::ConnectionCloseOkBody connection_close_ok;
+ const qpid::framing::ConnectionOpenOkBody connection_open_ok;
+ const qpid::framing::ConnectionRedirectBody connection_redirect;
+ const qpid::framing::ConnectionStartBody connection_start;
+ const qpid::framing::ConnectionTuneBody connection_tune;
+ const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ const qpid::framing::QueueDeclareOkBody queue_declare_ok;
+ const qpid::framing::QueueDeleteOkBody queue_delete_ok;
+ const qpid::framing::QueueBindOkBody queue_bind_ok;
+ const qpid::framing::TxCommitOkBody tx_commit_ok;
+ const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ const qpid::framing::TxSelectOkBody tx_select_ok;
+
+ MethodBodyInstances(u_int8_t major, u_int8_t minor) :
+ version(major, minor),
+ basic_cancel_ok(version),
+ basic_consume_ok(version),
+ basic_deliver(version),
+ basic_get_empty(version),
+ basic_get_ok(version),
+ basic_qos_ok(version),
+ basic_return(version),
+ channel_close(version),
+ channel_close_ok(version),
+ channel_flow(version),
+ channel_open_ok(version),
+ connection_close(version),
+ connection_close_ok(version),
+ connection_open_ok(version),
+ connection_redirect(version),
+ connection_start(version),
+ connection_tune(version),
+ exchange_declare_ok(version),
+ exchange_delete_ok(version),
+ queue_declare_ok(version),
+ queue_delete_ok(version),
+ queue_bind_ok(version),
+ tx_commit_ok(version),
+ tx_rollback_ok(version),
+ tx_select_ok(version)
+ {}
+
+};
+
+static MethodBodyInstances method_bodies(8, 0);
+
+} // namespace client
+} // namespace qpid
+
+#endif
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index ba6f4fe2de..b58bc93545 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -24,14 +24,23 @@
using namespace qpid::framing;
-AMQFrame::AMQFrame(){}
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+AMQFrame::AMQFrame() : versionMap(8, 0) {}
-AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : channel(_channel), body(_body){}
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) :
+channel(_channel), body(_body), versionMap(8, 0)
+{}
-AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : channel(_channel), body(_body){}
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) :
+channel(_channel), body(_body), versionMap(8, 0)
+{}
-AMQFrame::~AMQFrame(){
-}
+AMQFrame::~AMQFrame() {}
u_int16_t AMQFrame::getChannel(){
return channel;
@@ -50,10 +59,14 @@ void AMQFrame::encode(Buffer& buffer)
buffer.putOctet(0xCE);
}
-AMQBody::shared_ptr createMethodBody(Buffer& buffer){
+AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){
u_int16_t classId = buffer.getShort();
u_int16_t methodId = buffer.getShort();
- AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId));
+ // AMQP version management change - kpvdr 2006-11-16
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, 8, 0));
+ // Origianl stmt:
+ // AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId));
return body;
}
@@ -108,10 +121,13 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
body->decode(buffer, bufSize);
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){
+std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
+{
out << "Frame[channel=" << t.channel << "; ";
- if (t.body.get() == 0) out << "empty";
- else out << *t.body;
+ if (t.body.get() == 0)
+ out << "empty";
+ else
+ out << *t.body;
out << "]";
return out;
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index bb1ecfac36..29ee1250e1 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include <qpid/framing/amqp_methods.h>
+/*#include <qpid/framing/amqp_methods.h>*/
#include <qpid/framing/amqp_types.h>
#include <qpid/framing/AMQBody.h>
#include <qpid/framing/AMQDataBlock.h>
@@ -26,6 +26,7 @@
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeartbeatBody.h>
+#include <qpid/framing/AMQP_MethodVersionMap.h>
#include <qpid/framing/Buffer.h>
#ifndef _AMQFrame_
@@ -39,7 +40,9 @@ namespace qpid {
u_int16_t channel;
u_int8_t type;//used if the body is decoded separately from the 'head'
AMQBody::shared_ptr body;
-
+ AMQP_MethodVersionMap versionMap;
+ AMQBody::shared_ptr createMethodBody(Buffer& buffer);
+
public:
AMQFrame();
AMQFrame(u_int16_t channel, AMQBody* body);
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 3f7a668e57..e6e592761e 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -35,9 +35,12 @@ class AMQMethodBody : virtual public AMQBody
public:
typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
+ ProtocolVersion version;
inline u_int8_t type() const { return METHOD_BODY; }
inline u_int32_t size() const { return 4 + bodySize(); }
- inline virtual ~AMQMethodBody(){}
+ inline AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
+ inline AMQMethodBody(ProtocolVersion version) : version(version) {}
+ inline virtual ~AMQMethodBody() {}
virtual void print(std::ostream& out) const = 0;
virtual u_int16_t amqpMethodId() const = 0;
virtual u_int16_t amqpClassId() const = 0;
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index 3f5f1a10af..6dbfd55a5c 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -19,8 +19,8 @@
*
*/
#include <qpid/framing/amqp_types.h>
-#include <qpid/framing/amqp_methods.h>
#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
#include <qpid/framing/HeaderProperties.h>
#ifndef _BasicHeaderProperties_
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index e266293003..6714eddf07 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -26,9 +26,11 @@
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeartbeatBody.h>
-#include <qpid/framing/amqp_methods.h>
+#include <qpid/framing/AMQP_MethodVersionMap.h>
#include <qpid/framing/InputHandler.h>
#include <qpid/framing/OutputHandler.h>
#include <qpid/framing/InitiationHandler.h>
#include <qpid/framing/ProtocolInitiation.h>
#include <qpid/framing/BasicHeaderProperties.h>
+#include <qpid/framing/ProtocolVersion.h>
+#include <qpid/framing/ProtocolVersionException.h>
diff --git a/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp
index f73e80b36c..2758492050 100644
--- a/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp
+++ b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp
@@ -63,12 +63,15 @@ private:
CPPUNIT_ASSERT_EQUAL(heartbeat, body.get());
}
};
+ ProtocolVersion v;
public:
+
+ BodyHandlerTest() : v(8, 0) {}
void testMethod()
{
- AMQMethodBody* method = new QueueDeclareBody();
+ AMQMethodBody* method = new QueueDeclareBody(v);
AMQFrame frame(0, method);
TestBodyHandler handler(method);
handler.handleBody(frame.getBody());
diff --git a/cpp/test/unit/qpid/framing/FramingTest.cpp b/cpp/test/unit/qpid/framing/FramingTest.cpp
index 3aa0901503..aa8a9a10de 100644
--- a/cpp/test/unit/qpid/framing/FramingTest.cpp
+++ b/cpp/test/unit/qpid/framing/FramingTest.cpp
@@ -19,6 +19,7 @@
*
*/
#include <qpid/framing/ConnectionRedirectBody.h>
+#include <qpid/framing/ProtocolVersion.h>
#include <qpid/framing/amqp_framing.h>
#include <iostream>
#include <qpid_test_plugin.h>
@@ -49,17 +50,20 @@ class FramingTest : public CppUnit::TestCase
private:
Buffer buffer;
+ ProtocolVersion v;
public:
- FramingTest() : buffer(100) {}
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ FramingTest() : buffer(100), v(8, 0) {}
void testBasicQosBody()
{
- BasicQosBody in(0xCAFEBABE, 0xABBA, true);
+ BasicQosBody in(v, 0xCAFEBABE, 0xABBA, true);
in.encodeContent(buffer);
buffer.flip();
- BasicQosBody out;
+ BasicQosBody out(v);
out.decodeContent(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -67,10 +71,10 @@ class FramingTest : public CppUnit::TestCase
void testConnectionSecureBody()
{
std::string s = "security credential";
- ConnectionSecureBody in(s);
+ ConnectionSecureBody in(v, s);
in.encodeContent(buffer);
buffer.flip();
- ConnectionSecureBody out;
+ ConnectionSecureBody out(v);
out.decodeContent(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -79,10 +83,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(a, b);
+ ConnectionRedirectBody in(v, a, b);
in.encodeContent(buffer);
buffer.flip();
- ConnectionRedirectBody out;
+ ConnectionRedirectBody out(v);
out.decodeContent(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -90,10 +94,10 @@ class FramingTest : public CppUnit::TestCase
void testAccessRequestBody()
{
std::string s = "text";
- AccessRequestBody in(s, true, false, true, false, true);
+ AccessRequestBody in(v, s, true, false, true, false, true);
in.encodeContent(buffer);
buffer.flip();
- AccessRequestBody out;
+ AccessRequestBody out(v);
out.decodeContent(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -102,10 +106,10 @@ class FramingTest : public CppUnit::TestCase
{
std::string q = "queue";
std::string t = "tag";
- BasicConsumeBody in(0, q, t, false, true, false, false);
+ BasicConsumeBody in(v, 0, q, t, false, true, false, false);
in.encodeContent(buffer);
buffer.flip();
- BasicConsumeBody out;
+ BasicConsumeBody out(v);
out.decodeContent(buffer);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -115,7 +119,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(999, new ConnectionRedirectBody(a, b));
+ AMQFrame in(999, new ConnectionRedirectBody(v, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -126,7 +130,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(999, new BasicConsumeOkBody(s));
+ AMQFrame in(999, new BasicConsumeOkBody(v, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;