diff options
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionOutputException.h | 47 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 1 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 6 | ||||
-rw-r--r-- | qpid/python/tests_0-10/exchange.py | 4 |
16 files changed, 142 insertions, 2 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 7c00a73a47..d0aae54be0 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -610,6 +610,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionManager.cpp \ qpid/broker/SessionManager.h \ qpid/broker/SessionManager.h \ + qpid/broker/SessionOutputException.h \ qpid/broker/SessionState.cpp \ qpid/broker/SessionState.h \ qpid/broker/SignalHandler.cpp \ diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 5f97d292bc..2448e9ef26 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -119,6 +119,16 @@ void SessionHandler::handleIn(AMQFrame& f) { } } +void SessionHandler::handleException(const qpid::SessionException& e) +{ + QPID_LOG(error, "Execution exception (during output): " << e.what()); + executionException(e.code, e.what()); // Let subclass handle this first. + framing::AMQP_AllProxy::Execution execution(channel); + execution.exception(e.code, 0, 0, 0, 0, e.what(), FieldTable()); + detaching(); + sendDetach(); +} + namespace { bool isControl(const AMQFrame& f) { return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL; diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index fa6e6f4af6..a87a1a155f 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -30,6 +30,7 @@ namespace qpid { +struct SessionException; namespace amqp_0_10 { @@ -62,6 +63,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, QPID_COMMON_EXTERN void sendTimeout(uint32_t t); QPID_COMMON_EXTERN void sendFlush(); QPID_COMMON_EXTERN void markReadyToSend();//TODO: only needed for inter-broker bridge; cleanup + QPID_COMMON_EXTERN void handleException(const qpid::SessionException& e); /** True if the handler is ready to send and receive */ bool ready() const; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 725ceee084..c3388a4ab1 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Connection.h" +#include "qpid/broker/SessionOutputException.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" @@ -278,6 +279,9 @@ bool Connection::doOutput() { //then do other output as needed: return outputTasks.doOutput(); } + }catch(const SessionOutputException& e){ + getChannel(e.channel).handleException(e); + return true; }catch(ConnectionException& e){ close(e.code, e.getMessage()); }catch(std::exception& e){ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3eb714186c..6e813e936d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -101,7 +101,8 @@ Queue::Queue(const string& _name, bool _autodelete, eventMode(0), eventMgr(0), insertSeqNo(0), - broker(b) + broker(b), + deleted(false) { if (parent != 0 && broker != 0) { @@ -291,6 +292,7 @@ void Queue::notifyListener() bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { + checkNotDeleted(); if (c->preAcquires()) { switch (consumeNextMessage(m, c)) { case CONSUMED: @@ -869,6 +871,17 @@ void Queue::destroy() } } +void Queue::notifyDeleted() +{ + QueueListeners::ListenerSet set; + { + Mutex::ScopedLock locker(messageLock); + listeners.snapshot(set); + deleted = true; + } + set.notifyAll(); +} + void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { @@ -1102,3 +1115,10 @@ bool Queue::isEnqueued(const QueuedMessage& msg) } QueueListeners& Queue::getListeners() { return listeners; } + +void Queue::checkNotDeleted() +{ + if (deleted) { + throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); + } +} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index cac8956bf5..513403d535 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -107,6 +107,7 @@ namespace qpid { bool insertSeqNo; std::string seqNoKey; Broker* broker; + bool deleted; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -150,6 +151,7 @@ namespace qpid { } Messages::iterator findAt(framing::SequenceNumber pos); + void checkNotDeleted(); public: @@ -173,6 +175,7 @@ namespace qpid { QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, bool recovering = false); void destroy(); + void notifyDeleted(); QPID_BROKER_EXTERN void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp index 951de2184a..4d2c57d6b4 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp +++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp @@ -78,4 +78,15 @@ bool QueueListeners::contains(Consumer::shared_ptr c) const { std::find(consumers.begin(), consumers.end(), c) != consumers.end(); } +void QueueListeners::ListenerSet::notifyAll() +{ + std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify)); +} + +void QueueListeners::snapshot(ListenerSet& set) +{ + set.listeners.insert(set.listeners.end(), consumers.begin(), consumers.end()); + set.listeners.insert(set.listeners.end(), browsers.begin(), browsers.end()); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h index 51ef58eb06..59d1c84ca4 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.h +++ b/qpid/cpp/src/qpid/broker/QueueListeners.h @@ -52,10 +52,21 @@ class QueueListeners friend class QueueListeners; }; + class ListenerSet + { + public: + void notifyAll(); + private: + Listeners listeners; + friend class QueueListeners; + }; + void addListener(Consumer::shared_ptr); void removeListener(Consumer::shared_ptr); void populate(NotificationSet&); + void snapshot(ListenerSet&); bool contains(Consumer::shared_ptr c) const; + void notifyAll(); template <class F> void eachListener(F f) { std::for_each(browsers.begin(), browsers.end(), f); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 68c62a72ef..73ef807a0a 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" +#include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/framing/reply_exceptions.h" @@ -671,7 +672,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) bool SemanticState::ConsumerImpl::doOutput() { - return haveCredit() && queue->dispatch(shared_from_this()); + try { + return haveCredit() && queue->dispatch(shared_from_this()); + } catch (const SessionException& e) { + throw SessionOutputException(e, parent->session.getChannel()); + } } void SemanticState::ConsumerImpl::enableNotify() diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index a7743d95ab..c3b6f697fd 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -439,6 +439,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); + q->notifyDeleted(); } } diff --git a/qpid/cpp/src/qpid/broker/SessionOutputException.h b/qpid/cpp/src/qpid/broker/SessionOutputException.h new file mode 100644 index 0000000000..7c1c5de926 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SessionOutputException.h @@ -0,0 +1,47 @@ +#ifndef QPID_BROKER_SESSIONOUTPUTEXCEPTION_H +#define QPID_BROKER_SESSIONOUTPUTEXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +/** + * This exception is used to signal 'session' exceptions (aka + * execution exceptions in AMQP 0-10 terms) that occur during output + * processing. It simply allows the channel of the session to be + * specified in addition to the other details. Special treatment is + * required at present because the output processing chain is + * different from that which handles incoming commands (specifically + * AggregateOutput cannot reasonably handle exceptions as it has no + * context). + */ +struct SessionOutputException : qpid::SessionException +{ + const uint16_t channel; + SessionOutputException(const qpid::SessionException& e, uint16_t c) : qpid::SessionException(e.code, e.getMessage()), channel(c) {} +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SESSIONOUTPUTEXCEPTION_H*/ diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 0f767c9f2e..34589d59fc 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -558,6 +558,7 @@ void SessionImpl::detach(const std::string& _name) setState(DETACHED); QPID_LOG(info, "Session detached by peer: " << id); proxy.detached(_name, DETACH_CODE_NORMAL); + handleClosed(); } void SessionImpl::detached(const std::string& _name, uint8_t _code) { diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 8ce5d85632..e8cdb1f232 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -629,6 +629,19 @@ QPID_AUTO_TEST_CASE(testGetConnectionFromSession) { BOOST_CHECK(!fix.subs.get(got, "a")); } + +QPID_AUTO_TEST_CASE(testQueueDeleted) +{ + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue"); + LocalQueue queue; + fix.subs.subscribe(queue, "my-queue"); + + ScopedSuppressLogging sl; + fix.session.queueDelete(arg::queue="my-queue"); + BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 4125c51698..d48512e0df 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -180,6 +180,7 @@ struct MultiQueueFixture : MessagingFixture ~MultiQueueFixture() { + connection.close(); for (const_iterator i = queues.begin(); i != queues.end(); ++i) { admin.deleteQueue(*i); } diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 9a8a4c807c..683bb574ee 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -239,6 +239,12 @@ class SessionTests(Base): pass assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) self.ssn.acknowledge() + #we set the capacity to 0 to prevent the deletion of the queue - + #triggered the deletion policy when the first receiver is closed - + #resulting in session exceptions being issued for the remaining + #active subscriptions: + for r in [rcv1, rcv2, rcv3]: + r.capacity = 0 # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown diff --git a/qpid/python/tests_0-10/exchange.py b/qpid/python/tests_0-10/exchange.py index 8d9713076d..0ac78a4799 100644 --- a/qpid/python/tests_0-10/exchange.py +++ b/qpid/python/tests_0-10/exchange.py @@ -35,9 +35,12 @@ class TestHelper(TestBase010): TestBase010.setUp(self) self.queues = [] self.exchanges = [] + self.subscriptions = [] def tearDown(self): try: + for s in self.subscriptions: + self.session.message_cancel(destination=s) for ssn, q in self.queues: ssn.queue_delete(queue=q) for ssn, ex in self.exchanges: @@ -110,6 +113,7 @@ class TestHelper(TestBase010): self.session.message_subscribe(queue=queueName, destination=consumer_tag) self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL) self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) + self.subscriptions.append(consumer_tag) return self.session.incoming(consumer_tag) |