diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 14 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/Content.h | 3 | ||||
-rw-r--r-- | cpp/lib/broker/InMemoryContent.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/broker/InMemoryContent.h | 3 | ||||
-rw-r--r-- | cpp/lib/broker/LazyLoadedContent.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/LazyLoadedContent.h | 2 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 38 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 18 | ||||
-rw-r--r-- | cpp/lib/client/Connector.cpp | 7 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.cpp | 26 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 8 | ||||
-rw-r--r-- | cpp/tests/BodyHandlerTest.cpp | 9 | ||||
-rw-r--r-- | cpp/tests/FramingTest.cpp | 4 | ||||
-rw-r--r-- | cpp/tests/InMemoryContentTest.cpp | 3 | ||||
-rw-r--r-- | cpp/tests/LazyLoadedContentTest.cpp | 3 |
17 files changed, 80 insertions, 78 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 7fef77e1ff..6ba2131a74 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -80,8 +80,8 @@ void Message::deliver(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); - sendContent(out, channel, framesize); + out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize, version); } void Message::sendGetOk(OutputHandler* out, @@ -91,16 +91,16 @@ void Message::sendGetOk(OutputHandler* out, u_int32_t framesize, ProtocolVersion* version){ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); - sendContent(out, channel, framesize); + out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize, version); } -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); - out->send(new AMQFrame(channel, headerBody)); + out->send(new AMQFrame(*version, channel, headerBody)); Mutex::ScopedLock locker(contentLock); - if (content.get()) content->send(out, channel, framesize); + if (content.get()) content->send(*version, out, channel, framesize); } BasicHeaderProperties* Message::getHeaderProperties(){ diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 39142546bc..1f68e1004a 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -58,7 +58,7 @@ namespace qpid { qpid::sys::Mutex contentLock; void sendContent(qpid::framing::OutputHandler* out, - int channel, u_int32_t framesize); + int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); public: typedef boost::shared_ptr<Message> shared_ptr; diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h index b5712c35ed..8aacf02959 100644 --- a/cpp/lib/broker/Content.h +++ b/cpp/lib/broker/Content.h @@ -24,6 +24,7 @@ #include <AMQContentBody.h> #include <Buffer.h> #include <OutputHandler.h> +#include <ProtocolVersion.h> namespace qpid { namespace broker { @@ -31,7 +32,7 @@ namespace qpid { public: virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; virtual u_int32_t size() = 0; - virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; + virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; virtual void encode(qpid::framing::Buffer& buffer) = 0; virtual void destroy() = 0; virtual ~Content(){} diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp index 8826b42d2c..07af8633e5 100644 --- a/cpp/lib/broker/InMemoryContent.cpp +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -38,24 +38,24 @@ u_int32_t InMemoryContent::size() return sum; } -void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize) +void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) { for (content_iterator i = content.begin(); i != content.end(); i++) { if ((*i)->size() > framesize) { u_int32_t offset = 0; for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { string data = (*i)->getData().substr(offset, framesize); - out->send(new AMQFrame(channel, new AMQContentBody(data))); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); offset += framesize; } u_int32_t remainder = (*i)->size() % framesize; if (remainder) { string data = (*i)->getData().substr(offset, remainder); - out->send(new AMQFrame(channel, new AMQContentBody(data))); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); } } else { AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); + out->send(new AMQFrame(version, channel, contentBody)); } } } diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h index 79c7cf670b..1db1acd7e1 100644 --- a/cpp/lib/broker/InMemoryContent.h +++ b/cpp/lib/broker/InMemoryContent.h @@ -24,6 +24,7 @@ #include <Content.h> #include <vector> + namespace qpid { namespace broker { class InMemoryContent : public Content{ @@ -34,7 +35,7 @@ namespace qpid { public: void add(qpid::framing::AMQContentBody::shared_ptr data); u_int32_t size(); - void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); void destroy(); ~InMemoryContent(){} diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp index 51aa6c590b..ec1ca3e195 100644 --- a/cpp/lib/broker/LazyLoadedContent.cpp +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -36,19 +36,19 @@ u_int32_t LazyLoadedContent::size() return 0;//all content is written as soon as it is added } -void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize) +void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) { if (expectedSize > framesize) { for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { u_int64_t remaining = expectedSize - offset; string data; store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining); - out->send(new AMQFrame(channel, new AMQContentBody(data))); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); } } else { string data; store->loadContent(msg, data, 0, expectedSize); - out->send(new AMQFrame(channel, new AMQContentBody(data))); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); } } diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h index 68e08c7c3f..80f8cce4eb 100644 --- a/cpp/lib/broker/LazyLoadedContent.h +++ b/cpp/lib/broker/LazyLoadedContent.h @@ -34,7 +34,7 @@ namespace qpid { LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize); void add(qpid::framing::AMQContentBody::shared_ptr data); u_int32_t size(); - void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); void destroy(); ~LazyLoadedContent(){} diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index c7b8e39ae5..3d0b547b07 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -56,9 +56,9 @@ void Channel::setPrefetch(u_int16_t _prefetch){ void Channel::setQos(){ // AMQP version management change - kpvdr 2006-11-20 // TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); + sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok); + sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok); } } @@ -66,7 +66,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); + AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); if(synch){ sendAndReceive(frame, method_bodies.exchange_declare_ok); }else{ @@ -76,7 +76,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch)); if(synch){ sendAndReceive(frame, method_bodies.exchange_delete_ok); }else{ @@ -87,7 +87,7 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false, + AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false, queue.isExclusive(), queue.isAutoDelete(), !synch, args)); if(synch){ @@ -105,7 +105,7 @@ void Channel::declareQueue(Queue& queue, bool synch){ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); if(synch){ sendAndReceive(frame, method_bodies.queue_delete_ok); }else{ @@ -116,7 +116,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args)); + AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args)); if(synch){ sendAndReceive(frame, method_bodies.queue_bind_ok); }else{ @@ -130,7 +130,7 @@ void Channel::consume( { string q = queue.getName(); AMQFrame* frame = - new AMQFrame( + new AMQFrame(version, id, new BasicConsumeBody( version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, @@ -152,10 +152,10 @@ void Channel::consume( void Channel::cancel(std::string& tag, bool synch){ Consumer* c = consumers[tag]; if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true))); + out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch)); + AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch)); if(synch){ sendAndReceive(frame, method_bodies.basic_cancel_ok); }else{ @@ -171,7 +171,7 @@ void Channel::cancelAll(){ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ Consumer* c = i->second; if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true))); } consumers.erase(i); delete c; @@ -193,7 +193,7 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode)); + AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode)); responses.expect(); out->send(frame); responses.waitForResponse(); @@ -219,25 +219,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(id, body)); + out->send(new AMQFrame(version, id, body)); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes if(data_length < frag_size){ - out->send(new AMQFrame(id, new AMQContentBody(data))); + out->send(new AMQFrame(version, id, new AMQContentBody(data))); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - out->send(new AMQFrame(id, new AMQContentBody(frag))); + out->send(new AMQFrame(version, id, new AMQContentBody(frag))); offset += length; remaining = data_length - offset; @@ -247,12 +247,12 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version)); + AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version)); sendAndReceive(frame, method_bodies.tx_commit_ok); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version)); + AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version)); sendAndReceive(frame, method_bodies.tx_rollback_ok); } @@ -377,7 +377,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){ if(++(consumer->count) < prefetch) break; //else drop-through case AUTO_ACK: - out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); + out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple))); consumer->lastDeliveryTag = 0; } } diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 78aeafb37b..ad8aa1d0dd 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -35,7 +35,7 @@ u_int16_t Connection::channelIdCounter; Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), version(_version->getMajor(),_version->getMinor()) { - connector = new Connector(debug, _max_frame_size); + connector = new Connector(version, debug, _max_frame_size); } Connection::~Connection(){ @@ -61,7 +61,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui string response = ((char)0) + uid + ((char)0) + pwd; string locale("en_US"); responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); + out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); /** * Assume for now that further challenges will not be required @@ -74,7 +74,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui responses.receive(method_bodies.connection_tune); ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); u_int16_t heartbeat = proposal->getHeartbeat(); connector->setReadTimeout(heartbeat * 2); @@ -84,7 +84,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui string capabilities; string vhost = virtualhost; responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); + out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). responses.waitForResponse(); if(responses.validate(method_bodies.connection_open_ok)){ @@ -106,7 +106,7 @@ void Connection::close(){ u_int16_t classId(0); u_int16_t methodId(0); - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); + sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); } } @@ -118,7 +118,7 @@ void Connection::openChannel(Channel* channel){ channels[channel->id] = channel; //now send frame to open channel and wait for response string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); + channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); channel->setQos(); channel->closed = false; } @@ -136,7 +136,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_ //send frame to close channel channel->cancelAll(); channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); + channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); channel->con = 0; channel->out = 0; removeChannel(channel); @@ -209,7 +209,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){ std::cout << " [" << methodid << ":" << classid << "]"; } std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); + sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); connector->close(); } @@ -230,7 +230,7 @@ void Connection::idleIn(){ } void Connection::idleOut(){ - out->send(new AMQFrame(0, new AMQHeartbeatBody())); + out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); } void Connection::shutdown(){ diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index 2bd77c1bcd..b34e66fd94 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -28,10 +28,11 @@ using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), +Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) : + debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), + version(pVersion), closed(true), lastIn(0), lastOut(0), timeout(0), @@ -162,7 +163,7 @@ void Connector::run(){ inbuf.move(received); inbuf.flip();//position = 0, limit = total data read - AMQFrame frame; + AMQFrame frame(version); while(frame.decode(inbuf)){ if(debug) std::cout << "RECV: " << frame << std::endl; input->received(&frame); diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index c64472bd53..f9e50f3216 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -26,6 +26,7 @@ #include <framing/OutputHandler.h> #include <framing/InitiationHandler.h> #include <framing/ProtocolInitiation.h> +#include <ProtocolVersion.h> #include <sys/ShutdownHandler.h> #include <sys/TimeoutHandler.h> #include <sys/Thread.h> @@ -41,6 +42,7 @@ namespace client { const bool debug; const int receive_buffer_size; const int send_buffer_size; + qpid::framing::ProtocolVersion version; bool closed; @@ -73,7 +75,7 @@ namespace client { void handleClosed(); public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); + Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(qpid::framing::ProtocolInitiation* header); diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 5eb76a87ee..2f1efcbe09 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -26,20 +26,16 @@ using namespace qpid::framing; AMQP_MethodVersionMap AMQFrame::versionMap; -// AMQP version management change - kpvdr 2-11-17 -// TODO: Make this class version-aware -AMQFrame::AMQFrame() {} - -// AMQP version management change - kpvdr 2006-11-17 -// TODO: Make this class version-aware -AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : -channel(_channel), body(_body) +AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version): +version(_version) + {} + +AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : +version(_version), channel(_channel), body(_body) {} -// AMQP version management change - kpvdr 2006-11-17 -// TODO: Make this class version-aware -AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : -channel(_channel), body(_body) +AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody::shared_ptr& _body) : +version(_version), channel(_channel), body(_body) {} AMQFrame::~AMQFrame() {} @@ -64,11 +60,7 @@ void AMQFrame::encode(Buffer& buffer) AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){ u_int16_t classId = buffer.getShort(); u_int16_t methodId = buffer.getShort(); - // AMQP version management change - kpvdr 2006-11-16 - // TODO: Make this class version-aware and link these hard-wired numbers to that version - AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, 0, 9)); - // Origianl stmt: - // AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId)); + AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, version.getMajor(), version.getMinor())); return body; } diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index 9962f85b62..ff27e1e68f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -27,6 +27,7 @@ #include <AMQContentBody.h> #include <AMQHeartbeatBody.h> #include <AMQP_MethodVersionMap.h> +#include <AMQP_HighestVersion.h> #include <Buffer.h> #ifndef _AMQFrame_ @@ -39,6 +40,7 @@ namespace framing { class AMQFrame : virtual public AMQDataBlock { static AMQP_MethodVersionMap versionMap; + qpid::framing::ProtocolVersion version; u_int16_t channel; u_int8_t type;//used if the body is decoded separately from the 'head' @@ -46,9 +48,9 @@ class AMQFrame : virtual public AMQDataBlock AMQBody::shared_ptr createMethodBody(Buffer& buffer); public: - AMQFrame(); - AMQFrame(u_int16_t channel, AMQBody* body); - AMQFrame(u_int16_t channel, AMQBody::shared_ptr& body); + AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion); + AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); + AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); diff --git a/cpp/tests/BodyHandlerTest.cpp b/cpp/tests/BodyHandlerTest.cpp index 5c3cba0f25..bf60dd21e3 100644 --- a/cpp/tests/BodyHandlerTest.cpp +++ b/cpp/tests/BodyHandlerTest.cpp @@ -19,6 +19,7 @@ * */ #include <iostream> +#include <AMQP_HighestVersion.h> #include <amqp_framing.h> #include <qpid_test_plugin.h> using namespace qpid::framing; @@ -72,7 +73,7 @@ public: void testMethod() { AMQMethodBody* method = new QueueDeclareBody(v); - AMQFrame frame(0, method); + AMQFrame frame(highestProtocolVersion, 0, method); TestBodyHandler handler(method); handler.handleBody(frame.getBody()); } @@ -80,7 +81,7 @@ public: void testHeader() { AMQHeaderBody* header = new AMQHeaderBody(); - AMQFrame frame(0, header); + AMQFrame frame(highestProtocolVersion, 0, header); TestBodyHandler handler(header); handler.handleBody(frame.getBody()); } @@ -88,7 +89,7 @@ public: void testContent() { AMQContentBody* content = new AMQContentBody(); - AMQFrame frame(0, content); + AMQFrame frame(highestProtocolVersion, 0, content); TestBodyHandler handler(content); handler.handleBody(frame.getBody()); } @@ -96,7 +97,7 @@ public: void testHeartbeat() { AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody(); - AMQFrame frame(0, heartbeat); + AMQFrame frame(highestProtocolVersion, 0, heartbeat); TestBodyHandler handler(heartbeat); handler.handleBody(frame.getBody()); } diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index bc65891956..e4d289d3d5 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -120,7 +120,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(999, new ConnectionRedirectBody(v, a, b)); + AMQFrame in(highestProtocolVersion, 999, new ConnectionRedirectBody(v, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -131,7 +131,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(999, new BasicConsumeOkBody(v, s)); + AMQFrame in(highestProtocolVersion, 999, new BasicConsumeOkBody(v, s)); in.encode(buffer); buffer.flip(); AMQFrame out; diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index 64789ed836..bd638dae66 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -20,6 +20,7 @@ */ #include <InMemoryContent.h> #include <qpid_test_plugin.h> +#include <AMQP_HighestVersion.h> #include <iostream> #include <list> @@ -66,7 +67,7 @@ public: u_int16_t channel = 3; addframes(content, inCount, in); - content.send(&handler, channel, framesize); + content.send(highestProtocolVersion, &handler, channel, framesize); check(handler, channel, outCount, out); } diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index a8e7d61e0d..2075a6dd3a 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -19,6 +19,7 @@ * */ #include <LazyLoadedContent.h> +#include <AMQP_HighestVersion.h> #include <NullMessageStore.h> #include <qpid_test_plugin.h> #include <iostream> @@ -99,7 +100,7 @@ public: LazyLoadedContent content(&store, 0, in.size()); DummyHandler handler; u_int16_t channel = 3; - content.send(&handler, channel, framesize); + content.send(highestProtocolVersion, &handler, channel, framesize); check(handler, channel, outCount, out); } |