summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp77
1 files changed, 42 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index b7985e9ed8..3ad29e6271 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
getBroker().getExchanges().destroy(name);
}
-Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
+ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- return Exchange010QueryResult("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
@@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
}
-Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
+ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
const framing::FieldTable& args)
@@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
}
if (!exchange) {
- return Exchange010BoundResult(true, false, false, false, false);
+ return ExchangeBoundResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- return Exchange010BoundResult(false, true, false, false, false);
+ return ExchangeBoundResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return Exchange010BoundResult(false, false, false, false, false);
+ return ExchangeBoundResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
@@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
+ destroyExclusiveQueues();
+}
+
+void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
+{
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
@@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
}
-Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
+QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
- return Queue010QueryResult(queue->getName(),
+ return QueueQueryResult(queue->getName(),
alternateExchange ? alternateExchange->getName() : "",
queue->isDurable(),
queue->hasExclusiveOwner(),
@@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
uint8_t /*acquireMode*/)
{
//not yet used (content containing assemblies treated differently at present
+ std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl;
}
void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered)
@@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
commands.for_each(acceptOp);
}
-framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
+framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
//TODO: change this when SequenceNumberSet is deleted along with preview code
SequenceNumberSet results;
@@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con
RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
results.processRanges(g);
- return Message010AcquireResult(acquisitions);
+ return MessageAcquireResult(acquisitions);
}
@@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
-std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
{
std::string encoded;
encode(xid, encoded);
@@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select()
state.selectDtx();
}
-Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
bool fail,
bool suspend)
{
@@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return Dtx010EndResult(XA_RBROLLBACK);
+ return DtxEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
} else {
state.endDtx(convert(xid), false);
}
- return Dtx010EndResult(XA_OK);
+ return DtxEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return Dtx010EndResult(XA_RBTIMEOUT);
+ return DtxEndResult(XA_RBTIMEOUT);
}
}
-Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
bool join,
bool resume)
{
@@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return Dtx010StartResult(XA_OK);
+ return DtxStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010StartResult(XA_RBTIMEOUT);
+ return DtxStartResult(XA_RBTIMEOUT);
}
}
-Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010PrepareResult(XA_RBTIMEOUT);
+ return DtxPrepareResult(XA_RBTIMEOUT);
}
}
-Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010CommitResult(XA_RBTIMEOUT);
+ return DtxCommitResult(XA_RBTIMEOUT);
}
}
-Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return Dtx010RollbackResult(XA_OK);
+ return DtxRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010RollbackResult(XA_RBTIMEOUT);
+ return DtxRollbackResult(XA_RBTIMEOUT);
}
}
-Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
getBroker().getStore().collectPreparedXids(xids);
@@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
boost::shared_ptr<FieldValue> xid(new Struct32Value(*i));
indoubt.add(xid);
}
- return Dtx010RecoverResult(indoubt);
+ return DtxRecoverResult(indoubt);
}
-void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
-Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return Dtx010GetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}
-void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
u_int32_t timeout)
{
getBroker().getDtxManager().setTimeout(convert(xid), timeout);