summaryrefslogtreecommitdiff
path: root/cpp/broker/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
committerGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
commit3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch)
treed4671a2fbb8ad69a0cc734134b43b28152c3e5b4 /cpp/broker/src
parent14654e5360b72adf1704838b3820c7d1fc860e8e (diff)
downloadqpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src')
-rw-r--r--cpp/broker/src/Channel.cpp99
-rw-r--r--cpp/broker/src/Message.cpp12
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp6
3 files changed, 78 insertions, 39 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index ae99f4e7fa..f887e080a6 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -28,19 +28,18 @@ using namespace qpid::concurrent;
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
id(_id),
+ prefetchCount(0),
+ prefetchSize(0),
+ outstandingSize(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
tagGenerator("sgen"){}
Channel::~Channel(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl;
- delete (i->second);
- }
}
-bool Channel::exists(string& consumerTag){
+bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
@@ -57,27 +56,26 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl
}
}
-void Channel::cancel(string& tag){
+void Channel::cancel(consumer_iterator i){
+ ConsumerImpl* c = i->second;
+ consumers.erase(i);
+ if(c){
+ c->cancel();
+ delete c;
+ }
+}
+
+void Channel::cancel(const string& tag){
consumer_iterator i = consumers.find(tag);
if(i != consumers.end()){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ cancel(i);
}
}
void Channel::close(){
//cancel all consumers
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ cancel(i);
}
}
@@ -99,33 +97,50 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
u_int64_t myDeliveryTag = deliveryTag++;
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+ outstandingSize += msg->contentSize();
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
}
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Locker locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ return countOk && sizeOk;
+}
+
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack) : parent(_parent),
tag(_tag),
queue(_queue),
connection(_connection),
- ackExpected(ack){
+ ackExpected(ack),
+ blocked(false){
}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(connection != msg->getPublisher()){
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }else{
- return false;
+ if(connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
}
+ return false;
}
void Channel::ConsumerImpl::cancel(){
if(queue) queue->cancel(this);
}
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked) queue->dispatch();
+}
+
void Channel::checkMessage(const std::string& text){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
@@ -140,20 +155,36 @@ void Channel::handlePublish(Message* msg){
}
void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag));
if(i == unacknowledged.end()){
- //error: how should this be signalled?
- }else if(multiple){
+ throw InvalidAckException();
+ }else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
+ //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?):
+ outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize();
}else{
- unacknowledged.erase(i);
+ outstandingSize -= i->msg->contentSize();
+ unacknowledged.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){
+ i->second->requestDispatch();
}
}
void Channel::recover(bool requeue){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
if(requeue){
- for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
- unacknowledged.clear();
+ outstandingSize = 0;
+ ack_iterator start(unacknowledged.begin());
+ ack_iterator end(unacknowledged.end());
+ for_each(start, end, Requeue());
+ unacknowledged.erase(start, end);
}else{
for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
}
@@ -175,3 +206,13 @@ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
}
+
+Channel::AddSize::AddSize() : size(0){}
+
+void Channel::AddSize::operator()(AckRecord& record){
+ size += record.msg->contentSize();
+}
+
+u_int32_t Channel::AddSize::getSize(){
+ return size;
+}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index a4ae85e904..a44eeaab59 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -33,7 +33,8 @@ Message::Message(const ConnectionToken* const _publisher,
routingKey(_routingKey),
mandatory(_mandatory),
immediate(_immediate),
- redelivered(false){
+ redelivered(false),
+ size(0){
}
@@ -46,6 +47,7 @@ void Message::setHeader(AMQHeaderBody::shared_ptr header){
void Message::addContent(AMQContentBody::shared_ptr data){
content.push_back(data);
+ size += data->size();
}
bool Message::isComplete(){
@@ -78,14 +80,6 @@ BasicHeaderProperties* Message::getHeaderProperties(){
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-u_int64_t Message::contentSize(){
- u_int64_t size(0);
- for(content_iterator i = content.begin(); i != content.end(); i++){
- size += (*i)->size();
- }
- return size;
-}
-
const ConnectionToken* const Message::getPublisher(){
return publisher;
}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 63a42a7fd6..857730f3f7 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -385,7 +385,11 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
- parent->getChannel(channel)->ack(deliveryTag, multiple);
+ try{
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+ }catch(InvalidAckException& e){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }
}
void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){}