diff options
author | Jonathan Robie <jonathan@apache.org> | 2011-02-07 18:21:24 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2011-02-07 18:21:24 +0000 |
commit | eb8e7cf77d382f4233d01cd6c4f96acb3c68f390 (patch) | |
tree | 1edda53e532d03b4f2eaf24d7a0e62f4c7dda710 | |
parent | a23886065e02cf0ea86dc62b2b752ebf35d1220b (diff) | |
download | qpid-python-eb8e7cf77d382f4233d01cd6c4f96acb3c68f390.tar.gz |
Ensures that messages acquired, but not acked, are released before a queue is deleted.
Otherwise, these messages are not routed to an alternate exchange, and the queue is not actually deleted.
Resolves QPID-3040 / Red Hat BZ 674678.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1068042 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 108 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 58 |
2 files changed, 105 insertions, 61 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 3d62e73185..1355478f27 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -60,8 +60,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : static const std::string _TRUE("true"); static const std::string _FALSE("false"); -void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, - const string& alternateExchange, +void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, + const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ AclModule* acl = getBroker().getAcl(); @@ -74,7 +74,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId())); } - + //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { @@ -84,7 +84,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); checkAlternate(actual, alternate); - }else{ + }else{ if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) { throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")")); } @@ -128,10 +128,10 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex || !exchange->getAlternate())) throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>") - << ", requested " + << ", requested " << alternate->getName())); } - + void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) { AclModule* acl = getBroker().getAcl(); @@ -164,12 +164,12 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const NotFoundException& /*e*/) { - return ExchangeQueryResult("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } -void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, - const string& exchangeName, const string& routingKey, +void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, + const string& exchangeName, const string& routingKey, const FieldTable& arguments) { AclModule* acl = getBroker().getAcl(); @@ -201,7 +201,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, throw NotFoundException("Bind failed. No such exchange: " + exchangeName); } } - + void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) @@ -245,7 +245,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId())); } - + Exchange::shared_ptr exchange; try { exchange = getBroker().getExchanges().get(exchangeName); @@ -296,10 +296,10 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() exclusiveQueues.erase(exclusiveQueues.begin()); } } - -bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const -{ - return session.isLocal(t); + +bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const +{ + return session.isLocal(t); } @@ -310,15 +310,15 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId())); } - + Queue::shared_ptr queue = session.getBroker().getQueues().find(name); if (queue) { Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - - return QueueQueryResult(queue->getName(), - alternateExchange ? alternateExchange->getName() : "", - queue->isDurable(), + + return QueueQueryResult(queue->getName(), + alternateExchange ? alternateExchange->getName() : "", + queue->isDurable(), queue->hasExclusiveOwner(), queue->isAutoDelete(), queue->getSettings(), @@ -330,9 +330,9 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) } void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, + bool passive, bool durable, bool exclusive, bool autoDelete, const qpid::framing::FieldTable& arguments) -{ +{ AclModule* acl = getBroker().getAcl(); if (acl) { std::map<acl::Property, std::string> params; @@ -358,7 +358,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { - std::pair<Queue::shared_ptr, bool> queue_created = + std::pair<Queue::shared_ptr, bool> queue_created = getBroker().getQueues().declare(name, durable, autoDelete, exclusive ? &session : 0); @@ -395,12 +395,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& queue_created.second ? "created" : "existing")); } - if (exclusive && !queue->isExclusiveOwner(&session)) + if (exclusive && !queue->isExclusiveOwner(&session)) throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); -} - - +} + + void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ AclModule* acl = getBroker().getAcl(); if (acl) @@ -409,8 +409,8 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); } getQueue(queue)->purge(); -} - +} + void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ AclModule* acl = getBroker().getAcl(); @@ -421,7 +421,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse } Queue::shared_ptr q = getQueue(queue); - if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) + if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) throw ResourceLockedException(QPID_MSG("Cannot delete queue " << queue << "; it is exclusive to another session")); if(ifEmpty && q->getMessageCount() > 0){ @@ -434,6 +434,14 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); } + + DeliveryRecords::iterator i; + for (i=state.getUnacked().begin(); i != state.getUnacked().end(); ++i) { + if (i->getQueue() == q) { + i->release(true); + } + } + q->destroy(); getBroker().getQueues().destroy(queue); q->unbind(getBroker().getExchanges(), q); @@ -443,9 +451,9 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); q->notifyDeleted(); } -} +} -SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : +SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerHelper(s), releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), @@ -482,7 +490,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, AclModule* acl = getBroker().getAcl(); if (acl) - { + { if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } @@ -495,8 +503,8 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " << queue->getName())); - state.consume(destination, queue, - acceptMode == 0, acquireMode == 0, exclusive, + state.consume(destination, queue, + acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); ManagementAgent* agent = getBroker().getManagementAgent(); @@ -533,9 +541,9 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, ui //unknown throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit)); } - + } - + void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode) { if (mode == 0) { @@ -545,18 +553,18 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat //window state.setWindowMode(destination); } else{ - throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); + throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); } } - + void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination) { - state.flush(destination); + state.flush(destination); } void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) { - state.stop(destination); + state.stop(destination); } void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) @@ -584,7 +592,7 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st { throw NotImplementedException("resuming transfers not yet supported"); } - + void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op @@ -619,7 +627,7 @@ void SessionAdapter::TxHandlerImpl::commit() } void SessionAdapter::TxHandlerImpl::rollback() -{ +{ state.rollback(); } @@ -656,7 +664,7 @@ XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, return XaResult(XA_STATUS_XA_OK); } } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -675,7 +683,7 @@ XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, } return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -685,7 +693,7 @@ XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) bool ok = getBroker().getDtxManager().prepare(convert(xid)); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -696,7 +704,7 @@ XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } @@ -707,14 +715,14 @@ XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) getBroker().getDtxManager().rollback(convert(xid)); return XaResult(XA_STATUS_XA_OK); } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); + return XaResult(XA_STATUS_XA_RBTIMEOUT); } } DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); + getBroker().getStore().collectPreparedXids(xids); /* * create array of long structs */ @@ -735,7 +743,7 @@ void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return DtxGetTimeoutResult(timeout); + return DtxGetTimeoutResult(timeout); } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fc1632b4e1..5dfb0f38bb 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture ~QueueCreatePolicyFixture() { - admin.deleteQueue(address.getName()); + admin.deleteQueue(address.getName()); } }; @@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture ~ExchangeCreatePolicyFixture() { - admin.deleteExchange(address.getName()); + admin.deleteExchange(address.getName()); } }; @@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); @@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification) { MessagingFixture fix; fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); + BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) @@ -775,21 +775,57 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber) QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) { MessagingFixture fix; - + std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; std::string browseAddress = "exclusive-queue; { mode: browse }"; Receiver receiver = fix.session.createReceiver(address); fix.session.sync(); - Connection c2 = fix.newConnection(); + Connection c2 = fix.newConnection(); c2.open(); Session s2 = c2.createSession(); - + BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); + c2.close(); } + +QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) +{ + MessagingFixture fix; + const uint capacity = 5; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); + Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); + + receiver1.setCapacity(capacity); + receiver2.setCapacity(2*capacity); + + Message out("test-message"); + for (uint i = 0; i < capacity*2; ++i) { + sender.send(out); + } + + // Read half the messages, do not acknowledge + Message in; + for (uint i = 0; i < capacity; ++i) { + in = receiver1.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } + + receiver1.close(); + + // Make sure all unacked messages were sent to the alternate + // exchange when the queue was deleted. + for (uint i = 0; i < capacity*2; ++i) { + in = receiver2.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } +} + + QPID_AUTO_TEST_CASE(testAuthenticatedUsername) { MessagingFixture fix; @@ -828,7 +864,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) messages.push_back(msg); } const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { + for (uint i = 0; i < batch; ++i) { other.acknowledge(messages[i]); } messages.clear(); @@ -836,7 +872,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) other.close(); other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < (count-batch); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); @@ -847,7 +883,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) //check unacknowledged messages are still enqueued other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < ((count-batch)/2); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); |