summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-31 20:52:56 +0000
committerAlan Conway <aconway@apache.org>2007-08-31 20:52:56 +0000
commit280badf2e64489a5214bdb2e8c5f7befa1de0e6b (patch)
tree1fc07491d16132f4c18e394d9c5a6025eb9682a1 /cpp/src
parent761e10501fe5ea51f9d8c40d9a200ae27193ab23 (diff)
downloadqpid-python-280badf2e64489a5214bdb2e8c5f7befa1de0e6b.tar.gz
Removed BrokerChannel.cpp, .h: replaced by Session.cpp, .h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571577 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp524
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h181
2 files changed, 0 insertions, 705 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
deleted file mode 100644
index ceecdf3040..0000000000
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <assert.h>
-
-#include <iostream>
-#include <sstream>
-#include <algorithm>
-#include <functional>
-
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-#include "qpid/QpidError.h"
-
-#include "BrokerAdapter.h"
-#include "BrokerChannel.h"
-#include "BrokerQueue.h"
-#include "Connection.h"
-#include "DeliverableMessage.h"
-#include "DtxAck.h"
-#include "DtxTimeout.h"
-#include "Message.h"
-#include "TxAck.h"
-#include "TxPublish.h"
-
-using std::mem_fun_ref;
-using std::bind2nd;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-
-Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) :
- id(_id),
- connection(con),
- out(_out),
- prefetchSize(0),
- prefetchCount(0),
- tagGenerator("sgen"),
- dtxSelected(false),
- accumulatedAck(0),
- opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
- flowActive(true)
-{
- outstanding.reset();
-}
-
-Channel::~Channel(){
- close();
-}
-
-bool Channel::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks,
- bool exclusive, const FieldTable*)
-{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
- queue->consume(c.get(), exclusive);//may throw exception
- consumers.insert(tagInOut, c.release());
-}
-
-void Channel::cancel(const string& tag){
- // consumers is a ptr_map so erase will delete the consumer
- // which will call cancel.
- ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- consumers.erase(i);
-}
-
-void Channel::close()
-{
- opened = false;
- consumers.clear();
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
- }
- recover(true);
-}
-
-void Channel::startTx()
-{
- txBuffer = TxBuffer::shared_ptr(new TxBuffer());
-}
-
-void Channel::commit(MessageStore* const store)
-{
- if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
-
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
- txBuffer->enlist(txAck);
- if (txBuffer->commitLocal(store)) {
- accumulatedAck.clear();
- }
-}
-
-void Channel::rollback()
-{
- if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
-
- txBuffer->rollback();
- accumulatedAck.clear();
-}
-
-void Channel::selectDtx()
-{
- dtxSelected = true;
-}
-
-void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join)
-{
- if (!dtxSelected) {
- throw ConnectionException(503, "Channel has not been selected for use with dtx");
- }
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
- if (join) {
- mgr.join(xid, dtxBuffer);
- } else {
- mgr.start(xid, dtxBuffer);
- }
-}
-
-void Channel::endDtx(const std::string& xid, bool fail)
-{
- if (!dtxBuffer) {
- throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
- }
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
- % dtxBuffer->getXid() % xid);
- }
-
- txBuffer.reset();//ops on this channel no longer transactional
-
- checkDtxTimeout();
- if (fail) {
- dtxBuffer->fail();
- } else {
- dtxBuffer->markEnded();
- }
- dtxBuffer.reset();
-}
-
-void Channel::suspendDtx(const std::string& xid)
-{
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
- % dtxBuffer->getXid() % xid);
- }
- txBuffer.reset();//ops on this channel no longer transactional
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(true);
-}
-
-void Channel::resumeDtx(const std::string& xid)
-{
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
- % dtxBuffer->getXid() % xid);
- }
- if (!dtxBuffer->isSuspended()) {
- throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
- }
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(false);
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
-}
-
-void Channel::checkDtxTimeout()
-{
- if (dtxBuffer->isExpired()) {
- dtxBuffer.reset();
- throw DtxTimeoutException();
- }
-}
-
-void Channel::record(const DeliveryRecord& delivery)
-{
- unacked.push_back(delivery);
- delivery.addTo(outstanding);
-}
-
-bool Channel::checkPrefetch(Message::shared_ptr& msg)
-{
- Mutex::ScopedLock locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacked.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
- return countOk && sizeOk;
-}
-
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent,
- DeliveryToken::shared_ptr _token,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _nolocal,
- bool _acquire
- ) : parent(_parent),
- token(_token),
- name(_name),
- queue(_queue),
- ackExpected(ack),
- nolocal(_nolocal),
- acquire(_acquire),
- blocked(false),
- windowing(true),
- msgCredit(0xFFFFFFFF),
- byteCredit(0xFFFFFFFF) {}
-
-bool Channel::ConsumerImpl::deliver(QueuedMessage& msg)
-{
- if (nolocal && &(parent->connection) == msg.payload->getPublisher()) {
- return false;
- } else {
- if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
- blocked = true;
- } else {
- blocked = false;
-
- Mutex::ScopedLock locker(parent->deliveryLock);
-
- DeliveryId deliveryTag = parent->out.deliver(msg.payload, token);
- if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
- }
- }
- return !blocked;
- }
-}
-
-bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
-{
- Mutex::ScopedLock l(lock);
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- return false;
- } else {
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
- return true;
- }
-}
-
-void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
- Mutex::ScopedLock locker(parent->deliveryLock);
- parent->out.redeliver(msg, token, deliveryTag);
-}
-
-Channel::ConsumerImpl::~ConsumerImpl() {
- cancel();
-}
-
-void Channel::ConsumerImpl::cancel()
-{
- if(queue) {
- queue->cancel(this);
- if (queue->canAutoDelete()) {
- parent->connection.broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
- }
- }
-}
-
-void Channel::ConsumerImpl::requestDispatch()
-{
- if(blocked)
- queue->requestDispatch();
-}
-
-void Channel::handle(Message::shared_ptr msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- }
-}
-
-void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
- std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = connection.broker.getExchanges().get(exchangeName);
- }
-
- cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
-
- if (!strategy.delivered) {
- //TODO:if reject-unroutable, then reject
- //else route to alternate exchange
- if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
- }
- }
-
-}
-
-void Channel::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void Channel::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(start, end);
- }
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
-}
-
-void Channel::acknowledged(const DeliveryRecord& delivery)
-{
- delivery.subtractFrom(outstanding);
- ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
- if (i != consumers.end()) {
- i->acknowledged(delivery);
- }
-}
-
-void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
-{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
- }
-}
-
-void Channel::recover(bool requeue)
-{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstanding.reset();
- //take copy and clear unacked as requeue may result in redelivery to this channel
- //which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
- }
-}
-
-bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
-{
- QueuedMessage msg = queue->dequeue();
- if(msg.payload){
- Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = out.deliver(msg.payload, token);
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
- DeliveryId deliveryTag)
-{
- ConsumerImplMap::iterator i = consumers.find(consumerTag);
- if (i != consumers.end()){
- i->redeliver(msg, deliveryTag);
- }
-}
-
-void Channel::flow(bool active)
-{
- Mutex::ScopedLock locker(deliveryLock);
- bool requestDelivery(!flowActive && active);
- flowActive = active;
- if (requestDelivery) {
- //there may be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
- }
-}
-
-
-Channel::ConsumerImpl& Channel::find(const std::string& destination)
-{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw ChannelException(404, boost::format("Unknown destination %1%") % destination);
- } else {
- return *i;
- }
-}
-
-void Channel::setWindowMode(const std::string& destination)
-{
- find(destination).setWindowMode();
-}
-
-void Channel::setCreditMode(const std::string& destination)
-{
- find(destination).setCreditMode();
-}
-
-void Channel::addByteCredit(const std::string& destination, uint32_t value)
-{
- find(destination).addByteCredit(value);
-}
-
-
-void Channel::addMessageCredit(const std::string& destination, uint32_t value)
-{
- find(destination).addMessageCredit(value);
-}
-
-void Channel::flush(const std::string& destination)
-{
- ConsumerImpl& c = find(destination);
- c.flush();
-}
-
-
-void Channel::stop(const std::string& destination)
-{
- find(destination).stop();
-}
-
-void Channel::ConsumerImpl::setWindowMode()
-{
- windowing = true;
-}
-
-void Channel::ConsumerImpl::setCreditMode()
-{
- windowing = false;
-}
-
-void Channel::ConsumerImpl::addByteCredit(uint32_t value)
-{
- byteCredit += value;
- requestDispatch();
-}
-
-void Channel::ConsumerImpl::addMessageCredit(uint32_t value)
-{
- msgCredit += value;
- requestDispatch();
-}
-
-void Channel::ConsumerImpl::flush()
-{
- //TODO: need to wait until any messages that are available for
- //this consumer have been delivered... i.e. some sort of flush on
- //the queue...
-}
-
-void Channel::ConsumerImpl::stop()
-{
- msgCredit = 0;
- byteCredit = 0;
-}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
deleted file mode 100644
index 98ee073d3d..0000000000
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ /dev/null
@@ -1,181 +0,0 @@
-#ifndef _broker_BrokerChannel_h
-#define _broker_BrokerChannel_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <list>
-#include <memory>
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/ptr_container/ptr_map.hpp>
-
-#include "AccumulatedAck.h"
-#include "Consumer.h"
-#include "DeliveryAdapter.h"
-#include "DeliveryRecord.h"
-#include "DeliveryToken.h"
-#include "Deliverable.h"
-#include "DtxBuffer.h"
-#include "DtxManager.h"
-#include "NameGenerator.h"
-#include "Prefetch.h"
-#include "TxBuffer.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ChannelOpenBody.h"
-
-namespace qpid {
-namespace broker {
-
-class ConnectionToken;
-class Connection;
-class Queue;
-class BrokerAdapter;
-
-using framing::string;
-
-/**
- * Maintains state for an AMQP channel. Handles incoming and
- * outgoing messages for that channel.
- */
-class Channel
-{
- class ConsumerImpl : public Consumer
- {
- sys::Mutex lock;
- Channel* const parent;
- const DeliveryToken::shared_ptr token;
- const string name;
- const Queue::shared_ptr queue;
- const bool ackExpected;
- const bool nolocal;
- const bool acquire;
- bool blocked;
- bool windowing;
- uint32_t msgCredit;
- uint32_t byteCredit;
-
- bool checkCredit(Message::shared_ptr& msg);
-
- public:
- ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token,
- const string& name, Queue::shared_ptr queue,
- bool ack, bool nolocal, bool acquire = true);
- ~ConsumerImpl();
- bool deliver(QueuedMessage& msg);
- void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
- void cancel();
- void requestDispatch();
-
- void setWindowMode();
- void setCreditMode();
- void addByteCredit(uint32_t value);
- void addMessageCredit(uint32_t value);
- void flush();
- void stop();
- void acknowledged(const DeliveryRecord&);
- };
-
- typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
-
- framing::ChannelId id;
- Connection& connection;
- DeliveryAdapter& out;
- Queue::shared_ptr defaultQueue;
- ConsumerImplMap consumers;
- uint32_t prefetchSize;
- uint16_t prefetchCount;
- Prefetch outstanding;
- NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
- sys::Mutex deliveryLock;
- TxBuffer::shared_ptr txBuffer;
- DtxBuffer::shared_ptr dtxBuffer;
- bool dtxSelected;
- AccumulatedAck accumulatedAck;
- bool opened;
- bool flowActive;
-
- boost::shared_ptr<Exchange> cacheExchange;
-
- void route(Message::shared_ptr msg, Deliverable& strategy);
- void record(const DeliveryRecord& delivery);
- bool checkPrefetch(Message::shared_ptr& msg);
- void checkDtxTimeout();
- ConsumerImpl& find(const std::string& destination);
- void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void acknowledged(const DeliveryRecord&);
-
-
- public:
- Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id);
- ~Channel();
-
- bool isOpen() const { return opened; }
- framing::ChannelId getId() const { return id; }
-
- void open() { opened = true; }
- void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
- Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
- uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
- uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
-
- bool exists(const string& consumerTag);
-
- /**
- *@param tagInOut - if empty it is updated with the generated token.
- */
- void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
- bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
- void cancel(const string& tag);
-
- void setWindowMode(const std::string& destination);
- void setCreditMode(const std::string& destination);
- void addByteCredit(const std::string& destination, uint32_t value);
- void addMessageCredit(const std::string& destination, uint32_t value);
- void flush(const std::string& destination);
- void stop(const std::string& destination);
-
- bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
- void close();
- void startTx();
- void commit(MessageStore* const store);
- void rollback();
- void selectDtx();
- void startDtx(const std::string& xid, DtxManager& mgr, bool join);
- void endDtx(const std::string& xid, bool fail);
- void suspendDtx(const std::string& xid);
- void resumeDtx(const std::string& xid);
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
- void recover(bool requeue);
- void flow(bool active);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
-
- void handle(Message::shared_ptr msg);
-};
-
-}} // namespace broker
-
-
-#endif /*!_broker_BrokerChannel_h*/