summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-16 17:30:23 +0000
committerGordon Sim <gsim@apache.org>2009-11-16 17:30:23 +0000
commit771ac3e530a701120a933034f29659d16d5b4e85 (patch)
tree10216c075c1280ca13bfe8d946026ac26fbebc5b /cpp
parent9c3dda9cf4bc359e68587767dd2ab072cf4a1298 (diff)
downloadqpid-python-771ac3e530a701120a933034f29659d16d5b4e85.tar.gz
QPID-664: Remove start()/stop() methods from api
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880863 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/messaging/Receiver.h10
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp26
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h14
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp9
-rw-r--r--cpp/src/qpid/messaging/Receiver.cpp2
-rw-r--r--cpp/src/qpid/messaging/ReceiverImpl.h2
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp6
7 files changed, 15 insertions, 54 deletions
diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h
index 1d72e5fc49..659a583547 100644
--- a/cpp/include/qpid/messaging/Receiver.h
+++ b/cpp/include/qpid/messaging/Receiver.h
@@ -80,16 +80,6 @@ class Receiver : public qpid::client::Handle<ReceiverImpl>
* serving before throwing an exception.
*/
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
-
- /**
- * Enables the message flow for this receiver
- */
- QPID_CLIENT_EXTERN void start();
- /**
- * Stops the message flow for this receiver (but does not cancel
- * the subscription).
- */
- QPID_CLIENT_EXTERN void stop();
/**
* Sets the capacity for the receiver. The capacity determines how
* many incoming messages can be held in the receiver before being
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 83b245aa02..fbaff7ec04 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -74,12 +74,16 @@ void ReceiverImpl::cancel()
void ReceiverImpl::start()
{
- execute<Start>();
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow();
+ }
}
void ReceiverImpl::stop()
{
- execute<Stop>();
+ state = STOPPED;
+ session.messageStop(destination);
}
void ReceiverImpl::setCapacity(uint32_t c)
@@ -103,14 +107,14 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
session = s;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
- state = STOPPED;//TODO: if session is started, go straight to started
+ state = STARTED;
}
if (state == CANCELLED) {
source->cancel(session, destination);
parent.receiverCancelled(destination);
} else {
source->subscribe(session, destination);
- if (state == STARTED) start();
+ start();
}
}
@@ -171,20 +175,6 @@ void ReceiverImpl::cancelImpl()
}
}
-void ReceiverImpl::startImpl()
-{
- if (state == STOPPED) {
- state = STARTED;
- startFlow();
- }
-}
-
-void ReceiverImpl::stopImpl()
-{
- state = STOPPED;
- session.messageStop(destination);
-}
-
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
if (c != capacity) {
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 3a18368116..8033546c51 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -80,8 +80,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
//implementation of public facing methods
bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- void startImpl();
- void stopImpl();
void cancelImpl();
void setCapacityImpl(uint32_t);
@@ -116,18 +114,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void operator()() { result = impl.fetchImpl(message, timeout); }
};
- struct Stop : Command
- {
- Stop(ReceiverImpl& i) : Command(i) {}
- void operator()() { impl.stopImpl(); }
- };
-
- struct Start : Command
- {
- Start(ReceiverImpl& i) : Command(i) {}
- void operator()() { impl.startImpl(); }
- };
-
struct Cancel : Command
{
Cancel(ReceiverImpl& i) : Command(i) {}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index d0085dad75..4d40bd6c50 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -344,13 +344,18 @@ void SessionImpl::commitImpl()
void SessionImpl::rollbackImpl()
{
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop();
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
+ }
//ensure that stop has been processed and all previously sent
//messages are available for release:
session.sync();
incoming.releaseAll();
session.txRollback();
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start();
+
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
+ }
}
void SessionImpl::acknowledgeImpl()
diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp
index 76750cfc59..ac5d93319f 100644
--- a/cpp/src/qpid/messaging/Receiver.cpp
+++ b/cpp/src/qpid/messaging/Receiver.cpp
@@ -42,8 +42,6 @@ bool Receiver::get(Message& message, qpid::sys::Duration timeout) { return impl-
Message Receiver::get(qpid::sys::Duration timeout) { return impl->get(timeout); }
bool Receiver::fetch(Message& message, qpid::sys::Duration timeout) { return impl->fetch(message, timeout); }
Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeout); }
-void Receiver::start() { impl->start(); }
-void Receiver::stop() { impl->stop(); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
uint32_t Receiver::available() { return impl->available(); }
diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h
index e463559d99..5a057ba34e 100644
--- a/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -41,8 +41,6 @@ class ReceiverImpl : public virtual qpid::RefCounted
virtual Message get(qpid::sys::Duration timeout) = 0;
virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
virtual Message fetch(qpid::sys::Duration timeout) = 0;
- virtual void start() = 0;
- virtual void stop() = 0;
virtual void setCapacity(uint32_t) = 0;
virtual uint32_t getCapacity() = 0;
virtual uint32_t available() = 0;
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 6d7f45bc0a..ec0e174fc8 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -270,17 +270,14 @@ QPID_AUTO_TEST_CASE(testSimpleTopic)
sender.send(msg);
Receiver sub1 = fix.session.createReceiver(fix.topic);
sub1.setCapacity(10u);
- sub1.start();
msg.setContent("two");
sender.send(msg);
Receiver sub2 = fix.session.createReceiver(fix.topic);
sub2.setCapacity(10u);
- sub2.start();
msg.setContent("three");
sender.send(msg);
Receiver sub3 = fix.session.createReceiver(fix.topic);
sub3.setCapacity(10u);
- sub3.start();
msg.setContent("four");
sender.send(msg);
BOOST_CHECK_EQUAL(fetch(sub2, 2), boost::assign::list_of<std::string>("three")("four"));
@@ -304,7 +301,6 @@ QPID_AUTO_TEST_CASE(testNextReceiver)
for (uint i = 0; i < fix.queues.size(); i++) {
Receiver r = fix.session.createReceiver(fix.queues[i]);
r.setCapacity(10u);
- r.start();//TODO: add Session::start
}
for (uint i = 0; i < fix.queues.size(); i++) {
@@ -394,11 +390,9 @@ QPID_AUTO_TEST_CASE(testAvailable)
Receiver r1 = fix.session.createReceiver(fix.queues[0]);
r1.setCapacity(100);
- r1.start();
Receiver r2 = fix.session.createReceiver(fix.queues[1]);
r2.setCapacity(100);
- r2.start();
Sender s1 = fix.session.createSender(fix.queues[0]);
Sender s2 = fix.session.createSender(fix.queues[1]);