summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-16 11:58:45 +0000
committerGordon Sim <gsim@apache.org>2009-11-16 11:58:45 +0000
commitefc6473096622a01b2a3907093431b49d8ebfb1e (patch)
tree27c712cf1eeff318c0663e77cdea3db4a5e54095 /cpp/src
parent454379917ad7b797a045cbefc56bf598e3fd534b (diff)
downloadqpid-python-efc6473096622a01b2a3907093431b49d8ebfb1e.tar.gz
Merge branch 'next_receiver_changes' into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880718 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp5
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp72
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h12
-rw-r--r--cpp/src/qpid/messaging/Receiver.cpp1
-rw-r--r--cpp/src/qpid/messaging/ReceiverImpl.h1
-rw-r--r--cpp/src/qpid/messaging/Session.cpp15
-rw-r--r--cpp/src/qpid/messaging/SessionImpl.h3
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp57
10 files changed, 3 insertions, 166 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4b859cda47..a15d722f5d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -799,7 +799,6 @@ nobase_include_HEADERS += \
../include/qpid/messaging/MapContent.h \
../include/qpid/messaging/MapView.h \
../include/qpid/messaging/Message.h \
- ../include/qpid/messaging/MessageListener.h \
../include/qpid/messaging/Sender.h \
../include/qpid/messaging/Receiver.h \
../include/qpid/messaging/Session.h \
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index f294d7e273..83b245aa02 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -22,7 +22,6 @@
#include "AddressResolution.h"
#include "MessageSource.h"
#include "SessionImpl.h"
-#include "qpid/messaging/MessageListener.h"
#include "qpid/messaging/Receiver.h"
namespace qpid {
@@ -115,8 +114,6 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
}
}
-void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
-qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
const std::string& ReceiverImpl::getName() const { return destination; }
@@ -139,7 +136,7 @@ ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a) :
parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
- state(UNRESOLVED), capacity(0), listener(0), window(0) {}
+ state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index d05fd3d045..3a18368116 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -62,8 +62,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
uint32_t getCapacity();
uint32_t available();
uint32_t pendingAck();
- void setListener(qpid::messaging::MessageListener* listener);
- qpid::messaging::MessageListener* getListener();
void received(qpid::messaging::Message& message);
private:
SessionImpl& parent;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 7f8e5f4e79..d0085dad75 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -30,7 +30,6 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
-#include "qpid/messaging/MessageListener.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
@@ -177,13 +176,6 @@ Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
return sender;
}
-qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName)
-{
- std::string name = baseName + std::string("_") + session.getId().getName();
- session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true);
- return qpid::messaging::Address(name);
-}
-
SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
{
boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
@@ -225,16 +217,10 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT
bool SessionImpl::accept(ReceiverImpl* receiver,
qpid::messaging::Message* message,
- bool isDispatch,
IncomingMessages::MessageTransfer& transfer)
{
if (receiver->getName() == transfer.getDestination()) {
transfer.retrieve(message);
- if (isDispatch) {
- qpid::sys::Mutex::ScopedUnlock u(lock);
- qpid::messaging::MessageListener* listener = receiver->getListener();
- if (listener) listener->received(*message);
- }
receiver->received(*message);
return true;
} else {
@@ -242,18 +228,6 @@ bool SessionImpl::accept(ReceiverImpl* receiver,
}
}
-bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer)
-{
- Receivers::iterator i = receivers.find(transfer.getDestination());
- if (i == receivers.end()) {
- QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
- return false;
- } else {
- boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second);
- return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer);
- }
-}
-
bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
{
return incoming.get(handler, timeout);
@@ -261,37 +235,10 @@ bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Dur
bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
- IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1));
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
return getIncoming(handler, timeout);
}
-bool SessionImpl::dispatch(qpid::sys::Duration timeout)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- while (true) {
- try {
- qpid::messaging::Message message;
- IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1));
- return getIncoming(handler, timeout);
- } catch (TransportFailure&) {
- reconnect();
- }
- }
-}
-
-bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- while (true) {
- try {
- IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1));
- return getIncoming(handler, timeout);
- } catch (TransportFailure&) {
- reconnect();
- }
- }
-}
-
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -418,13 +365,6 @@ void SessionImpl::rejectImpl(qpid::messaging::Message& m)
session.messageReject(set);
}
-qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout)
-{
- qpid::messaging::Message result;
- if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
- return result;
-}
-
void SessionImpl::receiverCancelled(const std::string& name)
{
receivers.erase(name);
@@ -442,14 +382,4 @@ void SessionImpl::reconnect()
connection.reconnect();
}
-void* SessionImpl::getLastConfirmedSent()
-{
- return 0;
-}
-
-void* SessionImpl::getLastConfirmedAcknowledged()
-{
- return 0;
-}
-
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index ec9a6162c1..f3018b9685 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -62,21 +62,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
void close();
void sync();
void flush();
- qpid::messaging::Address createTempQueue(const std::string& baseName);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address);
- void* getLastConfirmedSent();
- void* getLastConfirmedAcknowledged();
-
- bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- qpid::messaging::Message fetch(qpid::sys::Duration timeout);
- bool dispatch(qpid::sys::Duration timeout);
-
bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout);
qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout);
-
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);
void receiverCancelled(const std::string& name);
@@ -116,8 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
Receivers receivers;
Senders senders;
- bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
- bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
+ bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp
index 3290ea98ac..76750cfc59 100644
--- a/cpp/src/qpid/messaging/Receiver.cpp
+++ b/cpp/src/qpid/messaging/Receiver.cpp
@@ -49,6 +49,5 @@ uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
uint32_t Receiver::available() { return impl->available(); }
uint32_t Receiver::pendingAck() { return impl->pendingAck(); }
void Receiver::cancel() { impl->cancel(); }
-void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); }
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h
index 7db20acc29..e463559d99 100644
--- a/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -48,7 +48,6 @@ class ReceiverImpl : public virtual qpid::RefCounted
virtual uint32_t available() = 0;
virtual uint32_t pendingAck() = 0;
virtual void cancel() = 0;
- virtual void setListener(MessageListener*) = 0;
};
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp
index aa8e067168..53e85d53b1 100644
--- a/cpp/src/qpid/messaging/Session.cpp
+++ b/cpp/src/qpid/messaging/Session.cpp
@@ -75,21 +75,6 @@ void Session::flush()
impl->flush();
}
-bool Session::fetch(Message& message, qpid::sys::Duration timeout)
-{
- return impl->fetch(message, timeout);
-}
-
-Message Session::fetch(qpid::sys::Duration timeout)
-{
- return impl->fetch(timeout);
-}
-
-bool Session::dispatch(qpid::sys::Duration timeout)
-{
- return impl->dispatch(timeout);
-}
-
bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout)
{
return impl->nextReceiver(receiver, timeout);
diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h
index cf95e22ae8..b68baf821c 100644
--- a/cpp/src/qpid/messaging/SessionImpl.h
+++ b/cpp/src/qpid/messaging/SessionImpl.h
@@ -48,9 +48,6 @@ class SessionImpl : public virtual qpid::RefCounted
virtual void close() = 0;
virtual void sync() = 0;
virtual void flush() = 0;
- virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
- virtual Message fetch(qpid::sys::Duration timeout) = 0;
- virtual bool dispatch(qpid::sys::Duration timeout) = 0;
virtual Sender createSender(const Address& address) = 0;
virtual Receiver createReceiver(const Address& address) = 0;
virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0;
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index dd7166df46..00a8481c93 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -28,7 +28,6 @@
#include "qpid/messaging/MapContent.h"
#include "qpid/messaging/MapView.h"
#include "qpid/messaging/Message.h"
-#include "qpid/messaging/MessageListener.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Session.h"
@@ -187,16 +186,6 @@ struct MultiQueueFixture : MessagingFixture
}
};
-
-struct MessageDataCollector : MessageListener
-{
- std::vector<std::string> messageData;
-
- void received(Message& message) {
- messageData.push_back(message.getContent());
- }
-};
-
std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
{
std::vector<std::string> data;
@@ -308,52 +297,6 @@ QPID_AUTO_TEST_CASE(testSimpleTopic)
//TODO: check pending messages...
}
-QPID_AUTO_TEST_CASE(testSessionFetch)
-{
- MultiQueueFixture fix;
-
- for (uint i = 0; i < fix.queues.size(); i++) {
- Receiver r = fix.session.createReceiver(fix.queues[i]);
- r.setCapacity(10u);
- r.start();//TODO: add Session::start
- }
-
- for (uint i = 0; i < fix.queues.size(); i++) {
- Sender s = fix.session.createSender(fix.queues[i]);
- Message msg((boost::format("Message_%1%") % (i+1)).str());
- s.send(msg);
- }
-
- for (uint i = 0; i < fix.queues.size(); i++) {
- Message msg;
- BOOST_CHECK(fix.session.fetch(msg, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
- }
-}
-
-QPID_AUTO_TEST_CASE(testSessionDispatch)
-{
- MultiQueueFixture fix;
-
- MessageDataCollector collector;
- for (uint i = 0; i < fix.queues.size(); i++) {
- Receiver r = fix.session.createReceiver(fix.queues[i]);
- r.setListener(&collector);
- r.setCapacity(10u);
- r.start();//TODO: add Session::start
- }
-
- for (uint i = 0; i < fix.queues.size(); i++) {
- Sender s = fix.session.createSender(fix.queues[i]);
- Message msg((boost::format("Message_%1%") % (i+1)).str());
- s.send(msg);
- }
-
- while (fix.session.dispatch(qpid::sys::TIME_SEC)) ;
-
- BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3"));
-}
-
QPID_AUTO_TEST_CASE(testNextReceiver)
{
MultiQueueFixture fix;