diff options
author | Alan Conway <aconway@apache.org> | 2007-08-31 20:52:56 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-31 20:52:56 +0000 |
commit | 280badf2e64489a5214bdb2e8c5f7befa1de0e6b (patch) | |
tree | 1fc07491d16132f4c18e394d9c5a6025eb9682a1 /cpp/src | |
parent | 761e10501fe5ea51f9d8c40d9a200ae27193ab23 (diff) | |
download | qpid-python-280badf2e64489a5214bdb2e8c5f7befa1de0e6b.tar.gz |
Removed BrokerChannel.cpp, .h: replaced by Session.cpp, .h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571577 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 524 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 181 |
2 files changed, 0 insertions, 705 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp deleted file mode 100644 index ceecdf3040..0000000000 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ /dev/null @@ -1,524 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <assert.h> - -#include <iostream> -#include <sstream> -#include <algorithm> -#include <functional> - -#include <boost/bind.hpp> -#include <boost/format.hpp> - -#include "qpid/QpidError.h" - -#include "BrokerAdapter.h" -#include "BrokerChannel.h" -#include "BrokerQueue.h" -#include "Connection.h" -#include "DeliverableMessage.h" -#include "DtxAck.h" -#include "DtxTimeout.h" -#include "Message.h" -#include "TxAck.h" -#include "TxPublish.h" - -using std::mem_fun_ref; -using std::bind2nd; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - - -Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) : - id(_id), - connection(con), - out(_out), - prefetchSize(0), - prefetchCount(0), - tagGenerator("sgen"), - dtxSelected(false), - accumulatedAck(0), - opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - flowActive(true) -{ - outstanding.reset(); -} - -Channel::~Channel(){ - close(); -} - -bool Channel::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool acks, - bool exclusive, const FieldTable*) -{ - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); -} - -void Channel::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. - ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - consumers.erase(i); -} - -void Channel::close() -{ - opened = false; - consumers.clear(); - if (dtxBuffer.get()) { - dtxBuffer->fail(); - } - recover(true); -} - -void Channel::startTx() -{ - txBuffer = TxBuffer::shared_ptr(new TxBuffer()); -} - -void Channel::commit(MessageStore* const store) -{ - if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); - - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); - txBuffer->enlist(txAck); - if (txBuffer->commitLocal(store)) { - accumulatedAck.clear(); - } -} - -void Channel::rollback() -{ - if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); - - txBuffer->rollback(); - accumulatedAck.clear(); -} - -void Channel::selectDtx() -{ - dtxSelected = true; -} - -void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join) -{ - if (!dtxSelected) { - throw ConnectionException(503, "Channel has not been selected for use with dtx"); - } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); - if (join) { - mgr.join(xid, dtxBuffer); - } else { - mgr.start(xid, dtxBuffer); - } -} - -void Channel::endDtx(const std::string& xid, bool fail) -{ - if (!dtxBuffer) { - throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); - } - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") - % dtxBuffer->getXid() % xid); - } - - txBuffer.reset();//ops on this channel no longer transactional - - checkDtxTimeout(); - if (fail) { - dtxBuffer->fail(); - } else { - dtxBuffer->markEnded(); - } - dtxBuffer.reset(); -} - -void Channel::suspendDtx(const std::string& xid) -{ - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") - % dtxBuffer->getXid() % xid); - } - txBuffer.reset();//ops on this channel no longer transactional - - checkDtxTimeout(); - dtxBuffer->setSuspended(true); -} - -void Channel::resumeDtx(const std::string& xid) -{ - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") - % dtxBuffer->getXid() % xid); - } - if (!dtxBuffer->isSuspended()) { - throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); - } - - checkDtxTimeout(); - dtxBuffer->setSuspended(false); - txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); -} - -void Channel::checkDtxTimeout() -{ - if (dtxBuffer->isExpired()) { - dtxBuffer.reset(); - throw DtxTimeoutException(); - } -} - -void Channel::record(const DeliveryRecord& delivery) -{ - unacked.push_back(delivery); - delivery.addTo(outstanding); -} - -bool Channel::checkPrefetch(Message::shared_ptr& msg) -{ - Mutex::ScopedLock locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacked.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); - return countOk && sizeOk; -} - -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, - DeliveryToken::shared_ptr _token, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _nolocal, - bool _acquire - ) : parent(_parent), - token(_token), - name(_name), - queue(_queue), - ackExpected(ack), - nolocal(_nolocal), - acquire(_acquire), - blocked(false), - windowing(true), - msgCredit(0xFFFFFFFF), - byteCredit(0xFFFFFFFF) {} - -bool Channel::ConsumerImpl::deliver(QueuedMessage& msg) -{ - if (nolocal && &(parent->connection) == msg.payload->getPublisher()) { - return false; - } else { - if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { - blocked = true; - } else { - blocked = false; - - Mutex::ScopedLock locker(parent->deliveryLock); - - DeliveryId deliveryTag = parent->out.deliver(msg.payload, token); - if (ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); - } - } - return !blocked; - } -} - -bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg) -{ - Mutex::ScopedLock l(lock); - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - return false; - } else { - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } - return true; - } -} - -void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { - Mutex::ScopedLock locker(parent->deliveryLock); - parent->out.redeliver(msg, token, deliveryTag); -} - -Channel::ConsumerImpl::~ConsumerImpl() { - cancel(); -} - -void Channel::ConsumerImpl::cancel() -{ - if(queue) { - queue->cancel(this); - if (queue->canAutoDelete()) { - parent->connection.broker.getQueues().destroyIf(queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); - } - } -} - -void Channel::ConsumerImpl::requestDispatch() -{ - if(blocked) - queue->requestDispatch(); -} - -void Channel::handle(Message::shared_ptr msg) { - if (txBuffer.get()) { - TxPublish* deliverable(new TxPublish(msg)); - TxOp::shared_ptr op(deliverable); - route(msg, *deliverable); - txBuffer->enlist(op); - } else { - DeliverableMessage deliverable(msg); - route(msg, deliverable); - } -} - -void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { - std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = connection.broker.getExchanges().get(exchangeName); - } - - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); - - if (!strategy.delivered) { - //TODO:if reject-unroutable, then reject - //else route to alternate exchange - if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); - } - } - -} - -void Channel::ackCumulative(DeliveryId id) -{ - ack(id, id, true); -} - -void Channel::ackRange(DeliveryId first, DeliveryId last) -{ - ack(first, last, false); -} - -void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative) -{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); - } else { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&Channel::acknowledged, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); - - if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - } - } else { - for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(start, end); - } - - //if the prefetch limit had previously been reached, or credit - //had expired in windowing mode there may be messages that can - //be now be delivered - for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); -} - -void Channel::acknowledged(const DeliveryRecord& delivery) -{ - delivery.subtractFrom(outstanding); - ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); - if (i != consumers.end()) { - i->acknowledged(delivery); - } -} - -void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) -{ - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); - } -} - -void Channel::recover(bool requeue) -{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - //take copy and clear unacked as requeue may result in redelivery to this channel - //which will in turn result in additions to unacked - std::list<DeliveryRecord> copy = unacked; - unacked.clear(); - for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); - } -} - -bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) -{ - QueuedMessage msg = queue->dequeue(); - if(msg.payload){ - Mutex::ScopedLock locker(deliveryLock); - DeliveryId myDeliveryTag = out.deliver(msg.payload, token); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, - DeliveryId deliveryTag) -{ - ConsumerImplMap::iterator i = consumers.find(consumerTag); - if (i != consumers.end()){ - i->redeliver(msg, deliveryTag); - } -} - -void Channel::flow(bool active) -{ - Mutex::ScopedLock locker(deliveryLock); - bool requestDelivery(!flowActive && active); - flowActive = active; - if (requestDelivery) { - //there may be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); - } -} - - -Channel::ConsumerImpl& Channel::find(const std::string& destination) -{ - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw ChannelException(404, boost::format("Unknown destination %1%") % destination); - } else { - return *i; - } -} - -void Channel::setWindowMode(const std::string& destination) -{ - find(destination).setWindowMode(); -} - -void Channel::setCreditMode(const std::string& destination) -{ - find(destination).setCreditMode(); -} - -void Channel::addByteCredit(const std::string& destination, uint32_t value) -{ - find(destination).addByteCredit(value); -} - - -void Channel::addMessageCredit(const std::string& destination, uint32_t value) -{ - find(destination).addMessageCredit(value); -} - -void Channel::flush(const std::string& destination) -{ - ConsumerImpl& c = find(destination); - c.flush(); -} - - -void Channel::stop(const std::string& destination) -{ - find(destination).stop(); -} - -void Channel::ConsumerImpl::setWindowMode() -{ - windowing = true; -} - -void Channel::ConsumerImpl::setCreditMode() -{ - windowing = false; -} - -void Channel::ConsumerImpl::addByteCredit(uint32_t value) -{ - byteCredit += value; - requestDispatch(); -} - -void Channel::ConsumerImpl::addMessageCredit(uint32_t value) -{ - msgCredit += value; - requestDispatch(); -} - -void Channel::ConsumerImpl::flush() -{ - //TODO: need to wait until any messages that are available for - //this consumer have been delivered... i.e. some sort of flush on - //the queue... -} - -void Channel::ConsumerImpl::stop() -{ - msgCredit = 0; - byteCredit = 0; -} diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h deleted file mode 100644 index 98ee073d3d..0000000000 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ /dev/null @@ -1,181 +0,0 @@ -#ifndef _broker_BrokerChannel_h -#define _broker_BrokerChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <list> -#include <memory> - -#include <boost/scoped_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/ptr_container/ptr_map.hpp> - -#include "AccumulatedAck.h" -#include "Consumer.h" -#include "DeliveryAdapter.h" -#include "DeliveryRecord.h" -#include "DeliveryToken.h" -#include "Deliverable.h" -#include "DtxBuffer.h" -#include "DtxManager.h" -#include "NameGenerator.h" -#include "Prefetch.h" -#include "TxBuffer.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/ChannelOpenBody.h" - -namespace qpid { -namespace broker { - -class ConnectionToken; -class Connection; -class Queue; -class BrokerAdapter; - -using framing::string; - -/** - * Maintains state for an AMQP channel. Handles incoming and - * outgoing messages for that channel. - */ -class Channel -{ - class ConsumerImpl : public Consumer - { - sys::Mutex lock; - Channel* const parent; - const DeliveryToken::shared_ptr token; - const string name; - const Queue::shared_ptr queue; - const bool ackExpected; - const bool nolocal; - const bool acquire; - bool blocked; - bool windowing; - uint32_t msgCredit; - uint32_t byteCredit; - - bool checkCredit(Message::shared_ptr& msg); - - public: - ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, - const string& name, Queue::shared_ptr queue, - bool ack, bool nolocal, bool acquire = true); - ~ConsumerImpl(); - bool deliver(QueuedMessage& msg); - void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); - void cancel(); - void requestDispatch(); - - void setWindowMode(); - void setCreditMode(); - void addByteCredit(uint32_t value); - void addMessageCredit(uint32_t value); - void flush(); - void stop(); - void acknowledged(const DeliveryRecord&); - }; - - typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; - - framing::ChannelId id; - Connection& connection; - DeliveryAdapter& out; - Queue::shared_ptr defaultQueue; - ConsumerImplMap consumers; - uint32_t prefetchSize; - uint16_t prefetchCount; - Prefetch outstanding; - NameGenerator tagGenerator; - std::list<DeliveryRecord> unacked; - sys::Mutex deliveryLock; - TxBuffer::shared_ptr txBuffer; - DtxBuffer::shared_ptr dtxBuffer; - bool dtxSelected; - AccumulatedAck accumulatedAck; - bool opened; - bool flowActive; - - boost::shared_ptr<Exchange> cacheExchange; - - void route(Message::shared_ptr msg, Deliverable& strategy); - void record(const DeliveryRecord& delivery); - bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); - ConsumerImpl& find(const std::string& destination); - void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void acknowledged(const DeliveryRecord&); - - - public: - Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id); - ~Channel(); - - bool isOpen() const { return opened; } - framing::ChannelId getId() const { return id; } - - void open() { opened = true; } - void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } - Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } - uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; } - uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; } - - bool exists(const string& consumerTag); - - /** - *@param tagInOut - if empty it is updated with the generated token. - */ - void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, - bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0); - void cancel(const string& tag); - - void setWindowMode(const std::string& destination); - void setCreditMode(const std::string& destination); - void addByteCredit(const std::string& destination, uint32_t value); - void addMessageCredit(const std::string& destination, uint32_t value); - void flush(const std::string& destination); - void stop(const std::string& destination); - - bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); - void close(); - void startTx(); - void commit(MessageStore* const store); - void rollback(); - void selectDtx(); - void startDtx(const std::string& xid, DtxManager& mgr, bool join); - void endDtx(const std::string& xid, bool fail); - void suspendDtx(const std::string& xid); - void resumeDtx(const std::string& xid); - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); - void recover(bool requeue); - void flow(bool active); - void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); - - void handle(Message::shared_ptr msg); -}; - -}} // namespace broker - - -#endif /*!_broker_BrokerChannel_h*/ |