summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2011-02-09 18:58:12 +0000
committerJonathan Robie <jonathan@apache.org>2011-02-09 18:58:12 +0000
commit0c3f29c4d2667c51e3d00506ac7afdd3735931d2 (patch)
treed6846f5d6ebf885bd9a1252ff86e65828149dd19
parent70ffbc6ea611722f31e742b11f4ed547dec8d713 (diff)
downloadqpid-python-0c3f29c4d2667c51e3d00506ac7afdd3735931d2.tar.gz
QPID-3040: The C++ messaging client library now releases pending messages when a Receiver is closed.
This only releases messages in the client's cache that have not been read. It does not release messages that have been read by the client application, but not acknowledged. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1069030 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp28
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp34
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h17
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp51
4 files changed, 84 insertions, 46 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 6acd0a3ced..42eceaf9f6 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!get(result, timeout)) throw NoMessageAvailable();
return result;
}
-
-qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!fetch(result, timeout)) throw NoMessageAvailable();
@@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur
return f.result;
}
-void ReceiverImpl::close()
-{
+void ReceiverImpl::close()
+{
execute<Close>();
}
@@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled()
return parent->getUnsettledAcks(destination);
}
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
}
}
-void ReceiverImpl::closeImpl()
-{
+void ReceiverImpl::closeImpl()
+{
sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
+ session.messageStop(destination);
+ parent->releasePending(destination);
source->cancel(session, destination);
parent->receiverCancelled(destination);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 6d98527627..75a71997fd 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
-
+
CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createReceiverImpl(address); }
@@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
-
+
CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createSenderImpl(address); }
@@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const
throw KeyError(name);
} else {
return i->second;
- }
+ }
}
Receiver SessionImpl::getReceiver(const std::string& name) const
@@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT
}
}
-bool SessionImpl::accept(ReceiverImpl* receiver,
- qpid::messaging::Message* message,
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
IncomingMessages::MessageTransfer& transfer)
{
if (receiver->getName() == transfer.getDestination()) {
@@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command
{
const std::string* destination;
uint32_t result;
-
+
Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getReceivableImpl(destination); }
};
@@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command
{
const std::string* destination;
uint32_t result;
-
+
UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getUnsettledAcksImpl(destination); }
};
@@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl()
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
//ensure that stop has been processed and all previously sent
- //messages are available for release:
+ //messages are available for release:
session.sync();
incoming.releaseAll();
- session.txRollback();
+ session.txRollback();
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
@@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name)
incoming.releasePending(name);
}
+void SessionImpl::releasePending(const std::string& name)
+{
+ ScopedLock l(lock);
+ incoming.releasePending(name);
+}
+
void SessionImpl::senderCancelled(const std::string& name)
{
ScopedLock l(lock);
@@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->open();
+ connection->open();
}
bool SessionImpl::backoff()
{
- return connection->backoff();
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 3dd5cd0189..2a2aa47df6 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl
void checkError();
bool hasError();
- bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void releasePending(const std::string& destination);
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
@@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
@@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
-
+
Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()() { impl.acknowledgeImpl(message); }
};
-
+
struct CreateSender;
struct CreateReceiver;
struct UnsettledAcks;
@@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this);
return execute(f);
}
-
+
template <class F> void retry()
{
while (!execute<F>()) {}
}
-
+
template <class F, class P> bool execute1(P p)
{
F f(*this, p);
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index fc1632b4e1..991ec847bf 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification)
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber)
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
+}
+
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+ MessagingFixture fix;
+ const uint capacity = 5;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+ Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+ Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+ receiver1.setCapacity(capacity);
+ receiver2.setCapacity(capacity*2);
+
+ Message out("test-message");
+ for (uint i = 0; i < capacity*2; ++i) {
+ sender.send(out);
+ }
+
+ receiver1.close();
+
+ // Make sure all pending messages were sent to the alternate
+ // exchange when the queue was deleted.
+ Message in;
+ for (uint i = 0; i < capacity*2; ++i) {
+ in = receiver2.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
}
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
@@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());