diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 823 |
1 files changed, 823 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp new file mode 100644 index 0000000000..ce86253f4a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -0,0 +1,823 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SessionState.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxTimeout.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionContext.h" +#include "qpid/broker/SessionOutputException.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/IsInSequenceSet.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/ClusterSafe.h" +#include "qpid/ptr_map.h" +#include "qpid/broker/AclModule.h" + +#include <boost/bind.hpp> +#include <boost/format.hpp> + +#include <iostream> +#include <sstream> +#include <algorithm> +#include <functional> + +#include <assert.h> + +namespace qpid { +namespace broker { + +using namespace std; +using boost::intrusive_ptr; +using boost::bind; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; +using qpid::ptr_map_ptr; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +namespace _qmf = qmf::org::apache::qpid::broker; + +SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) + : session(ss), + deliveryAdapter(da), + tagGenerator("sgen"), + dtxSelected(false), + authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), + userID(getSession().getConnection().getUserId()), + userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), + isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), + closeComplete(false) +{ + acl = getSession().getBroker().getAcl(); +} + +SemanticState::~SemanticState() { + closed(); +} + +void SemanticState::closed() { + if (!closeComplete) { + //prevent requeued messages being redelivered to consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + disable(i->second); + } + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); + + //now unsubscribe, which may trigger queue deletion and thus + //needs to occur after the requeueing of unacked messages + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + unsubscribe(i->second); + } + closeComplete = true; + } +} + +bool SemanticState::exists(const string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +void SemanticState::consume(const string& tag, + Queue::shared_ptr queue, bool ackRequired, bool acquire, + bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) +{ + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + queue->consume(c, exclusive);//may throw exception + consumers[tag] = c; +} + +bool SemanticState::cancel(const string& tag) +{ + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) { + cancel(i->second); + consumers.erase(i); + //should cancel all unacked messages for this consumer so that + //they are not redelivered on recovery + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); + return true; + } else { + return false; + } +} + + +void SemanticState::startTx() +{ + txBuffer = TxBuffer::shared_ptr(new TxBuffer()); +} + +void SemanticState::commit(MessageStore* const store) +{ + if (!txBuffer) throw + CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); + + TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); + txBuffer->enlist(txAck); + if (txBuffer->commitLocal(store)) { + accumulatedAck.clear(); + } else { + throw InternalErrorException(QPID_MSG("Commit failed")); + } +} + +void SemanticState::rollback() +{ + if (!txBuffer) + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); + + txBuffer->rollback(); + accumulatedAck.clear(); +} + +void SemanticState::selectDtx() +{ + dtxSelected = true; +} + +void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) +{ + if (!dtxSelected) { + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); + } + dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); + txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + if (join) { + mgr.join(xid, dtxBuffer); + } else { + mgr.start(xid, dtxBuffer); + } +} + +void SemanticState::endDtx(const std::string& xid, bool fail) +{ + if (!dtxBuffer) { + throw IllegalStateException(QPID_MSG("xid " << xid << " not associated with this session")); + } + if (dtxBuffer->getXid() != xid) { + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end")); + + } + + txBuffer.reset();//ops on this session no longer transactional + + checkDtxTimeout(); + if (fail) { + dtxBuffer->fail(); + } else { + dtxBuffer->markEnded(); + } + dtxBuffer.reset(); +} + +void SemanticState::suspendDtx(const std::string& xid) +{ + if (dtxBuffer->getXid() != xid) { + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend")); + } + txBuffer.reset();//ops on this session no longer transactional + + checkDtxTimeout(); + dtxBuffer->setSuspended(true); + suspendedXids[xid] = dtxBuffer; + dtxBuffer.reset(); +} + +void SemanticState::resumeDtx(const std::string& xid) +{ + if (!dtxSelected) { + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); + } + + dtxBuffer = suspendedXids[xid]; + if (!dtxBuffer) { + throw CommandInvalidException(QPID_MSG("xid " << xid << " not attached")); + } else { + suspendedXids.erase(xid); + } + + if (dtxBuffer->getXid() != xid) { + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume")); + + } + if (!dtxBuffer->isSuspended()) { + throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended")); + } + + checkDtxTimeout(); + dtxBuffer->setSuspended(false); + txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); +} + +void SemanticState::checkDtxTimeout() +{ + if (dtxBuffer->isExpired()) { + dtxBuffer.reset(); + throw DtxTimeoutException(); + } +} + +void SemanticState::record(const DeliveryRecord& delivery) +{ + unacked.push_back(delivery); +} + +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); + +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _acquire, + bool _exclusive, + const string& _resumeId, + uint64_t _resumeTtl, + const framing::FieldTable& _arguments + + +) : + Consumer(_acquire), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), + acquire(_acquire), + blocked(true), + windowing(true), + exclusive(_exclusive), + resumeId(_resumeId), + resumeTtl(_resumeTtl), + arguments(_arguments), + msgCredit(0), + byteCredit(0), + notifyEnabled(true), + syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), + deliveryCount(0), + mgmtObject(0) +{ + if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) + { + ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); + qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); + + if (agent != 0) + { + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); + agent->addObject (mgmtObject); + mgmtObject->set_creditMode("WINDOW"); + } + } +} + +ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + return status; +} + + +OwnershipToken* SemanticState::ConsumerImpl::getSession() +{ + return &(parent->session); +} + +bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) +{ + assertClusterSafe(); + allocateCredit(msg.payload); + DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + bool sync = syncFrequency && ++deliveryCount >= syncFrequency; + if (sync) deliveryCount = 0;//reset + parent->deliver(record, sync); + if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered + if (windowing || ackExpected || !acquire) { + parent->record(record); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg); + } + if (mgmtObject) { mgmtObject->inc_delivered(); } + return true; +} + +bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) +{ + return true; +} + +bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) +{ + assertClusterSafe(); + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. + // + blocked = !(filter(msg) && checkCredit(msg)); + return !blocked; +} + +namespace { +struct ConsumerName { + const SemanticState::ConsumerImpl& consumer; + ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} +}; + +ostream& operator<<(ostream& o, const ConsumerName& pc) { + return o << pc.consumer.getName() << " on " + << pc.consumer.getParent().getSession().getSessionId(); +} +} + +void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) +{ + assertClusterSafe(); + uint32_t originalMsgCredit = msgCredit; + uint32_t originalByteCredit = byteCredit; + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); + } + QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) + << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit + << " now bytes: " << byteCredit << " msgs: " << msgCredit); + +} + +bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) +{ + bool enoughCredit = msgCredit > 0 && + (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); + QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") + << ConsumerName(*this) + << ", have bytes: " << byteCredit << " msgs: " << msgCredit + << ", need " << msg->getRequiredCredit() << " bytes"); + return enoughCredit; +} + +SemanticState::ConsumerImpl::~ConsumerImpl() +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} + +void SemanticState::disable(ConsumerImpl::shared_ptr c) +{ + c->disableNotify(); + if (session.isAttached()) + session.getConnection().outputTasks.removeOutputTask(c.get()); +} + +void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) +{ + Queue::shared_ptr queue = c->getQueue(); + if(queue) { + queue->cancel(c); + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + Queue::tryAutoDelete(session.getBroker(), queue); + } + } +} + +void SemanticState::cancel(ConsumerImpl::shared_ptr c) +{ + disable(c); + unsubscribe(c); +} + +void SemanticState::handle(intrusive_ptr<Message> msg) { + if (txBuffer.get()) { + TxPublish* deliverable(new TxPublish(msg)); + TxOp::shared_ptr op(deliverable); + route(msg, *deliverable); + txBuffer->enlist(op); + } else { + DeliverableMessage deliverable(msg); + route(msg, deliverable); + if (msg->isContentReleaseRequested()) { + // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the + // presence of these messages). Do not change these without also checking these tests. + if (msg->isContentReleaseBlocked()) { + QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << + std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked"); + } else { + msg->releaseContent(); + QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << + std::hex << msg->getPersistenceId() << std::dec << ": Content released"); + } + } + } +} + +namespace +{ +const std::string nullstring; +} + +void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + + std::string exchangeName = msg->getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) + cacheExchange = session.getBroker().getExchanges().get(exchangeName); + cacheExchange->setProperties(msg); + + /* verify the userid if specified: */ + std::string id = + msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; + + if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) + { + QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); + throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); + } + + if (acl && acl->doTransferAcl()) + { + if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() )) + throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " << + exchangeName << " with routing-key " << msg->getRoutingKey())); + } + + cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + + if (!strategy.delivered) { + //TODO:if discard-unroutable, just drop it + //TODO:else if accept-mode is explicit, reject it + //else route it to alternate exchange + if (cacheExchange->getAlternate()) { + cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + } + if (!strategy.delivered) { + msg->destroy(); + } + } + +} + +void SemanticState::requestDispatch() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) + i->second->requestDispatch(); +} + +void SemanticState::ConsumerImpl::requestDispatch() +{ + assertClusterSafe(); + if (blocked) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + blocked = false; + } +} + +bool SemanticState::complete(DeliveryRecord& delivery) +{ + ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); + if (i != consumers.end()) { + i->second->complete(delivery); + } + return delivery.isRedundant(); +} + +void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) +{ + if (!delivery.isComplete()) { + delivery.complete(); + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + } + } +} + +void SemanticState::recover(bool requeue) +{ + if(requeue){ + //take copy and clear unacked as requeue may result in redelivery to this session + //which will in turn result in additions to unacked + DeliveryRecords copy = unacked; + unacked.clear(); + for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); + }else{ + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + //unconfirmed messages re redelivered and therefore have their + //id adjusted, confirmed messages are not and so the ordering + //w.r.t id is lost + sort(unacked.begin(), unacked.end()); + } +} + +void SemanticState::deliver(DeliveryRecord& msg, bool sync) +{ + return deliveryAdapter.deliver(msg, sync); +} + +SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +{ + ConsumerImplMap::iterator i = consumers.find(destination); + if (i == consumers.end()) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + } else { + return *(i->second); + } +} + +void SemanticState::setWindowMode(const std::string& destination) +{ + find(destination).setWindowMode(); +} + +void SemanticState::setCreditMode(const std::string& destination) +{ + find(destination).setCreditMode(); +} + +void SemanticState::addByteCredit(const std::string& destination, uint32_t value) +{ + ConsumerImpl& c = find(destination); + c.addByteCredit(value); + c.requestDispatch(); +} + + +void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) +{ + ConsumerImpl& c = find(destination); + c.addMessageCredit(value); + c.requestDispatch(); +} + +void SemanticState::flush(const std::string& destination) +{ + find(destination).flush(); +} + + +void SemanticState::stop(const std::string& destination) +{ + find(destination).stop(); +} + +void SemanticState::ConsumerImpl::setWindowMode() +{ + assertClusterSafe(); + windowing = true; + if (mgmtObject){ + mgmtObject->set_creditMode("WINDOW"); + } +} + +void SemanticState::ConsumerImpl::setCreditMode() +{ + assertClusterSafe(); + windowing = false; + if (mgmtObject){ + mgmtObject->set_creditMode("CREDIT"); + } +} + +void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) +{ + assertClusterSafe(); + if (byteCredit != 0xFFFFFFFF) { + if (value == 0xFFFFFFFF) byteCredit = value; + else byteCredit += value; + } +} + +void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) +{ + assertClusterSafe(); + if (msgCredit != 0xFFFFFFFF) { + if (value == 0xFFFFFFFF) msgCredit = value; + else msgCredit += value; + } +} + +bool SemanticState::ConsumerImpl::haveCredit() +{ + if (msgCredit && byteCredit) { + return true; + } else { + blocked = true; + return false; + } +} + +void SemanticState::ConsumerImpl::flush() +{ + while(haveCredit() && queue->dispatch(shared_from_this())) + ; + stop(); +} + +void SemanticState::ConsumerImpl::stop() +{ + assertClusterSafe(); + msgCredit = 0; + byteCredit = 0; +} + +Queue::shared_ptr SemanticState::getQueue(const string& name) const { + Queue::shared_ptr queue; + if (name.empty()) { + throw NotAllowedException(QPID_MSG("No queue name specified.")); + } else { + queue = session.getBroker().getQueues().find(name); + if (!queue) + throw NotFoundException(QPID_MSG("Queue not found: "<<name)); + } + return queue; +} + +AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) +{ + return DeliveryRecord::findRange(unacked, first, last); +} + +void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, AcquireFunctor(acquired)); +} + +void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered) +{ + AckRange range = findRange(first, last); + //release results in the message being added to the head so want + //to release in reverse order to keep the original transfer order + DeliveryRecords::reverse_iterator start(range.end); + DeliveryRecords::reverse_iterator end(range.start); + for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered)); +} + +void SemanticState::reject(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); + //may need to remove the delivery records as well + for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) { + if (i->isRedundant()) i = unacked.erase(i); + else i++; + } +} + +bool SemanticState::ConsumerImpl::doOutput() +{ + try { + return haveCredit() && queue->dispatch(shared_from_this()); + } catch (const SessionException& e) { + throw SessionOutputException(e, parent->session.getChannel()); + } +} + +void SemanticState::ConsumerImpl::enableNotify() +{ + Mutex::ScopedLock l(lock); + assertClusterSafe(); + notifyEnabled = true; +} + +void SemanticState::ConsumerImpl::disableNotify() +{ + Mutex::ScopedLock l(lock); + notifyEnabled = false; +} + +bool SemanticState::ConsumerImpl::isNotifyEnabled() const { + Mutex::ScopedLock l(lock); + return notifyEnabled; +} + +void SemanticState::ConsumerImpl::notify() +{ + Mutex::ScopedLock l(lock); + assertClusterSafe(); + if (notifyEnabled) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + } +} + + +// Test that a DeliveryRecord's ID is in a sequence set and some other +// predicate on DeliveryRecord holds. +template <class Predicate> struct IsInSequenceSetAnd { + IsInSequenceSet isInSet; + Predicate predicate; + IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {} + bool operator()(DeliveryRecord& dr) { + return isInSet(dr.getId()) && predicate(dr); + } +}; + +template<class Predicate> IsInSequenceSetAnd<Predicate> +isInSequenceSetAnd(const SequenceSet& s, Predicate p) { + return IsInSequenceSetAnd<Predicate>(s,p); +} + +void SemanticState::accepted(const SequenceSet& commands) { + assertClusterSafe(); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.add(commands); + + if (dtxBuffer.get()) { + //if enlisted in a dtx, copy the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + + //mark the relevant messages as 'ended' in unacked + //if the messages are already completed, they can be + //removed from the record + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::setEnded, _1))); + unacked.erase(removed, unacked.end()); + } + } else { + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::accept, _1, + (TransactionContext*) 0))); + unacked.erase(removed, unacked.end()); + } +} + +void SemanticState::completed(const SequenceSet& commands) { + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&SemanticState::complete, this, _1))); + unacked.erase(removed, unacked.end()); + requestDispatch(); +} + +void SemanticState::attached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->enableNotify(); + session.getConnection().outputTasks.addOutputTask(i->second.get()); + } + session.getConnection().outputTasks.activateOutput(); +} + +void SemanticState::detached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->disableNotify(); + session.getConnection().outputTasks.removeOutputTask(i->second.get()); + } +} + +}} // namespace qpid::broker |