diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 77 |
1 files changed, 42 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index b7985e9ed8..3ad29e6271 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU getBroker().getExchanges().destroy(name); } -Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) +ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) { try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - return Exchange010QueryResult("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, @@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, } -Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, +ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, const std::string& queueName, const std::string& key, const framing::FieldTable& args) @@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str } if (!exchange) { - return Exchange010BoundResult(true, false, false, false, false); + return ExchangeBoundResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - return Exchange010BoundResult(false, true, false, false, false); + return ExchangeBoundResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return Exchange010BoundResult(false, false, false, false, false); + return ExchangeBoundResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); + return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } @@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() { + destroyExclusiveQueues(); +} + +void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() +{ while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() exclusiveQueues.erase(exclusiveQueues.begin()); } } + bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const { @@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const } -Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) +QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - return Queue010QueryResult(queue->getName(), + return QueueQueryResult(queue->getName(), alternateExchange ? alternateExchange->getName() : "", queue->isDurable(), queue->hasExclusiveOwner(), @@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, uint8_t /*acquireMode*/) { //not yet used (content containing assemblies treated differently at present + std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl; } void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered) @@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm commands.for_each(acceptOp); } -framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) +framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { //TODO: change this when SequenceNumberSet is deleted along with preview code SequenceNumberSet results; @@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2); results.processRanges(g); - return Message010AcquireResult(acquisitions); + return MessageAcquireResult(acquisitions); } @@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } -std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid) { std::string encoded; encode(xid, encoded); @@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select() state.selectDtx(); } -Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, +DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, bool fail, bool suspend) { @@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return Dtx010EndResult(XA_RBROLLBACK); + return DtxEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, } else { state.endDtx(convert(xid), false); } - return Dtx010EndResult(XA_OK); + return DtxEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return Dtx010EndResult(XA_RBTIMEOUT); + return DtxEndResult(XA_RBTIMEOUT); } } -Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, +DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, bool join, bool resume) { @@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, } else { state.startDtx(convert(xid), getBroker().getDtxManager(), join); } - return Dtx010StartResult(XA_OK); + return DtxStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010StartResult(XA_RBTIMEOUT); + return DtxStartResult(XA_RBTIMEOUT); } } -Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010PrepareResult(XA_RBTIMEOUT); + return DtxPrepareResult(XA_RBTIMEOUT); } } -Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, +DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010CommitResult(XA_RBTIMEOUT); + return DtxCommitResult(XA_RBTIMEOUT); } } -Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { getBroker().getDtxManager().rollback(convert(xid)); - return Dtx010RollbackResult(XA_OK); + return DtxRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010RollbackResult(XA_RBTIMEOUT); + return DtxRollbackResult(XA_RBTIMEOUT); } } -Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; getBroker().getStore().collectPreparedXids(xids); @@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() boost::shared_ptr<FieldValue> xid(new Struct32Value(*i)); indoubt.add(xid); } - return Dtx010RecoverResult(indoubt); + return DtxRecoverResult(indoubt); } -void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) { //Currently no heuristic completion is supported, so this should never be used. throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } -Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return Dtx010GetTimeoutResult(timeout); + return DtxGetTimeoutResult(timeout); } -void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, u_int32_t timeout) { getBroker().getDtxManager().setTimeout(convert(xid), timeout); |