summaryrefslogtreecommitdiff
path: root/cpp/broker/src/Channel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/broker/src/Channel.cpp
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r--cpp/broker/src/Channel.cpp256
1 files changed, 0 insertions, 256 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
deleted file mode 100644
index 34d69716c4..0000000000
--- a/cpp/broker/src/Channel.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "QpidError.h"
-#include <iostream>
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
- id(_id),
- out(_out),
- deliveryTag(1),
- transactional(false),
- prefetchSize(0),
- prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
- framesize(_framesize),
- tagGenerator("sgen"){}
-
-Channel::~Channel(){
-}
-
-bool Channel::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
- if(tag.empty()) tag = tagGenerator.generate();
-
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- }catch(ExclusiveAccessException& e){
- delete c;
- throw e;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
-}
-
-void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
-}
-
-void Channel::close(){
- //cancel all consumers
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- cancel(i);
- }
-}
-
-void Channel::begin(){
- transactional = true;
-}
-
-void Channel::commit(){
-
-}
-
-void Channel::rollback(){
-
-}
-
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
- Locker locker(deliveryLock);
-
- u_int64_t myDeliveryTag = deliveryTag++;
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
- }
- //send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
-}
-
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
- return countOk && sizeOk;
-}
-
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
-
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
- blocked = true;
- }else{
- blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }
- }
- return false;
-}
-
-void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
-}
-
-void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
-}
-
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
-}
-
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
- }
- message = Message::shared_ptr(msg);
-}
-
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
- }else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
- }
- unacknowledged.erase(i);
- }
-
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
-}
-
-void Channel::recover(bool requeue){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
- }else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
- }
-}
-
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
- Message::shared_ptr msg = queue->dequeue();
- if(msg){
- Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
-}