summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-01-22 10:58:20 +0000
committerGordon Sim <gsim@apache.org>2010-01-22 10:58:20 +0000
commit94414b346e8423c7d2ced919a022867e0217fbaf (patch)
treecc33eba0e3faccba8bd910739659a41d5f162403
parentfa62d82fe2dfc127440de4675803d1a279807722 (diff)
downloadqpid-python-94414b346e8423c7d2ced919a022867e0217fbaf.tar.gz
QPID-2347: Signal deletion of queue to active subscribers via a resource-deleted exception.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@902055 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp10
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.h11
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionOutputException.h47
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp1
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp13
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp1
-rw-r--r--qpid/python/qpid/tests/messaging.py6
-rw-r--r--qpid/python/tests_0-10/exchange.py4
16 files changed, 142 insertions, 2 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 7c00a73a47..d0aae54be0 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -610,6 +610,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionManager.cpp \
qpid/broker/SessionManager.h \
qpid/broker/SessionManager.h \
+ qpid/broker/SessionOutputException.h \
qpid/broker/SessionState.cpp \
qpid/broker/SessionState.h \
qpid/broker/SignalHandler.cpp \
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 5f97d292bc..2448e9ef26 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -119,6 +119,16 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
+void SessionHandler::handleException(const qpid::SessionException& e)
+{
+ QPID_LOG(error, "Execution exception (during output): " << e.what());
+ executionException(e.code, e.what()); // Let subclass handle this first.
+ framing::AMQP_AllProxy::Execution execution(channel);
+ execution.exception(e.code, 0, 0, 0, 0, e.what(), FieldTable());
+ detaching();
+ sendDetach();
+}
+
namespace {
bool isControl(const AMQFrame& f) {
return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index fa6e6f4af6..a87a1a155f 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -30,6 +30,7 @@
namespace qpid {
+struct SessionException;
namespace amqp_0_10 {
@@ -62,6 +63,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
QPID_COMMON_EXTERN void sendTimeout(uint32_t t);
QPID_COMMON_EXTERN void sendFlush();
QPID_COMMON_EXTERN void markReadyToSend();//TODO: only needed for inter-broker bridge; cleanup
+ QPID_COMMON_EXTERN void handleException(const qpid::SessionException& e);
/** True if the handler is ready to send and receive */
bool ready() const;
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 725ceee084..c3388a4ab1 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Connection.h"
+#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
@@ -278,6 +279,9 @@ bool Connection::doOutput() {
//then do other output as needed:
return outputTasks.doOutput();
}
+ }catch(const SessionOutputException& e){
+ getChannel(e.channel).handleException(e);
+ return true;
}catch(ConnectionException& e){
close(e.code, e.getMessage());
}catch(std::exception& e){
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3eb714186c..6e813e936d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -101,7 +101,8 @@ Queue::Queue(const string& _name, bool _autodelete,
eventMode(0),
eventMgr(0),
insertSeqNo(0),
- broker(b)
+ broker(b),
+ deleted(false)
{
if (parent != 0 && broker != 0)
{
@@ -291,6 +292,7 @@ void Queue::notifyListener()
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
+ checkNotDeleted();
if (c->preAcquires()) {
switch (consumeNextMessage(m, c)) {
case CONSUMED:
@@ -869,6 +871,17 @@ void Queue::destroy()
}
}
+void Queue::notifyDeleted()
+{
+ QueueListeners::ListenerSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.snapshot(set);
+ deleted = true;
+ }
+ set.notifyAll();
+}
+
void Queue::bound(const string& exchange, const string& key,
const FieldTable& args)
{
@@ -1102,3 +1115,10 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
}
QueueListeners& Queue::getListeners() { return listeners; }
+
+void Queue::checkNotDeleted()
+{
+ if (deleted) {
+ throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
+ }
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index cac8956bf5..513403d535 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -107,6 +107,7 @@ namespace qpid {
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
+ bool deleted;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -150,6 +151,7 @@ namespace qpid {
}
Messages::iterator findAt(framing::SequenceNumber pos);
+ void checkNotDeleted();
public:
@@ -173,6 +175,7 @@ namespace qpid {
QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
bool recovering = false);
void destroy();
+ void notifyDeleted();
QPID_BROKER_EXTERN void bound(const string& exchange,
const string& key,
const qpid::framing::FieldTable& args);
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
index 951de2184a..4d2c57d6b4 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
@@ -78,4 +78,15 @@ bool QueueListeners::contains(Consumer::shared_ptr c) const {
std::find(consumers.begin(), consumers.end(), c) != consumers.end();
}
+void QueueListeners::ListenerSet::notifyAll()
+{
+ std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify));
+}
+
+void QueueListeners::snapshot(ListenerSet& set)
+{
+ set.listeners.insert(set.listeners.end(), consumers.begin(), consumers.end());
+ set.listeners.insert(set.listeners.end(), browsers.begin(), browsers.end());
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h
index 51ef58eb06..59d1c84ca4 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.h
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.h
@@ -52,10 +52,21 @@ class QueueListeners
friend class QueueListeners;
};
+ class ListenerSet
+ {
+ public:
+ void notifyAll();
+ private:
+ Listeners listeners;
+ friend class QueueListeners;
+ };
+
void addListener(Consumer::shared_ptr);
void removeListener(Consumer::shared_ptr);
void populate(NotificationSet&);
+ void snapshot(ListenerSet&);
bool contains(Consumer::shared_ptr c) const;
+ void notifyAll();
template <class F> void eachListener(F f) {
std::for_each(browsers.begin(), browsers.end(), f);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 68c62a72ef..73ef807a0a 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
@@ -671,7 +672,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
bool SemanticState::ConsumerImpl::doOutput()
{
- return haveCredit() && queue->dispatch(shared_from_this());
+ try {
+ return haveCredit() && queue->dispatch(shared_from_this());
+ } catch (const SessionException& e) {
+ throw SessionOutputException(e, parent->session.getChannel());
+ }
}
void SemanticState::ConsumerImpl::enableNotify()
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index a7743d95ab..c3b6f697fd 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -439,6 +439,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
+ q->notifyDeleted();
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionOutputException.h b/qpid/cpp/src/qpid/broker/SessionOutputException.h
new file mode 100644
index 0000000000..7c1c5de926
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionOutputException.h
@@ -0,0 +1,47 @@
+#ifndef QPID_BROKER_SESSIONOUTPUTEXCEPTION_H
+#define QPID_BROKER_SESSIONOUTPUTEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * This exception is used to signal 'session' exceptions (aka
+ * execution exceptions in AMQP 0-10 terms) that occur during output
+ * processing. It simply allows the channel of the session to be
+ * specified in addition to the other details. Special treatment is
+ * required at present because the output processing chain is
+ * different from that which handles incoming commands (specifically
+ * AggregateOutput cannot reasonably handle exceptions as it has no
+ * context).
+ */
+struct SessionOutputException : qpid::SessionException
+{
+ const uint16_t channel;
+ SessionOutputException(const qpid::SessionException& e, uint16_t c) : qpid::SessionException(e.code, e.getMessage()), channel(c) {}
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONOUTPUTEXCEPTION_H*/
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 0f767c9f2e..34589d59fc 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -558,6 +558,7 @@ void SessionImpl::detach(const std::string& _name)
setState(DETACHED);
QPID_LOG(info, "Session detached by peer: " << id);
proxy.detached(_name, DETACH_CODE_NORMAL);
+ handleClosed();
}
void SessionImpl::detached(const std::string& _name, uint8_t _code) {
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 8ce5d85632..e8cdb1f232 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -629,6 +629,19 @@ QPID_AUTO_TEST_CASE(testGetConnectionFromSession) {
BOOST_CHECK(!fix.subs.get(got, "a"));
}
+
+QPID_AUTO_TEST_CASE(testQueueDeleted)
+{
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(arg::queue="my-queue");
+ LocalQueue queue;
+ fix.subs.subscribe(queue, "my-queue");
+
+ ScopedSuppressLogging sl;
+ fix.session.queueDelete(arg::queue="my-queue");
+ BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 4125c51698..d48512e0df 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -180,6 +180,7 @@ struct MultiQueueFixture : MessagingFixture
~MultiQueueFixture()
{
+ connection.close();
for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
admin.deleteQueue(*i);
}
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 9a8a4c807c..683bb574ee 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -239,6 +239,12 @@ class SessionTests(Base):
pass
assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
self.ssn.acknowledge()
+ #we set the capacity to 0 to prevent the deletion of the queue -
+ #triggered the deletion policy when the first receiver is closed -
+ #resulting in session exceptions being issued for the remaining
+ #active subscriptions:
+ for r in [rcv1, rcv2, rcv3]:
+ r.capacity = 0
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
diff --git a/qpid/python/tests_0-10/exchange.py b/qpid/python/tests_0-10/exchange.py
index 8d9713076d..0ac78a4799 100644
--- a/qpid/python/tests_0-10/exchange.py
+++ b/qpid/python/tests_0-10/exchange.py
@@ -35,9 +35,12 @@ class TestHelper(TestBase010):
TestBase010.setUp(self)
self.queues = []
self.exchanges = []
+ self.subscriptions = []
def tearDown(self):
try:
+ for s in self.subscriptions:
+ self.session.message_cancel(destination=s)
for ssn, q in self.queues:
ssn.queue_delete(queue=q)
for ssn, ex in self.exchanges:
@@ -110,6 +113,7 @@ class TestHelper(TestBase010):
self.session.message_subscribe(queue=queueName, destination=consumer_tag)
self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+ self.subscriptions.append(consumer_tag)
return self.session.incoming(consumer_tag)