summaryrefslogtreecommitdiff
path: root/cpp/src/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/broker/BrokerChannel.cpp346
1 files changed, 346 insertions, 0 deletions
diff --git a/cpp/src/broker/BrokerChannel.cpp b/cpp/src/broker/BrokerChannel.cpp
new file mode 100644
index 0000000000..5f35f36da6
--- /dev/null
+++ b/cpp/src/broker/BrokerChannel.cpp
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <assert.h>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <boost/bind.hpp>
+
+#include "BrokerChannel.h"
+#include "DeletingTxOp.h"
+#include "../framing/ChannelAdapter.h"
+#include "../QpidError.h"
+#include "DeliverableMessage.h"
+#include "BrokerQueue.h"
+#include "BrokerMessage.h"
+#include "MessageStore.h"
+#include "TxAck.h"
+#include "TxPublish.h"
+#include "BrokerAdapter.h"
+#include "Connection.h"
+
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+Channel::Channel(
+ Connection& con, ChannelId id,
+ uint32_t _framesize, MessageStore* const _store,
+ uint64_t _stagingThreshold
+) :
+ ChannelAdapter(id, &con.getOutput(), con.getVersion()),
+ connection(con),
+ currentDeliveryTag(1),
+ transactional(false),
+ prefetchSize(0),
+ prefetchCount(0),
+ framesize(_framesize),
+ tagGenerator("sgen"),
+ accumulatedAck(0),
+ store(_store),
+ messageBuilder(this, _store, _stagingThreshold),
+ opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
+ adapter(new BrokerAdapter(*this, con, con.broker))
+{
+ outstanding.reset();
+}
+
+Channel::~Channel(){
+ close();
+}
+
+bool Channel::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+// TODO aconway 2007-02-12: Why is connection token passed in instead
+// of using the channel's parent connection?
+void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection,
+ const FieldTable*)
+{
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(
+ new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
+}
+
+void Channel::cancel(const string& tag){
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
+}
+
+void Channel::close(){
+ opened = false;
+ consumers.clear();
+ recover(true);
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
+}
+
+void Channel::rollback(){
+ txBuffer.rollback();
+ accumulatedAck.clear();
+}
+
+void Channel::deliver(
+ Message::shared_ptr& msg, const string& consumerTag,
+ Queue::shared_ptr& queue, bool ackExpected)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+
+ // Key the delivered messages to the id of the request in which they're sent
+ uint64_t deliveryTag = getNextSendRequestId();
+
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
+ }
+ //send deliver method, header and content(s)
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(!connection || connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
+ }
+ return false;
+}
+
+Channel::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue)
+ queue->cancel(this);
+}
+
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked)
+ queue->dispatch();
+}
+
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
+{
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(
+ *deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(
+ deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ }
+}
+
+void Channel::handlePublish(Message* _message){
+ Message::shared_ptr message(_message);
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+ //at this point, decide based on the size of the message whether we want
+ //to stage it by saving content directly to disk as it arrives
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
+}
+
+void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
+ // TODO aconway 2007-01-17: Implement heartbeating.
+}
+
+void Channel::complete(Message::shared_ptr msg) {
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ if(transactional) {
+ std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
+ exchange->route(*deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable.release()));
+ } else {
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ }
+}
+
+void Channel::ack(){
+ ack(getFirstAckRequest(), getLastAckRequest());
+}
+
+// Used by Basic
+void Channel::ack(uint64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(uint64_t firstTag, uint64_t lastTag){
+ if(transactional){
+ accumulatedAck.update(firstTag, lastTag);
+
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
+ }else{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
+ if(i == unacked.end()){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }else if(i!=j){
+ ack_iterator end = ++i;
+ for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ std::for_each(consumers.begin(), consumers.end(),
+ boost::bind(&ConsumerImpl::requestDispatch, _1));
+ }
+}
+
+void Channel::recover(bool requeue){
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ uint64_t myDeliveryTag = getNextSendRequestId();
+ msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ destination,
+ queue->getMessageCount() + 1, myDeliveryTag,
+ framesize);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ uint64_t deliveryTag)
+{
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+void Channel::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ } else {
+ method->invoke(*adapter, context);
+ }
+ }catch(ChannelException& e){
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+}