diff options
author | Alan Conway <aconway@apache.org> | 2012-01-18 22:08:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-18 22:08:38 +0000 |
commit | 965369da82675cdba83d00600cf352d4c157c9b6 (patch) | |
tree | 9aac7137c7672b9edb1a4dd50706865a8518d3f8 | |
parent | fac4ce5feb34885051ff8ec08527875ca5583af7 (diff) | |
download | qpid-python-965369da82675cdba83d00600cf352d4c157c9b6.tar.gz |
QPID-3603: Fix spurious "exchange not found" error and debug messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1233085 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueBindings.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 95 |
4 files changed, 58 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index d68845062d..5d763bf0da 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -279,12 +279,10 @@ uint32_t Exchange::encodedSize() const void Exchange::recoveryComplete(ExchangeRegistry& exchanges) { if (!alternateName.empty()) { - try { - Exchange::shared_ptr ae = exchanges.get(alternateName); - setAlternate(ae); - } catch (const NotFoundException&) { - QPID_LOG(warning, "Could not set alternate exchange \"" << alternateName << "\": does not exist."); - } + Exchange::shared_ptr ae = exchanges.find(alternateName); + if (ae) setAlternate(ae); + else QPID_LOG(warning, "Could not set alternate exchange \"" + << alternateName << "\": does not exist."); } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f87041390b..6334524612 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -1348,12 +1348,11 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange if (!alternateExchangeName.empty()) { - try { - Exchange::shared_ptr ae = exchanges.get(alternateExchangeName); - setAlternateExchange(ae); - } catch (const NotFoundException&) { - QPID_LOG(warning, "Could not set alternate exchange \"" << alternateExchangeName << "\" on queue \"" << name << "\": exchange does not exist."); - } + Exchange::shared_ptr ae = exchanges.find(alternateExchangeName); + if (ae) setAlternateExchange(ae); + else QPID_LOG(warning, "Could not set alternate exchange \"" + << alternateExchangeName << "\" on queue \"" << name + << "\": exchange does not exist."); } //process any pending dequeues for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp index c6b3ddf9a9..1cc3486d9a 100644 --- a/cpp/src/qpid/broker/QueueBindings.cpp +++ b/cpp/src/qpid/broker/QueueBindings.cpp @@ -42,10 +42,10 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) local = bindings; } - for (Bindings::iterator i = local.begin(); i != local.end(); i++) - try { - exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args)); - } catch (const NotFoundException&) {} + for (Bindings::iterator i = local.begin(); i != local.end(); i++) { + Exchange::shared_ptr ex = exchanges.find(i->exchange); + if (ex) ex->unbind(queue, i->key, &(i->args)); + } } QueueBinding::QueueBinding(const string& _exchange, const string& _key, const FieldTable& _args) diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 63c4b660b2..c50bf10f7b 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -61,8 +61,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : static const std::string _TRUE("true"); static const std::string _FALSE("false"); -void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, - const string& alternateExchange, +void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, + const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ //TODO: implement autoDelete @@ -132,10 +132,10 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex || !exchange->getAlternate())) throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>") - << ", requested " + << ", requested " << alternate->getName())); } - + void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) { //TODO: implement if-unused @@ -149,23 +149,21 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId())); } - - try { - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); + Exchange::shared_ptr exchange(getBroker().getExchanges().find(name)); + if (exchange) return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); - } catch (const NotFoundException& /*e*/) { - return ExchangeQueryResult("", false, true, FieldTable()); - } + else + return ExchangeQueryResult("", false, true, FieldTable()); } -void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, - const string& exchangeName, const string& routingKey, +void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, + const string& exchangeName, const string& routingKey, const FieldTable& arguments) { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); } - + void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) @@ -187,11 +185,8 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId())); } - - Exchange::shared_ptr exchange; - try { - exchange = getBroker().getExchanges().get(exchangeName); - } catch (const NotFoundException&) {} + + Exchange::shared_ptr exchange = getBroker().getExchanges().find(exchangeName); Queue::shared_ptr queue; if (!queueName.empty()) { @@ -238,10 +233,10 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() exclusiveQueues.erase(exclusiveQueues.begin()); } } - -bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const -{ - return session.isLocal(t); + +bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const +{ + return session.isLocal(t); } @@ -252,15 +247,15 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId())); } - + Queue::shared_ptr queue = session.getBroker().getQueues().find(name); if (queue) { Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - - return QueueQueryResult(queue->getName(), - alternateExchange ? alternateExchange->getName() : "", - queue->isDurable(), + + return QueueQueryResult(queue->getName(), + alternateExchange ? alternateExchange->getName() : "", + queue->isDurable(), queue->hasExclusiveOwner(), queue->isAutoDelete(), queue->getSettings(), @@ -272,7 +267,7 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) } void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, + bool passive, bool durable, bool exclusive, bool autoDelete, const qpid::framing::FieldTable& arguments) { Queue::shared_ptr queue; @@ -359,16 +354,16 @@ void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool exclusiveQueues.end(), queue); if (i < exclusiveQueues.end()) exclusiveQueues.erase(i); - } + } } - + void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty) { getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(), boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty)); -} +} -SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : +SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerHelper(s), releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), @@ -405,7 +400,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, AclModule* acl = getBroker().getAcl(); if (acl) - { + { if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } @@ -418,8 +413,8 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " << queue->getName())); - state.consume(destination, queue, - acceptMode == 0, acquireMode == 0, exclusive, + state.consume(destination, queue, + acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); ManagementAgent* agent = getBroker().getManagementAgent(); @@ -458,9 +453,9 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, ui //unknown throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit)); } - + } - + void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode) { if (mode == 0) { @@ -470,18 +465,18 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat //window state.setWindowMode(destination); } else{ - throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); + throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); } } - + void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination) { - state.flush(destination); + state.flush(destination); } void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) { - state.stop(destination); + state.stop(destination); } void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) @@ -509,7 +504,7 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st { throw NotImplementedException("resuming transfers not yet supported"); } - + void SessionAdapter::ExecutionHandlerImpl::sync() @@ -549,7 +544,7 @@ void SessionAdapter::TxHandlerImpl::commit() } void SessionAdapter::TxHandlerImpl::rollback() -{ +{ state.rollback(); } @@ -586,7 +581,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, return XaResult(XA_STATUS_XA_OK); } } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -605,7 +600,7 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, } return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -615,7 +610,7 @@ XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) bool ok = getBroker().getDtxManager().prepare(convert(xid)); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -626,7 +621,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -637,14 +632,14 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) getBroker().getDtxManager().rollback(convert(xid)); return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); + getBroker().getStore().collectPreparedXids(xids); /* * create array of long structs */ @@ -665,7 +660,7 @@ void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return DtxGetTimeoutResult(timeout); + return DtxGetTimeoutResult(timeout); } |