summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Session.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
committerAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
commit2f6d6ad7efd788b71204af67dff51b6233881e2e (patch)
treea3d123bc112d12dfcef341a312f418624c98e342 /cpp/src/qpid/broker/Session.cpp
parent3b80f903b6174b4346d7d7b537d783f628fe28d6 (diff)
downloadqpid-python-2f6d6ad7efd788b71204af67dff51b6233881e2e.tar.gz
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains. broker::SemanticState: session state for the SemanticHandler. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Session.cpp')
-rw-r--r--cpp/src/qpid/broker/Session.cpp608
1 files changed, 0 insertions, 608 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
deleted file mode 100644
index d379b40d3f..0000000000
--- a/cpp/src/qpid/broker/Session.cpp
+++ /dev/null
@@ -1,608 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Session.h"
-
-#include "BrokerAdapter.h"
-#include "BrokerQueue.h"
-#include "Connection.h"
-#include "DeliverableMessage.h"
-#include "DtxAck.h"
-#include "DtxTimeout.h"
-#include "Message.h"
-#include "SemanticHandler.h"
-#include "SessionHandler.h"
-#include "TxAck.h"
-#include "TxPublish.h"
-#include "qpid/QpidError.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-#include <iostream>
-#include <sstream>
-#include <algorithm>
-#include <functional>
-
-#include <assert.h>
-
-
-namespace qpid {
-namespace broker {
-
-using std::mem_fun_ref;
-using std::bind2nd;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-Session::Session(SessionHandler& a, uint32_t t)
- : adapter(&a),
- broker(adapter->getConnection().broker),
- timeout(t),
- id(true),
- prefetchSize(0),
- prefetchCount(0),
- tagGenerator("sgen"),
- dtxSelected(false),
- accumulatedAck(0),
- flowActive(true)
-{
- outstanding.reset();
- std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
- // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState.
- deliveryAdapter=semantic.get();
- handlers.push_back(semantic.release());
- in = &handlers[0];
- out = &adapter->out;
- // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
- // create a SessionManager in the broker for all session related
- // stuff: suspended sessions, handler updaters etc.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
- broker.update(a.getChannel(), *this);
-}
-
-Session::~Session() {
- close();
-}
-
-bool Session::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
- bool exclusive, const FieldTable*)
-{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(c.get(), exclusive);//may throw exception
- consumers.insert(tagInOut, c.release());
-}
-
-void Session::cancel(const string& tag){
- // consumers is a ptr_map so erase will delete the consumer
- // which will call cancel.
- ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- consumers.erase(i);
-}
-
-void Session::close()
-{
- opened = false;
- consumers.clear();
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
- }
- recover(true);
-}
-
-void Session::startTx()
-{
- txBuffer = TxBuffer::shared_ptr(new TxBuffer());
-}
-
-void Session::commit(MessageStore* const store)
-{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
-
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
- txBuffer->enlist(txAck);
- if (txBuffer->commitLocal(store)) {
- accumulatedAck.clear();
- }
-}
-
-void Session::rollback()
-{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
-
- txBuffer->rollback();
- accumulatedAck.clear();
-}
-
-void Session::selectDtx()
-{
- dtxSelected = true;
-}
-
-void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
-{
- if (!dtxSelected) {
- throw ConnectionException(503, "Session has not been selected for use with dtx");
- }
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
- if (join) {
- mgr.join(xid, dtxBuffer);
- } else {
- mgr.start(xid, dtxBuffer);
- }
-}
-
-void Session::endDtx(const std::string& xid, bool fail)
-{
- if (!dtxBuffer) {
- throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
- }
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
- % dtxBuffer->getXid() % xid);
- }
-
- txBuffer.reset();//ops on this session no longer transactional
-
- checkDtxTimeout();
- if (fail) {
- dtxBuffer->fail();
- } else {
- dtxBuffer->markEnded();
- }
- dtxBuffer.reset();
-}
-
-void Session::suspendDtx(const std::string& xid)
-{
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
- % dtxBuffer->getXid() % xid);
- }
- txBuffer.reset();//ops on this session no longer transactional
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(true);
-}
-
-void Session::resumeDtx(const std::string& xid)
-{
- if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
- % dtxBuffer->getXid() % xid);
- }
- if (!dtxBuffer->isSuspended()) {
- throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
- }
-
- checkDtxTimeout();
- dtxBuffer->setSuspended(false);
- txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
-}
-
-void Session::checkDtxTimeout()
-{
- if (dtxBuffer->isExpired()) {
- dtxBuffer.reset();
- throw DtxTimeoutException();
- }
-}
-
-void Session::record(const DeliveryRecord& delivery)
-{
- unacked.push_back(delivery);
- delivery.addTo(outstanding);
-}
-
-bool Session::checkPrefetch(Message::shared_ptr& msg)
-{
- Mutex::ScopedLock locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacked.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
- return countOk && sizeOk;
-}
-
-Session::ConsumerImpl::ConsumerImpl(Session* _parent,
- DeliveryToken::shared_ptr _token,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _nolocal,
- bool _acquire
- ) :
- Consumer(_acquire),
- parent(_parent),
- token(_token),
- name(_name),
- queue(_queue),
- ackExpected(ack),
- nolocal(_nolocal),
- acquire(_acquire),
- blocked(false),
- windowing(true),
- msgCredit(0),
- byteCredit(0) {}
-
-bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
-{
- if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
- return false;
- } else {
- if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
- blocked = true;
- } else {
- blocked = false;
-
- Mutex::ScopedLock locker(parent->deliveryLock);
-
- DeliveryId deliveryTag =
- parent->deliveryAdapter->deliver(msg.payload, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
- }
- }
- return !blocked;
- }
-}
-
-bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
-{
- Mutex::ScopedLock l(lock);
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- return false;
- } else {
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
- return true;
- }
-}
-
-void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
- Mutex::ScopedLock locker(parent->deliveryLock);
- parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
-}
-
-Session::ConsumerImpl::~ConsumerImpl() {
- cancel();
-}
-
-void Session::ConsumerImpl::cancel()
-{
- if(queue) {
- queue->cancel(this);
- if (queue->canAutoDelete()) {
- parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
- }
- }
-}
-
-void Session::ConsumerImpl::requestDispatch()
-{
- if(blocked)
- queue->requestDispatch(this);
-}
-
-void Session::handle(Message::shared_ptr msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- }
-}
-
-void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
- std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
- }
-
- cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
-
- if (!strategy.delivered) {
- //TODO:if reject-unroutable, then reject
- //else route to alternate exchange
- if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
- }
- }
-
-}
-
-void Session::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void Session::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(start, end);
- }
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
-}
-
-void Session::acknowledged(const DeliveryRecord& delivery)
-{
- delivery.subtractFrom(outstanding);
- ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
- if (i != consumers.end()) {
- i->acknowledged(delivery);
- }
-}
-
-void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
-{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
- }
-}
-
-void Session::recover(bool requeue)
-{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstanding.reset();
- //take copy and clear unacked as requeue may result in redelivery to this session
- //which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
- }
-}
-
-bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
-{
- QueuedMessage msg = queue->dequeue();
- if(msg.payload){
- Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
- DeliveryId deliveryTag)
-{
- ConsumerImplMap::iterator i = consumers.find(consumerTag);
- if (i != consumers.end()){
- i->redeliver(msg, deliveryTag);
- }
-}
-
-void Session::flow(bool active)
-{
- Mutex::ScopedLock locker(deliveryLock);
- bool requestDelivery(!flowActive && active);
- flowActive = active;
- if (requestDelivery) {
- //there may be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
- }
-}
-
-
-Session::ConsumerImpl& Session::find(const std::string& destination)
-{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
- } else {
- return *i;
- }
-}
-
-void Session::setWindowMode(const std::string& destination)
-{
- find(destination).setWindowMode();
-}
-
-void Session::setCreditMode(const std::string& destination)
-{
- find(destination).setCreditMode();
-}
-
-void Session::addByteCredit(const std::string& destination, uint32_t value)
-{
- find(destination).addByteCredit(value);
-}
-
-
-void Session::addMessageCredit(const std::string& destination, uint32_t value)
-{
- find(destination).addMessageCredit(value);
-}
-
-void Session::flush(const std::string& destination)
-{
- ConsumerImpl& c = find(destination);
- c.flush();
-}
-
-
-void Session::stop(const std::string& destination)
-{
- find(destination).stop();
-}
-
-void Session::ConsumerImpl::setWindowMode()
-{
- windowing = true;
-}
-
-void Session::ConsumerImpl::setCreditMode()
-{
- windowing = false;
-}
-
-void Session::ConsumerImpl::addByteCredit(uint32_t value)
-{
- byteCredit += value;
- requestDispatch();
-}
-
-void Session::ConsumerImpl::addMessageCredit(uint32_t value)
-{
- msgCredit += value;
- requestDispatch();
-}
-
-void Session::ConsumerImpl::flush()
-{
- //need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero; TODO: come up with better
- //implementation of flush.
- Mutex::ScopedLock l(lock);
- queue->requestDispatch(this, true);
- byteCredit = 0;
- msgCredit = 0;
-}
-
-void Session::ConsumerImpl::stop()
-{
- msgCredit = 0;
- byteCredit = 0;
-}
-
-Queue::shared_ptr Session::getQueue(const string& name) const {
- //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getDefaultQueue();
- if (!queue)
- throw NotAllowedException(QPID_MSG("No queue name specified."));
- }
- else {
- queue = getBroker().getQueues().find(name);
- if (!queue)
- throw NotFoundException(QPID_MSG("Queue not found: "<<name));
- }
- return queue;
-}
-
-AckRange Session::findRange(DeliveryId first, DeliveryId last)
-{
- ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (start != unacked.end()) {
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- }
- }
- return AckRange(start, end);
-}
-
-void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
-{
- Mutex::ScopedLock locker(deliveryLock);
- AckRange range = findRange(first, last);
- for_each(range.start, range.end, AcquireFunctor(acquired));
-}
-
-void Session::release(DeliveryId first, DeliveryId last)
-{
- Mutex::ScopedLock locker(deliveryLock);
- AckRange range = findRange(first, last);
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
-}
-
-void Session::reject(DeliveryId first, DeliveryId last)
-{
- Mutex::ScopedLock locker(deliveryLock);
- AckRange range = findRange(first, last);
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
- //need to remove the delivery records as well
- unacked.erase(range.start, range.end);
-}
-
-}} // namespace qpid::broker