diff options
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 346 |
1 files changed, 0 insertions, 346 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp deleted file mode 100644 index 5897914f26..0000000000 --- a/cpp/lib/broker/BrokerChannel.cpp +++ /dev/null @@ -1,346 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <assert.h> - -#include <iostream> -#include <sstream> -#include <algorithm> -#include <functional> - -#include <boost/bind.hpp> - -#include "BrokerChannel.h" -#include "DeletingTxOp.h" -#include "framing/ChannelAdapter.h" -#include <QpidError.h> -#include <DeliverableMessage.h> -#include <BrokerQueue.h> -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <TxAck.h> -#include <TxPublish.h> -#include "BrokerAdapter.h" -#include "Connection.h" - -using std::mem_fun_ref; -using std::bind2nd; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - - -Channel::Channel( - Connection& con, ChannelId id, - uint32_t _framesize, MessageStore* const _store, - uint64_t _stagingThreshold -) : - ChannelAdapter(id, &con.getOutput(), con.getVersion()), - connection(con), - currentDeliveryTag(1), - transactional(false), - prefetchSize(0), - prefetchCount(0), - framesize(_framesize), - tagGenerator("sgen"), - accumulatedAck(0), - store(_store), - messageBuilder(this, _store, _stagingThreshold), - opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - adapter(new BrokerAdapter(*this, con, con.broker)) -{ - outstanding.reset(); -} - -Channel::~Channel(){ - close(); -} - -bool Channel::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -// TODO aconway 2007-02-12: Why is connection token passed in instead -// of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection, - const FieldTable*) -{ - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); -} - -void Channel::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. - ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - consumers.erase(i); -} - -void Channel::close(){ - opened = false; - consumers.clear(); - recover(true); -} - -void Channel::begin(){ - transactional = true; -} - -void Channel::commit(){ - TxAck txAck(accumulatedAck, unacked); - txBuffer.enlist(&txAck); - if(txBuffer.prepare(store)){ - txBuffer.commit(); - } - accumulatedAck.clear(); -} - -void Channel::rollback(){ - txBuffer.rollback(); - accumulatedAck.clear(); -} - -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) -{ - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, framesize); -} - -bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacked.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); - return countOk && sizeOk; -} - -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} - -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ - blocked = true; - }else{ - blocked = false; - parent->deliver(msg, tag, queue, ackExpected); - return true; - } - } - return false; -} - -Channel::ConsumerImpl::~ConsumerImpl() { - cancel(); -} - -void Channel::ConsumerImpl::cancel(){ - if(queue) - queue->cancel(this); -} - -void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) - queue->dispatch(); -} - -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - if(transactional){ - TxPublish* deliverable = new TxPublish(msg); - exchange->route( - *deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable)); - }else{ - DeliverableMessage deliverable(msg); - exchange->route( - deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - } -} - -void Channel::handlePublish(Message* _message){ - Message::shared_ptr message(_message); - messageBuilder.initialise(message); -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ - messageBuilder.setHeader(header); - //at this point, decide based on the size of the message whether we want - //to stage it by saving content directly to disk as it arrives -} - -void Channel::handleContent(AMQContentBody::shared_ptr content){ - messageBuilder.addContent(content); -} - -void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { - // TODO aconway 2007-01-17: Implement heartbeating. -} - -void Channel::complete(Message::shared_ptr msg) { - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - assert(exchange.get()); - if(transactional) { - std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); - exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable.release())); - } else { - DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - } -} - -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - -// Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple){ - if (multiple) - ack(0, deliveryTag); - else - ack(deliveryTag, deliveryTag); -} - -void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if(transactional){ - accumulatedAck.update(firstTag, lastTag); - - //TODO: I think the outstanding prefetch size & count should be updated at this point... - //TODO: ...this may then necessitate dispatching to consumers - }else{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); - - if(i == unacked.end()){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - }else if(i!=j){ - ack_iterator end = ++i; - for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } - - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), - boost::bind(&ConsumerImpl::requestDispatch, _1)); - } -} - -void Channel::recover(bool requeue){ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - std::list<DeliveryRecord> copy = unacked; - unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); - } -} - -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ - Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - framesize); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, - uint64_t deliveryTag) -{ - msg->deliver(*this, consumerTag, deliveryTag, framesize); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); - } -} |