summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-18 22:08:38 +0000
committerAlan Conway <aconway@apache.org>2012-01-18 22:08:38 +0000
commit77a6ae21b11dcdd2272a0110816b7e69ff8b6aab (patch)
treeeeca9a0aecfe3cd8b30930d35b958cd79881003f
parent95212f86c498cb5612b3cc0970b13e9038f10700 (diff)
downloadqpid-python-77a6ae21b11dcdd2272a0110816b7e69ff8b6aab.tar.gz
QPID-3603: Fix spurious "exchange not found" error and debug messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1233085 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/QueueBindings.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp95
4 files changed, 58 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index d68845062d..5d763bf0da 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -279,12 +279,10 @@ uint32_t Exchange::encodedSize() const
void Exchange::recoveryComplete(ExchangeRegistry& exchanges)
{
if (!alternateName.empty()) {
- try {
- Exchange::shared_ptr ae = exchanges.get(alternateName);
- setAlternate(ae);
- } catch (const NotFoundException&) {
- QPID_LOG(warning, "Could not set alternate exchange \"" << alternateName << "\": does not exist.");
- }
+ Exchange::shared_ptr ae = exchanges.find(alternateName);
+ if (ae) setAlternate(ae);
+ else QPID_LOG(warning, "Could not set alternate exchange \""
+ << alternateName << "\": does not exist.");
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f87041390b..6334524612 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1348,12 +1348,11 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
if (!alternateExchangeName.empty()) {
- try {
- Exchange::shared_ptr ae = exchanges.get(alternateExchangeName);
- setAlternateExchange(ae);
- } catch (const NotFoundException&) {
- QPID_LOG(warning, "Could not set alternate exchange \"" << alternateExchangeName << "\" on queue \"" << name << "\": exchange does not exist.");
- }
+ Exchange::shared_ptr ae = exchanges.find(alternateExchangeName);
+ if (ae) setAlternateExchange(ae);
+ else QPID_LOG(warning, "Could not set alternate exchange \""
+ << alternateExchangeName << "\" on queue \"" << name
+ << "\": exchange does not exist.");
}
//process any pending dequeues
for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
diff --git a/qpid/cpp/src/qpid/broker/QueueBindings.cpp b/qpid/cpp/src/qpid/broker/QueueBindings.cpp
index c6b3ddf9a9..1cc3486d9a 100644
--- a/qpid/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueBindings.cpp
@@ -42,10 +42,10 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
local = bindings;
}
- for (Bindings::iterator i = local.begin(); i != local.end(); i++)
- try {
- exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
- } catch (const NotFoundException&) {}
+ for (Bindings::iterator i = local.begin(); i != local.end(); i++) {
+ Exchange::shared_ptr ex = exchanges.find(i->exchange);
+ if (ex) ex->unbind(queue, i->key, &(i->args));
+ }
}
QueueBinding::QueueBinding(const string& _exchange, const string& _key, const FieldTable& _args)
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 63c4b660b2..c50bf10f7b 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -61,8 +61,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
static const std::string _TRUE("true");
static const std::string _FALSE("false");
-void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
- const string& alternateExchange,
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+ const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
//TODO: implement autoDelete
@@ -132,10 +132,10 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex
|| !exchange->getAlternate()))
throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
<< (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
- << ", requested "
+ << ", requested "
<< alternate->getName()));
}
-
+
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
//TODO: implement if-unused
@@ -149,23 +149,21 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId()));
}
-
- try {
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
+ Exchange::shared_ptr exchange(getBroker().getExchanges().find(name));
+ if (exchange)
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const NotFoundException& /*e*/) {
- return ExchangeQueryResult("", false, true, FieldTable());
- }
+ else
+ return ExchangeQueryResult("", false, true, FieldTable());
}
-void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments)
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
getConnection().getUserId(), getConnection().getUrl());
}
-
+
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
@@ -187,11 +185,8 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
-
- Exchange::shared_ptr exchange;
- try {
- exchange = getBroker().getExchanges().get(exchangeName);
- } catch (const NotFoundException&) {}
+
+ Exchange::shared_ptr exchange = getBroker().getExchanges().find(exchangeName);
Queue::shared_ptr queue;
if (!queueName.empty()) {
@@ -238,10 +233,10 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
-
-bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
-{
- return session.isLocal(t);
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
}
@@ -252,15 +247,15 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
}
-
+
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
if (queue) {
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-
- return QueueQueryResult(queue->getName(),
- alternateExchange ? alternateExchange->getName() : "",
- queue->isDurable(),
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
queue->hasExclusiveOwner(),
queue->isAutoDelete(),
queue->getSettings(),
@@ -272,7 +267,7 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
}
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
+ bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
{
Queue::shared_ptr queue;
@@ -359,16 +354,16 @@ void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool
exclusiveQueues.end(),
queue);
if (i < exclusiveQueues.end()) exclusiveQueues.erase(i);
- }
+ }
}
-
+
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
{
getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
-}
+}
-SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
@@ -405,7 +400,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
AclModule* acl = getBroker().getAcl();
if (acl)
- {
+ {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
@@ -418,8 +413,8 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
<< queue->getName()));
- state.consume(destination, queue,
- acceptMode == 0, acquireMode == 0, exclusive,
+ state.consume(destination, queue,
+ acceptMode == 0, acquireMode == 0, exclusive,
resumeId, resumeTtl, arguments);
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -458,9 +453,9 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, ui
//unknown
throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
}
-
+
}
-
+
void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode)
{
if (mode == 0) {
@@ -470,18 +465,18 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat
//window
state.setWindowMode(destination);
} else{
- throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
}
}
-
+
void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
{
- state.flush(destination);
+ state.flush(destination);
}
void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
{
- state.stop(destination);
+ state.stop(destination);
}
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
@@ -509,7 +504,7 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st
{
throw NotImplementedException("resuming transfers not yet supported");
}
-
+
void SessionAdapter::ExecutionHandlerImpl::sync()
@@ -549,7 +544,7 @@ void SessionAdapter::TxHandlerImpl::commit()
}
void SessionAdapter::TxHandlerImpl::rollback()
-{
+{
state.rollback();
}
@@ -586,7 +581,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
return XaResult(XA_STATUS_XA_OK);
}
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -605,7 +600,7 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
}
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -615,7 +610,7 @@ XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
bool ok = getBroker().getDtxManager().prepare(convert(xid));
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -626,7 +621,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -637,14 +632,14 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
getBroker().getDtxManager().rollback(convert(xid));
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
+ getBroker().getStore().collectPreparedXids(xids);
/*
* create array of long structs
*/
@@ -665,7 +660,7 @@ void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return DtxGetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}