summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp642
1 files changed, 0 insertions, 642 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
deleted file mode 100644
index 9b44f31e14..0000000000
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionContext.h"
-#include "BrokerAdapter.h"
-#include "Queue.h"
-#include "Connection.h"
-#include "DeliverableMessage.h"
-#include "DtxAck.h"
-#include "DtxTimeout.h"
-#include "Message.h"
-#include "SemanticHandler.h"
-#include "SessionHandler.h"
-#include "TxAck.h"
-#include "TxPublish.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "qpid/ptr_map.h"
-
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-#include <iostream>
-#include <sstream>
-#include <algorithm>
-#include <functional>
-
-#include <assert.h>
-
-
-namespace qpid {
-namespace broker {
-
-using std::mem_fun_ref;
-using std::bind2nd;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid::ptr_map;
-
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
- : session(ss),
- deliveryAdapter(da),
- prefetchSize(0),
- prefetchCount(0),
- tagGenerator("sgen"),
- dtxSelected(false),
- accumulatedAck(0),
- flowActive(true),
- outputTasks(ss)
-{
- outstanding.reset();
-}
-
-SemanticState::~SemanticState() {
- //cancel all consumers
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(*get_pointer(i));
- }
-
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
- }
- recover(true);
-}
-
-bool SemanticState::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
- bool exclusive, const FieldTable*)
-{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(*c, exclusive);//may throw exception
- outputTasks.addOutputTask(c.get());
- consumers.insert(tagInOut, c.release());
-}
-
-void SemanticState::cancel(const string& tag){
- ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end()) {
- cancel(*get_pointer(i));
- consumers.erase(i);
- //should cancel all unacked messages for this consumer so that
- //they are not redelivered on recovery
- for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
-
- }
-}
-
-
-void SemanticState::startTx()
-{
- txBuffer = TxBuffer::shared_ptr(new TxBuffer());
-}
-
-void SemanticState::commit(MessageStore* const store)
-{
- if (!txBuffer) throw
- CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
- txBuffer->enlist(txAck);
- if (txBuffer->commitLocal(store)) {
- accumulatedAck.clear();
- }
-}
-
-void SemanticState::rollback()
-{
- if (!txBuffer)
- throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-
- txBuffer->rollback();
- accumulatedAck.clear();
-}
-
-void SemanticState::selectDtx()
-{
- dtxSelected = true;
-}
-
-void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
-{
- if (!dtxSelected) {
- throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
- }
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
- if (join) {
- mgr.join(xid, dtxBuffer);
- } else {
- mgr.start(xid, dtxBuffer);
- }
-}
-
-void SemanticState::endDtx(const std::string& xid, bool fail)
-{
- if (!dtxBuffer) {
- throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session"));
- }
- if (dtxBuffer->getXid() != xid) {
- throw CommandInvalidException(
- QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
-
- }
-
- txBuffer.reset();//ops on this session no longer transactional
-
- checkDtxTimeout();
- if (fail) {
- dtxBuffer->fail();
- } else {
- dtxBuffer->markEnded();
- }
- dtxBuffer.reset();
-}
-
-void SemanticState::suspendDtx(const std::string& xid)
-{
- if (dtxBuffer->getXid() != xid) {
- throw CommandInvalidException(
- QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
- }
- txBuffer.reset();//ops on this session no longer transactional
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(true);
- suspendedXids[xid] = dtxBuffer;
- dtxBuffer.reset();
-}
-
-void SemanticState::resumeDtx(const std::string& xid)
-{
- if (!dtxSelected) {
- throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
- }
-
- dtxBuffer = suspendedXids[xid];
- if (!dtxBuffer) {
- throw CommandInvalidException(QPID_MSG("xid " << xid << " not attached"));
- } else {
- suspendedXids.erase(xid);
- }
-
- if (dtxBuffer->getXid() != xid) {
- throw CommandInvalidException(
- QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
-
- }
- if (!dtxBuffer->isSuspended()) {
- throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
- }
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(false);
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
-}
-
-void SemanticState::checkDtxTimeout()
-{
- if (dtxBuffer->isExpired()) {
- dtxBuffer.reset();
- throw DtxTimeoutException();
- }
-}
-
-void SemanticState::record(const DeliveryRecord& delivery)
-{
- unacked.push_back(delivery);
- delivery.addTo(outstanding);
-}
-
-bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg)
-{
- bool countOk = !prefetchCount || prefetchCount > unacked.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
- return countOk && sizeOk;
-}
-
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- DeliveryToken::shared_ptr _token,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _nolocal,
- bool _acquire
- ) :
- Consumer(_acquire),
- parent(_parent),
- token(_token),
- name(_name),
- queue(_queue),
- ackExpected(ack),
- nolocal(_nolocal),
- acquire(_acquire),
- blocked(true),
- windowing(true),
- msgCredit(0),
- byteCredit(0) {}
-
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
-{
- allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
- return true;
-}
-
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
-{
- return !(nolocal &&
- &parent->getSession().getConnection() == msg->getPublisher());
-}
-
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
-{
- //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed):
- blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg)));
- return !blocked;
-}
-
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
-{
- uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
- << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
- << " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
-}
-
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
-{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
-}
-
-SemanticState::ConsumerImpl::~ConsumerImpl() {}
-
-void SemanticState::cancel(ConsumerImpl& c)
-{
- outputTasks.removeOutputTask(&c);
- Queue::shared_ptr queue = c.getQueue();
- if(queue) {
- queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(session.getBroker(), queue);
- }
- }
-}
-
-void SemanticState::handle(intrusive_ptr<Message> msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- }
-}
-
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- std::string exchangeName = msg->getExchangeName();
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
-
- cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
-
- if (!strategy.delivered) {
- //TODO:if reject-unroutable, then reject
- //else route to alternate exchange
- if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
- }
- }
-
-}
-
-void SemanticState::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- {
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else if (start != unacked.end()) {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(start, end);
- }
- }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- requestDispatch();
-}
-
-void SemanticState::requestDispatch()
-{
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*get_pointer(i));
- }
-}
-
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{
- if(c.isBlocked()) {
- c.doOutput();
- }
-}
-
-void SemanticState::acknowledged(const DeliveryRecord& delivery)
-{
- delivery.subtractFrom(outstanding);
- ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
- if (i != consumers.end()) {
- get_pointer(i)->acknowledged(delivery);
- }
-}
-
-void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
-{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
- }
-}
-
-void SemanticState::recover(bool requeue)
-{
- if(requeue){
- outstanding.reset();
- //take copy and clear unacked as requeue may result in redelivery to this session
- //which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
- //unconfirmed messages re redelivered and therefore have their
- //id adjusted, confirmed messages are not and so the ordering
- //w.r.t id is lost
- unacked.sort();
- }
-}
-
-bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
-{
- QueuedMessage msg = queue->dequeue();
- if(msg.payload){
- DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
-{
- return deliveryAdapter.deliver(msg, token);
-}
-
-void SemanticState::flow(bool active)
-{
- bool requestDelivery(!flowActive && active);
- flowActive = active;
- if (requestDelivery) {
- //there may be messages that can be now be delivered
- requestDispatch();
- }
-}
-
-
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
-{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
- } else {
- return *get_pointer(i);
- }
-}
-
-void SemanticState::setWindowMode(const std::string& destination)
-{
- find(destination).setWindowMode();
-}
-
-void SemanticState::setCreditMode(const std::string& destination)
-{
- find(destination).setCreditMode();
-}
-
-void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
-{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- requestDispatch(c);
-}
-
-
-void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
-{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- requestDispatch(c);
-}
-
-void SemanticState::flush(const std::string& destination)
-{
- find(destination).flush();
-}
-
-
-void SemanticState::stop(const std::string& destination)
-{
- find(destination).stop();
-}
-
-void SemanticState::ConsumerImpl::setWindowMode()
-{
- windowing = true;
-}
-
-void SemanticState::ConsumerImpl::setCreditMode()
-{
- windowing = false;
-}
-
-void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
-{
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit += value;
- }
-}
-
-void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
-{
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit += value;
- }
-}
-
-void SemanticState::ConsumerImpl::flush()
-{
- while(queue->dispatch(*this));
- stop();
-}
-
-void SemanticState::ConsumerImpl::stop()
-{
- msgCredit = 0;
- byteCredit = 0;
-}
-
-Queue::shared_ptr SemanticState::getQueue(const string& name) const {
- Queue::shared_ptr queue;
- if (name.empty()) {
- throw NotAllowedException(QPID_MSG("No queue name specified."));
- } else {
- queue = session.getBroker().getQueues().find(name);
- if (!queue)
- throw NotFoundException(QPID_MSG("Queue not found: "<<name));
- }
- return queue;
-}
-
-AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
- ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (start != unacked.end()) {
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- }
- }
- return AckRange(start, end);
-}
-
-void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
-{
- AckRange range = findRange(first, last);
- for_each(range.start, range.end, AcquireFunctor(acquired));
-}
-
-void SemanticState::release(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
- //release results in the message being added to the head so want
- //to release in reverse order to keep the original transfer order
- DeliveryRecords::reverse_iterator start(range.end);
- DeliveryRecords::reverse_iterator end(range.start);
- for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
-}
-
-void SemanticState::reject(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
- //need to remove the delivery records as well
- unacked.erase(range.start, range.end);
-}
-
-bool SemanticState::ConsumerImpl::doOutput()
-{
- //TODO: think through properly
- return queue->dispatch(*this);
-}
-
-void SemanticState::ConsumerImpl::notify()
-{
- //TODO: think through properly
- parent->outputTasks.activateOutput();
-}
-
-}} // namespace qpid::broker