summaryrefslogtreecommitdiff
path: root/cpp/lib/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp11
-rw-r--r--cpp/lib/broker/BrokerChannel.h3
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp20
-rw-r--r--cpp/lib/broker/BrokerMessage.h8
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp9
5 files changed, 29 insertions, 22 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 5d4f68a8af..f569872770 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -31,7 +31,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
+Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
id(_id),
out(_out),
currentDeliveryTag(1),
@@ -41,7 +41,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStor
framesize(_framesize),
tagGenerator("sgen"),
store(_store),
- messageBuilder(this, _store, _stagingThreshold){
+ messageBuilder(this, _store, _stagingThreshold),
+ version(_version){
outstanding.reset();
}
@@ -118,7 +119,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -242,7 +243,7 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
if(msg){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -253,5 +254,5 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
- msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index fa3912c78e..888ca3c051 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -88,6 +88,7 @@ namespace qpid {
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
+ qpid::framing::ProtocolVersion version; // version used for this channel
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
@@ -95,7 +96,7 @@ namespace qpid {
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
+ Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
~Channel();
inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 598de2d590..7fef77e1ff 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -24,8 +24,6 @@
#include <InMemoryContent.h>
#include <LazyLoadedContent.h>
#include <MessageStore.h>
-// AMQP version change - kpvdr 2006-11-17
-#include <ProtocolVersion.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
@@ -79,11 +77,10 @@ void Message::redeliver(){
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize){
-
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
- out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -91,11 +88,10 @@ void Message::sendGetOk(OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize){
-
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
- out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 3bf70551d3..39142546bc 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
+#include <ProtocolVersion.h>
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
@@ -37,6 +38,7 @@ namespace qpid {
class MessageStore;
using qpid::framing::string;
+
/**
* Represents an AMQP message, i.e. a header body, a list of
* content bodies and some details about the publication
@@ -76,12 +78,14 @@ namespace qpid {
int channel,
const string& consumerTag,
u_int64_t deliveryTag,
- u_int32_t framesize);
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
void sendGetOk(qpid::framing::OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize);
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index bd6ca9dee9..9131060b81 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -131,7 +131,10 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
if (client == NULL)
{
client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
-
+
+
+ std::cout << "---------------" << this << std::endl;
+
//send connection start
FieldTable properties;
string mechanisms("PLAIN");
@@ -212,7 +215,9 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
- parent->channels[channel] = new Channel(parent->context, channel, parent->framemax,
+
+
+ parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax,
parent->queues->getStore(), parent->settings.stagingThreshold);
parent->client->getChannel().openOk(channel);
}