Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 591 |
1 file changed, 591 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
new file mode 100644
index 0000000000..059f99077c
--- /dev/null
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -0,0 +1,591 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionState.h"
+#include "BrokerAdapter.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
+#include "DtxTimeout.h"
+#include "Message.h"
+#include "SemanticHandler.h"
+#include "SessionHandler.h"
+#include "TxAck.h"
+#include "TxPublish.h"
+#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <assert.h>
+
+
+namespace qpid {
+namespace broker {
+
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+    : session(ss),
+      deliveryAdapter(da),
+      prefetchSize(0),
+      prefetchCount(0),
+      tagGenerator("sgen"),
+      dtxSelected(false),
+      accumulatedAck(0),
+      flowActive(true)
+{
+    outstanding.reset();
+}
+
+SemanticState::~SemanticState() {
+    consumers.clear();
+    if (dtxBuffer.get()) {
+        dtxBuffer->fail();
+    }
+    recover(true);
+}
+
+bool SemanticState::exists(const string& consumerTag){
+    return consumers.find(consumerTag) != consumers.end();
+}
+
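+// The consume flags map directly onto ConsumerImpl behaviour (see
+// ConsumerImpl::deliver below): 'nolocal' suppresses messages that were
+// published on the consumer's own connection, 'acks' means each delivery
+// is recorded in 'unacked' until acknowledged, and 'acquire' controls
+// whether a delivery also acquires the message from the queue.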
+void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
+                            Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
+                            bool exclusive, const FieldTable*)
+{
+    if(tagInOut.empty())
+        tagInOut = tagGenerator.generate();
+    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+    queue->consume(c.get(), exclusive);//may throw exception
+    consumers.insert(tagInOut, c.release());
+}
+
+void SemanticState::cancel(const string& tag){
+    // consumers is a ptr_map so erase will delete the consumer
+    // which will call cancel.
+    ConsumerImplMap::iterator i = consumers.find(tag);
+    if (i != consumers.end())
+        consumers.erase(i);
+}
+
+
+void SemanticState::startTx()
+{
+    txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+}
+
+void SemanticState::commit(MessageStore* const store)
+{
+    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    txBuffer->enlist(txAck);
+    if (txBuffer->commitLocal(store)) {
+        accumulatedAck.clear();
+    }
+}
+
+void SemanticState::rollback()
+{
+    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+    txBuffer->rollback();
+    accumulatedAck.clear();
+}
+
+void SemanticState::selectDtx()
+{
+    dtxSelected = true;
+}
+
+void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
+    if (!dtxSelected) {
+        throw ConnectionException(503, "Session has not been selected for use with dtx");
+    }
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+    if (join) {
+        mgr.join(xid, dtxBuffer);
+    } else {
+        mgr.start(xid, dtxBuffer);
+    }
+}
+
+void SemanticState::endDtx(const std::string& xid, bool fail)
+{
+    if (!dtxBuffer) {
+        throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+    }
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
+                                  % dtxBuffer->getXid() % xid);
+    }
+
+    txBuffer.reset();//ops on this session no longer transactional
+
+    checkDtxTimeout();
+    if (fail) {
+        dtxBuffer->fail();
+    } else {
+        dtxBuffer->markEnded();
+    }
+    dtxBuffer.reset();
+}
+
+void SemanticState::suspendDtx(const std::string& xid)
+{
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
+                                  % dtxBuffer->getXid() % xid);
+    }
+    txBuffer.reset();//ops on this session no longer transactional
+
+    checkDtxTimeout();
+    dtxBuffer->setSuspended(true);
+}
+
+void SemanticState::resumeDtx(const std::string& xid)
+{
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
+                                  % dtxBuffer->getXid() % xid);
+    }
+    if (!dtxBuffer->isSuspended()) {
+        throw ConnectionException(503, boost::format("xid %1% not suspended") % xid);
+    }
+
+    checkDtxTimeout();
+    dtxBuffer->setSuspended(false);
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+}
+
+void SemanticState::checkDtxTimeout()
+{
+    if (dtxBuffer->isExpired()) {
+        dtxBuffer.reset();
+        throw DtxTimeoutException();
+    }
+}
+
+void SemanticState::record(const DeliveryRecord& delivery)
+{
+    unacked.push_back(delivery);
+    delivery.addTo(outstanding);
+}
+
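+// Prefetch limits bound the unacknowledged window for a consumer that
+// expects acks: a prefetchCount of 0 means no limit on the number of
+// unacked messages, and a prefetchSize of 0 means no limit on unacked
+// bytes. When nothing is unacked, a single message larger than
+// prefetchSize is still let through, so an oversized message cannot
+// stall the consumer indefinitely.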
+bool SemanticState::checkPrefetch(Message::shared_ptr& msg)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    bool countOk = !prefetchCount || prefetchCount > unacked.size();
+    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+    return countOk && sizeOk;
+}
+
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+                                          DeliveryToken::shared_ptr _token,
+                                          const string& _name,
+                                          Queue::shared_ptr _queue,
+                                          bool ack,
+                                          bool _nolocal,
+                                          bool _acquire
+                                          ) :
+    Consumer(_acquire),
+    parent(_parent),
+    token(_token),
+    name(_name),
+    queue(_queue),
+    ackExpected(ack),
+    nolocal(_nolocal),
+    acquire(_acquire),
+    blocked(false),
+    windowing(true),
+    msgCredit(0),
+    byteCredit(0) {}
+
+bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+{
+    if (nolocal &&
+        &parent->getSession().getConnection() == msg.payload->getPublisher()) {
+        return false;
+    } else {
+        if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
+            blocked = true;
+        } else {
+            blocked = false;
+
+            Mutex::ScopedLock locker(parent->deliveryLock);
+
+            DeliveryId deliveryTag =
+                parent->deliveryAdapter.deliver(msg.payload, token);
+            if (windowing || ackExpected) {
+                parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
+            }
+        }
+        return !blocked;
+    }
+}
+
+bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+    Mutex::ScopedLock l(lock);
+    if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+        return false;
+    } else {
+        if (msgCredit != 0xFFFFFFFF) {
+            msgCredit--;
+        }
+        if (byteCredit != 0xFFFFFFFF) {
+            byteCredit -= msg->getRequiredCredit();
+        }
+        return true;
+    }
+}
+
+void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+    Mutex::ScopedLock locker(parent->deliveryLock);
+    parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
+}
+
+SemanticState::ConsumerImpl::~ConsumerImpl() {
+    cancel();
+}
+
+void SemanticState::ConsumerImpl::cancel()
+{
+    if(queue) {
+        queue->cancel(this);
+        if (queue->canAutoDelete()) {
+            parent->getSession().getBroker().getQueues().destroyIf(
+                queue->getName(),
+                boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+        }
+    }
+}
+
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+    if(blocked)
+        queue->requestDispatch(this);
+}
+
+void SemanticState::handle(Message::shared_ptr msg) {
+    if (txBuffer.get()) {
+        TxPublish* deliverable(new TxPublish(msg));
+        TxOp::shared_ptr op(deliverable);
+        route(msg, *deliverable);
+        txBuffer->enlist(op);
+    } else {
+        DeliverableMessage deliverable(msg);
+        route(msg, deliverable);
+    }
+}
+
+void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) {
+    std::string exchangeName = msg->getExchangeName();
+    if (!cacheExchange || cacheExchange->getName() != exchangeName){
+        cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
+    }
+
+    cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+
+    if (!strategy.delivered) {
+        //TODO: if reject-unroutable, then reject;
+        //else route to the alternate exchange
+        if (cacheExchange->getAlternate()) {
+            cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+        }
+    }
+
+}
+
+void SemanticState::ackCumulative(DeliveryId id)
+{
+    ack(id, id, true);
+}
+
+void SemanticState::ackRange(DeliveryId first, DeliveryId last)
+{
+    ack(first, last, false);
+}
+
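+// A cumulative ack covers everything up to 'last'; a range ack covers
+// [first, last] only. Outside a transaction the acked records are
+// dequeued and erased immediately; inside a transaction they are
+// accumulated (and, under a dtx, enlisted against that transaction) so
+// the dequeue can be completed or undone at commit/rollback.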
+void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+    ack_iterator start = cumulative ? unacked.begin() :
+        find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+    ack_iterator end = start;
+
+    if (cumulative || first != last) {
+        //need to find end (position it just after the last record in range)
+        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+    } else {
+        //just acked single element (move end past it)
+        ++end;
+    }
+
+    for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+
+    if (txBuffer.get()) {
+        //in transactional mode, don't dequeue or remove, just
+        //maintain set of acknowledged messages:
+        accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
+        if (dtxBuffer.get()) {
+            //if enlisted in a dtx, remove the relevant slice from
+            //unacked and record it against that transaction
+            TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+            accumulatedAck.clear();
+            dtxBuffer->enlist(txAck);
+        }
+    } else {
+        for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+        unacked.erase(start, end);
+    }
+
+    //if the prefetch limit had previously been reached, or credit
+    //had expired in windowing mode, there may be messages that can
+    //now be delivered
+    for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void SemanticState::acknowledged(const DeliveryRecord& delivery)
+{
+    delivery.subtractFrom(outstanding);
+    ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+    if (i != consumers.end()) {
+        i->acknowledged(delivery);
+    }
+}
+
+void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+    if (windowing) {
+        if (msgCredit != 0xFFFFFFFF) msgCredit++;
+        if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+    }
+}
+
+void SemanticState::recover(bool requeue)
+{
+    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+    if(requeue){
+        outstanding.reset();
+        //take a copy and clear unacked, as requeue may result in redelivery to this session,
+        //which will in turn result in additions to unacked
+        std::list<DeliveryRecord> copy = unacked;
+        unacked.clear();
+        for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
+    }else{
+        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+    }
+}
+
+bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
+    QueuedMessage msg = queue->dequeue();
+    if(msg.payload){
+        Mutex::ScopedLock locker(deliveryLock);
+        DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token);
+        if(ackExpected){
+            unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+        }
+        return true;
+    }else{
+        return false;
+    }
+}
+
+void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
+                            DeliveryId deliveryTag)
+{
+    ConsumerImplMap::iterator i = consumers.find(consumerTag);
+    if (i != consumers.end()){
+        i->redeliver(msg, deliveryTag);
+    }
+}
+
+void SemanticState::flow(bool active)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    bool requestDelivery(!flowActive && active);
+    flowActive = active;
+    if (requestDelivery) {
+        //there may be messages that can now be delivered
+        std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+    }
+}
+
+
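+// The commands below implement per-consumer flow control. In window
+// mode credit is restored automatically as deliveries are acknowledged
+// (see ConsumerImpl::acknowledged above); in credit mode it is only
+// replenished by explicit addByteCredit/addMessageCredit calls. A
+// credit value of 0xFFFFFFFF is treated as unlimited.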
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+{
+    ConsumerImplMap::iterator i = consumers.find(destination);
+    if (i == consumers.end()) {
+        throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+    } else {
+        return *i;
+    }
+}
+
+void SemanticState::setWindowMode(const std::string& destination)
+{
+    find(destination).setWindowMode();
+}
+
+void SemanticState::setCreditMode(const std::string& destination)
+{
+    find(destination).setCreditMode();
+}
+
+void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
+{
+    find(destination).addByteCredit(value);
+}
+
+
+void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
+{
+    find(destination).addMessageCredit(value);
+}
+
+void SemanticState::flush(const std::string& destination)
+{
+    ConsumerImpl& c = find(destination);
+    c.flush();
+}
+
+
+void SemanticState::stop(const std::string& destination)
+{
+    find(destination).stop();
+}
+
+void SemanticState::ConsumerImpl::setWindowMode()
+{
+    windowing = true;
+}
+
+void SemanticState::ConsumerImpl::setCreditMode()
+{
+    windowing = false;
+}
+
+void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
+{
+    byteCredit += value;
+    requestDispatch();
+}
+
+void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+    msgCredit += value;
+    requestDispatch();
+}
+
+void SemanticState::ConsumerImpl::flush()
+{
+    //need to prevent delivery after requestDispatch returns but
+    //before credit is reduced to zero; TODO: come up with a better
+    //implementation of flush
+    Mutex::ScopedLock l(lock);
+    queue->requestDispatch(this, true);
+    byteCredit = 0;
+    msgCredit = 0;
+}
+
+void SemanticState::ConsumerImpl::stop()
+{
+    msgCredit = 0;
+    byteCredit = 0;
+}
+
+Queue::shared_ptr SemanticState::getQueue(const string& name) const {
+    //Note: this can be removed soon, as the default queue for sessions is scrapped in 0-10
+    Queue::shared_ptr queue;
+    if (name.empty()) {
+        queue = getDefaultQueue();
+        if (!queue)
+            throw NotAllowedException(QPID_MSG("No queue name specified."));
+    }
+    else {
+        queue = session.getBroker().getQueues().find(name);
+        if (!queue)
+            throw NotFoundException(QPID_MSG("Queue not found: " << name));
+    }
+    return queue;
+}
+
+AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
+{
+    ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+    ack_iterator end = start;
+
+    if (start != unacked.end()) {
+        if (first == last) {
+            //just acked single element (move end past it)
+            ++end;
+        } else {
+            //need to find end (position it just after the last record in range)
+            end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+        }
+    }
+    return AckRange(start, end);
+}
+
+void SemanticState::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, AcquireFunctor(acquired));
+}
+
+void SemanticState::release(DeliveryId first, DeliveryId last)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+}
+
+void SemanticState::reject(DeliveryId first, DeliveryId last)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
+    //need to remove the delivery records as well
+    unacked.erase(range.start, range.end);
+}
+
+}} // namespace qpid::broker