summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Session.cpp')
-rw-r--r--cpp/src/qpid/broker/Session.cpp530
1 files changed, 526 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index 2940c8cccb..26b694d073 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,22 +20,543 @@
*/
#include "Session.h"
+
+#include "BrokerAdapter.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
+#include "DtxTimeout.h"
+#include "Message.h"
#include "SemanticHandler.h"
#include "SessionAdapter.h"
+#include "TxAck.h"
+#include "TxPublish.h"
+#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <assert.h>
+
namespace qpid {
namespace broker {
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
Session::Session(SessionAdapter& a, uint32_t t)
- : adapter(&a), timeout(t)
+ : adapter(&a),
+ broker(adapter->getConnection().broker),
+ timeout(t),
+ prefetchSize(0),
+ prefetchCount(0),
+ tagGenerator("sgen"),
+ dtxSelected(false),
+ accumulatedAck(0),
+ flowActive(true)
{
- assert(adapter);
+ outstanding.reset();
// FIXME aconway 2007-08-29: handler to get Session, not connection.
- handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection()));
+ std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
+ deliveryAdapter=semantic.get();
+ // FIXME aconway 2007-08-31: Remove, workaround.
+ semanticHandler=semantic.get();
+ handlers.push_back(semantic.release());
in = &handlers[0];
- out = &adapter->getConnection().getOutput();
+ out = &adapter->out;
+ // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
+ // create a SessionManager in the broker for all session related
+ // stuff: suspended sessions, handler updaters etc.
+ // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
+ broker.update(a.getChannel(), *this);
+}
+
+Session::~Session() {
+ close();
+}
+
+bool Session::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
+ Queue::shared_ptr queue, bool nolocal, bool acks,
+ bool exclusive, const FieldTable*)
+{
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
+}
+
+void Session::cancel(const string& tag){
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
+}
+
+void Session::close()
+{
+ opened = false;
+ consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
+}
+
+void Session::startTx()
+{
+ txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+}
+
+void Session::commit(MessageStore* const store)
+{
+ if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+ TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ txBuffer->enlist(txAck);
+ if (txBuffer->commitLocal(store)) {
+ accumulatedAck.clear();
+ }
+}
+
+void Session::rollback()
+{
+ if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+ txBuffer->rollback();
+ accumulatedAck.clear();
+}
+
+void Session::selectDtx()
+{
+ dtxSelected = true;
+}
+
+void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
+ if (!dtxSelected) {
+ throw ConnectionException(503, "Session has not been selected for use with dtx");
+ }
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+ if (join) {
+ mgr.join(xid, dtxBuffer);
+ } else {
+ mgr.start(xid, dtxBuffer);
+ }
+}
+
+void Session::endDtx(const std::string& xid, bool fail)
+{
+ if (!dtxBuffer) {
+ throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+ }
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
+ % dtxBuffer->getXid() % xid);
+ }
+
+ txBuffer.reset();//ops on this session no longer transactional
+
+ checkDtxTimeout();
+ if (fail) {
+ dtxBuffer->fail();
+ } else {
+ dtxBuffer->markEnded();
+ }
+ dtxBuffer.reset();
+}
+
+void Session::suspendDtx(const std::string& xid)
+{
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
+ % dtxBuffer->getXid() % xid);
+ }
+ txBuffer.reset();//ops on this session no longer transactional
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(true);
+}
+
+void Session::resumeDtx(const std::string& xid)
+{
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
+ % dtxBuffer->getXid() % xid);
+ }
+ if (!dtxBuffer->isSuspended()) {
+ throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ }
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(false);
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
+void Session::checkDtxTimeout()
+{
+ if (dtxBuffer->isExpired()) {
+ dtxBuffer.reset();
+ throw DtxTimeoutException();
+ }
+}
+
+void Session::record(const DeliveryRecord& delivery)
+{
+ unacked.push_back(delivery);
+ delivery.addTo(outstanding);
+}
+
+bool Session::checkPrefetch(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Session::ConsumerImpl::ConsumerImpl(Session* _parent,
+ DeliveryToken::shared_ptr _token,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _nolocal,
+ bool _acquire
+) : parent(_parent),
+ token(_token),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
+ nolocal(_nolocal),
+ acquire(_acquire),
+ blocked(false),
+ windowing(true),
+ msgCredit(0xFFFFFFFF),
+ byteCredit(0xFFFFFFFF) {}
+
+bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+{
+ if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) {
+ return false;
+ } else {
+ if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
+ blocked = true;
+ } else {
+ blocked = false;
+
+ Mutex::ScopedLock locker(parent->deliveryLock);
+
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter->deliver(msg.payload, token);
+ if (ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+ }
+ }
+ return !blocked;
+ }
+}
+
+bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock l(lock);
+ if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+ return false;
+ } else {
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
+ }
+ return true;
+ }
+}
+
+void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+ Mutex::ScopedLock locker(parent->deliveryLock);
+ parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+}
+
+Session::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
+void Session::ConsumerImpl::cancel()
+{
+ if(queue) {
+ queue->cancel(this);
+ if (queue->canAutoDelete()) {
+ parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ }
+ }
+}
+
+void Session::ConsumerImpl::requestDispatch()
+{
+ if(blocked)
+ queue->requestDispatch();
+}
+
+void Session::handle(Message::shared_ptr msg) {
+ if (txBuffer.get()) {
+ TxPublish* deliverable(new TxPublish(msg));
+ TxOp::shared_ptr op(deliverable);
+ route(msg, *deliverable);
+ txBuffer->enlist(op);
+ } else {
+ DeliverableMessage deliverable(msg);
+ route(msg, deliverable);
+ }
+}
+void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+ std::string exchangeName = msg->getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
+ }
+
+ cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+
+ if (!strategy.delivered) {
+ //TODO:if reject-unroutable, then reject
+ //else route to alternate exchange
+ if (cacheExchange->getAlternate()) {
+ cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ }
+
+}
+
+void Session::ackCumulative(DeliveryId id)
+{
+ ack(id, id, true);
+}
+
+void Session::ackRange(DeliveryId first, DeliveryId last)
+{
+ ack(first, last, false);
+}
+
+void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
+ }
+
+ for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
+
+ //if the prefetch limit had previously been reached, or credit
+ //had expired in windowing mode there may be messages that can
+ //be now be delivered
+ for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Session::acknowledged(const DeliveryRecord& delivery)
+{
+ delivery.subtractFrom(outstanding);
+ ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ if (i != consumers.end()) {
+ i->acknowledged(delivery);
+ }
+}
+
+void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ }
+}
+
+void Session::recover(bool requeue)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ //take copy and clear unacked as requeue may result in redelivery to this session
+ //which will in turn result in additions to unacked
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
+ QueuedMessage msg = queue->dequeue();
+ if(msg.payload){
+ Mutex::ScopedLock locker(deliveryLock);
+ DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ DeliveryId deliveryTag)
+{
+ ConsumerImplMap::iterator i = consumers.find(consumerTag);
+ if (i != consumers.end()){
+ i->redeliver(msg, deliveryTag);
+ }
+}
+
+void Session::flow(bool active)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ }
+}
+
+
+Session::ConsumerImpl& Session::find(const std::string& destination)
+{
+ ConsumerImplMap::iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ } else {
+ return *i;
+ }
+}
+
+void Session::setWindowMode(const std::string& destination)
+{
+ find(destination).setWindowMode();
+}
+
+void Session::setCreditMode(const std::string& destination)
+{
+ find(destination).setCreditMode();
+}
+
+void Session::addByteCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addByteCredit(value);
+}
+
+
+void Session::addMessageCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addMessageCredit(value);
+}
+
+void Session::flush(const std::string& destination)
+{
+ ConsumerImpl& c = find(destination);
+ c.flush();
+}
+
+
+void Session::stop(const std::string& destination)
+{
+ find(destination).stop();
+}
+
+void Session::ConsumerImpl::setWindowMode()
+{
+ windowing = true;
+}
+
+void Session::ConsumerImpl::setCreditMode()
+{
+ windowing = false;
+}
+
+void Session::ConsumerImpl::addByteCredit(uint32_t value)
+{
+ byteCredit += value;
+ requestDispatch();
+}
+
+void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+ msgCredit += value;
+ requestDispatch();
+}
+
+void Session::ConsumerImpl::flush()
+{
+ //TODO: need to wait until any messages that are available for
+ //this consumer have been delivered... i.e. some sort of flush on
+ //the queue...
+}
+
+void Session::ConsumerImpl::stop()
+{
+ msgCredit = 0;
+ byteCredit = 0;
+}
+
+Queue::shared_ptr Session::getQueue(const string& name) const {
+ //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getDefaultQueue();
+ if (!queue)
+ throw NotAllowedException(QPID_MSG("No queue name specified."));
+ }
+ else {
+ queue = getBroker().getQueues().find(name);
+ if (!queue)
+ throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
+ return queue;
+}
}} // namespace qpid::broker