diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-20 22:29:38 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-20 22:29:38 +0000 |
commit | 786c13d1833f626bf47262dd16ea48c81ac3887f (patch) | |
tree | a24df1b5de4584d3055a754235e93bdee1ad0075 /cpp/lib/broker | |
parent | dc0593dbce33328266edade35431a6571342786c (diff) | |
download | qpid-python-786c13d1833f626bf47262dd16ea48c81ac3887f.tar.gz |
Support for multi version, merge part 1. - can still refactor out dup use of
version object in client and server opperations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 11 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 3 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 20 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 8 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 9 |
5 files changed, 29 insertions, 22 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 5d4f68a8af..f569872770 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -31,7 +31,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : +Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : id(_id), out(_out), currentDeliveryTag(1), @@ -41,7 +41,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStor framesize(_framesize), tagGenerator("sgen"), store(_store), - messageBuilder(this, _store, _stagingThreshold){ + messageBuilder(this, _store, _stagingThreshold), + version(_version){ outstanding.reset(); } @@ -118,7 +119,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, deliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -242,7 +243,7 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ if(msg){ Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -253,5 +254,5 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ - msg->deliver(out, id, consumerTag, deliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index fa3912c78e..888ca3c051 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -88,6 +88,7 @@ namespace qpid { MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to + qpid::framing::ProtocolVersion version; // version used for this channel virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); @@ -95,7 +96,7 @@ namespace qpid { bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize, MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); ~Channel(); inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 598de2d590..7fef77e1ff 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -24,8 +24,6 @@ #include <InMemoryContent.h> #include <LazyLoadedContent.h> #include <MessageStore.h> -// AMQP version change - kpvdr 2006-11-17 -#include <ProtocolVersion.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> @@ -79,11 +77,10 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize){ - - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version - out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -91,11 +88,10 @@ void Message::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, - u_int32_t framesize){ - - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version - out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 3bf70551d3..39142546bc 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -25,6 +25,7 @@ #include <boost/shared_ptr.hpp> #include <AMQContentBody.h> #include <AMQHeaderBody.h> +#include <ProtocolVersion.h> #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> @@ -37,6 +38,7 @@ namespace qpid { class MessageStore; using qpid::framing::string; + /** * Represents an AMQP message, i.e. a header body, a list of * content bodies and some details about the publication @@ -76,12 +78,14 @@ namespace qpid { int channel, const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize); + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); void sendGetOk(qpid::framing::OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, - u_int32_t framesize); + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index bd6ca9dee9..9131060b81 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -131,7 +131,10 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ if (client == NULL) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); - + + + std::cout << "---------------" << this << std::endl; + //send connection start FieldTable properties; string mechanisms("PLAIN"); @@ -212,7 +215,9 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, + + + parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); parent->client->getChannel().openOk(channel); } |