summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp28
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp34
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h17
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp51
4 files changed, 84 insertions, 46 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 6acd0a3ced..42eceaf9f6 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!get(result, timeout)) throw NoMessageAvailable();
return result;
}
-
-qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!fetch(result, timeout)) throw NoMessageAvailable();
@@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur
return f.result;
}
-void ReceiverImpl::close()
-{
+void ReceiverImpl::close()
+{
execute<Close>();
}
@@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled()
return parent->getUnsettledAcks(destination);
}
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
}
}
-void ReceiverImpl::closeImpl()
-{
+void ReceiverImpl::closeImpl()
+{
sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
+ session.messageStop(destination);
+ parent->releasePending(destination);
source->cancel(session, destination);
parent->receiverCancelled(destination);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 6d98527627..75a71997fd 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
-
+
CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createReceiverImpl(address); }
@@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
-
+
CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createSenderImpl(address); }
@@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const
throw KeyError(name);
} else {
return i->second;
- }
+ }
}
Receiver SessionImpl::getReceiver(const std::string& name) const
@@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT
}
}
-bool SessionImpl::accept(ReceiverImpl* receiver,
- qpid::messaging::Message* message,
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
IncomingMessages::MessageTransfer& transfer)
{
if (receiver->getName() == transfer.getDestination()) {
@@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command
{
const std::string* destination;
uint32_t result;
-
+
Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getReceivableImpl(destination); }
};
@@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command
{
const std::string* destination;
uint32_t result;
-
+
UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getUnsettledAcksImpl(destination); }
};
@@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl()
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
//ensure that stop has been processed and all previously sent
- //messages are available for release:
+ //messages are available for release:
session.sync();
incoming.releaseAll();
- session.txRollback();
+ session.txRollback();
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
@@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name)
incoming.releasePending(name);
}
+void SessionImpl::releasePending(const std::string& name)
+{
+ ScopedLock l(lock);
+ incoming.releasePending(name);
+}
+
void SessionImpl::senderCancelled(const std::string& name)
{
ScopedLock l(lock);
@@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->open();
+ connection->open();
}
bool SessionImpl::backoff()
{
- return connection->backoff();
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 3dd5cd0189..2a2aa47df6 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl
void checkError();
bool hasError();
- bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void releasePending(const std::string& destination);
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
@@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
@@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
-
+
Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()() { impl.acknowledgeImpl(message); }
};
-
+
struct CreateSender;
struct CreateReceiver;
struct UnsettledAcks;
@@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this);
return execute(f);
}
-
+
template <class F> void retry()
{
while (!execute<F>()) {}
}
-
+
template <class F, class P> bool execute1(P p)
{
F f(*this, p);
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index fc1632b4e1..991ec847bf 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification)
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber)
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
+}
+
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+ MessagingFixture fix;
+ const uint capacity = 5;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+ Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+ Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+ receiver1.setCapacity(capacity);
+ receiver2.setCapacity(capacity*2);
+
+ Message out("test-message");
+ for (uint i = 0; i < capacity*2; ++i) {
+ sender.send(out);
+ }
+
+ receiver1.close();
+
+ // Make sure all pending messages were sent to the alternate
+ // exchange when the queue was deleted.
+ Message in;
+ for (uint i = 0; i < capacity*2; ++i) {
+ in = receiver2.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
}
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
@@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());