diff options
Diffstat (limited to 'cpp/src/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/broker/BrokerChannel.cpp | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/cpp/src/broker/BrokerChannel.cpp b/cpp/src/broker/BrokerChannel.cpp new file mode 100644 index 0000000000..5f35f36da6 --- /dev/null +++ b/cpp/src/broker/BrokerChannel.cpp @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <assert.h> + +#include <iostream> +#include <sstream> +#include <algorithm> +#include <functional> + +#include <boost/bind.hpp> + +#include "BrokerChannel.h" +#include "DeletingTxOp.h" +#include "../framing/ChannelAdapter.h" +#include "../QpidError.h" +#include "DeliverableMessage.h" +#include "BrokerQueue.h" +#include "BrokerMessage.h" +#include "MessageStore.h" +#include "TxAck.h" +#include "TxPublish.h" +#include "BrokerAdapter.h" +#include "Connection.h" + +using std::mem_fun_ref; +using std::bind2nd; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + + +Channel::Channel( + Connection& con, ChannelId id, + uint32_t _framesize, MessageStore* const _store, + uint64_t _stagingThreshold +) : + ChannelAdapter(id, &con.getOutput(), con.getVersion()), + connection(con), + currentDeliveryTag(1), + transactional(false), + prefetchSize(0), + prefetchCount(0), + framesize(_framesize), + tagGenerator("sgen"), + accumulatedAck(0), + store(_store), + messageBuilder(this, _store, _stagingThreshold), + opened(id == 0),//channel 0 is automatically open, other must be explicitly opened + adapter(new BrokerAdapter(*this, con, con.broker)) +{ + outstanding.reset(); +} + +Channel::~Channel(){ + close(); +} + +bool Channel::exists(const string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, + bool exclusive, ConnectionToken* const connection, + const FieldTable*) +{ + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); +} + +void Channel::cancel(const string& tag){ + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); +} + +void Channel::close(){ + opened = false; + consumers.clear(); + recover(true); +} + +void Channel::begin(){ + transactional = true; +} + +void Channel::commit(){ + TxAck txAck(accumulatedAck, unacked); + txBuffer.enlist(&txAck); + if(txBuffer.prepare(store)){ + txBuffer.commit(); + } + accumulatedAck.clear(); +} + +void Channel::rollback(){ + txBuffer.rollback(); + accumulatedAck.clear(); +} + +void Channel::deliver( + Message::shared_ptr& msg, const string& consumerTag, + Queue::shared_ptr& queue, bool ackExpected) +{ + Mutex::ScopedLock locker(deliveryLock); + + // Key the delivered messages to the id of the request in which they're sent + uint64_t deliveryTag = getNextSendRequestId(); + + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; + } + //send deliver method, header and content(s) + msg->deliver(*this, consumerTag, deliveryTag, framesize); +} + +bool Channel::checkPrefetch(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); + return countOk && sizeOk; +} + +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} + +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ + if(!connection || connection != msg->getPublisher()){//check for no_local + if(ackExpected && !parent->checkPrefetch(msg)){ + blocked = true; + }else{ + blocked = false; + parent->deliver(msg, tag, queue, ackExpected); + return true; + } + } + return false; +} + +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + +void Channel::ConsumerImpl::cancel(){ + if(queue) + queue->cancel(this); +} + +void Channel::ConsumerImpl::requestDispatch(){ + if(blocked) + queue->dispatch(); +} + +void Channel::handleInlineTransfer(Message::shared_ptr msg) +{ + Exchange::shared_ptr exchange = + connection.broker.getExchanges().get(msg->getExchange()); + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); + } +} + +void Channel::handlePublish(Message* _message){ + Message::shared_ptr message(_message); + messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ + messageBuilder.setHeader(header); + //at this point, decide based on the size of the message whether we want + //to stage it by saving content directly to disk as it arrives +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ + messageBuilder.addContent(content); +} + +void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { + // TODO aconway 2007-01-17: Implement heartbeating. +} + +void Channel::complete(Message::shared_ptr msg) { + Exchange::shared_ptr exchange = + connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + if(transactional) { + std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); + exchange->route(*deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable.release())); + } else { + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); + } +} + +void Channel::ack(){ + ack(getFirstAckRequest(), getLastAckRequest()); +} + +// Used by Basic +void Channel::ack(uint64_t deliveryTag, bool multiple){ + if (multiple) + ack(0, deliveryTag); + else + ack(deliveryTag, deliveryTag); +} + +void Channel::ack(uint64_t firstTag, uint64_t lastTag){ + if(transactional){ + accumulatedAck.update(firstTag, lastTag); + + //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: ...this may then necessitate dispatching to consumers + }else{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); + ack_iterator j = (firstTag == 0) ? + unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); + + if(i == unacked.end()){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + }else if(i!=j){ + ack_iterator end = ++i; + for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); + unacked.erase(unacked.begin(), end); + + //recalculate the prefetch: + outstanding.reset(); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); + }else{ + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); + } + + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); + } +} + +void Channel::recover(bool requeue){ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + if(requeue){ + outstanding.reset(); + std::list<DeliveryRecord> copy = unacked; + unacked.clear(); + for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); + }else{ + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); + } +} + +bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Mutex::ScopedLock locker(deliveryLock); + uint64_t myDeliveryTag = getNextSendRequestId(); + msg->sendGetOk(MethodContext(this, msg->getRespondTo()), + destination, + queue->getMessageCount() + 1, myDeliveryTag, + framesize); + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, + uint64_t deliveryTag) +{ + msg->deliver(*this, consumerTag, deliveryTag, framesize); +} + +void Channel::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) +{ + try{ + if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { + std::stringstream out; + out << "Attempt to use unopened channel: " << getId(); + throw ConnectionException(504, out.str()); + } else { + method->invoke(*adapter, context); + } + }catch(ChannelException& e){ + adapter->getProxy().getChannel().close( + e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + } +} |