summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/qpid/messaging/Session.h5
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp35
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp44
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h5
-rw-r--r--cpp/src/qpid/messaging/Session.cpp17
-rw-r--r--cpp/src/qpid/messaging/SessionImpl.h3
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp22
8 files changed, 120 insertions, 15 deletions
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h
index 4e3f950ef3..d77ddf3e43 100644
--- a/cpp/include/qpid/messaging/Session.h
+++ b/cpp/include/qpid/messaging/Session.h
@@ -88,13 +88,14 @@ class Session : public qpid::client::Handle<SessionImpl>
QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+
QPID_CLIENT_EXTERN Sender createSender(const Address& address);
QPID_CLIENT_EXTERN Sender createSender(const std::string& address);
QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address);
QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address);
-
- QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
private:
friend class qpid::client::PrivateImplRef<Session>;
};
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 8e060c62d7..e66dc5915c 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
return process(&handler, timeout);
}
+bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+{
+ //if there is not already a received message, we must wait for one
+ if (received.empty() && !wait(timeout)) return false;
+ //else we have a message in received; return the corresponding destination
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+}
+
void IncomingMessages::accept()
{
acceptTracker.accept(session);
@@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination)
}
/**
- * Get a frameset from session queue, waiting for up to the specified
- * duration and returning true if this could be achieved, false
- * otherwise. If a destination is supplied, only return a message for
- * that destination. In this case messages from other destinations
- * will be held on a received queue.
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
*/
bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
@@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ return true;
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
uint32_t IncomingMessages::pendingAccept()
{
return acceptTracker.acceptsPending();
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index e84cd18892..2bc6dd49c4 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -70,8 +70,7 @@ class IncomingMessages
void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
- //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
void releaseAll();
void releasePending(const std::string& destination);
@@ -90,6 +89,7 @@ class IncomingMessages
AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
+ bool wait(qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
};
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 101bc5ce0a..7f8e5f4e79 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -210,6 +210,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler
}
+
+bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
+{
+ Receivers::const_iterator i = receivers.find(transfer.getDestination());
+ if (i == receivers.end()) {
+ QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+ return false;
+ } else {
+ *receiver = i->second;
+ return true;
+ }
+}
+
bool SessionImpl::accept(ReceiverImpl* receiver,
qpid::messaging::Message* message,
bool isDispatch,
@@ -279,6 +292,37 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t
}
}
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (true) {
+ try {
+ std::string destination;
+ if (incoming.getNextDestination(destination, timeout)) {
+ Receivers::const_iterator i = receivers.find(destination);
+ if (i == receivers.end()) {
+ throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
+ } else {
+ receiver = i->second;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (TransportFailure&) {
+ reconnect();
+ }
+ }
+}
+
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Receiver receiver;
+ if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
+ if (!receiver) throw qpid::Exception("Bad receiver returned!");
+ return receiver;
+}
+
uint32_t SessionImpl::available()
{
return get1<Available, uint32_t>((const std::string*) 0);
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 9a7918d473..ec9a6162c1 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -73,6 +73,10 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Message fetch(qpid::sys::Duration timeout);
bool dispatch(qpid::sys::Duration timeout);
+ bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout);
+ qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout);
+
+
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);
void receiverCancelled(const std::string& name);
@@ -115,6 +119,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+ bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
void commitImpl();
diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp
index b69b575b26..aa8e067168 100644
--- a/cpp/src/qpid/messaging/Session.cpp
+++ b/cpp/src/qpid/messaging/Session.cpp
@@ -65,11 +65,6 @@ Receiver Session::createReceiver(const std::string& address)
return impl->createReceiver(Address(address));
}
-Address Session::createTempQueue(const std::string& baseName)
-{
- return impl->createTempQueue(baseName);
-}
-
void Session::sync()
{
impl->sync();
@@ -94,6 +89,18 @@ bool Session::dispatch(qpid::sys::Duration timeout)
{
return impl->dispatch(timeout);
}
+
+bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout)
+{
+ return impl->nextReceiver(receiver, timeout);
+}
+
+
+Receiver Session::nextReceiver(qpid::sys::Duration timeout)
+{
+ return impl->nextReceiver(timeout);
+}
+
uint32_t Session::available() { return impl->available(); }
uint32_t Session::pendingAck() { return impl->pendingAck(); }
diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h
index e48e7a4d02..cf95e22ae8 100644
--- a/cpp/src/qpid/messaging/SessionImpl.h
+++ b/cpp/src/qpid/messaging/SessionImpl.h
@@ -51,9 +51,10 @@ class SessionImpl : public virtual qpid::RefCounted
virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
virtual Message fetch(qpid::sys::Duration timeout) = 0;
virtual bool dispatch(qpid::sys::Duration timeout) = 0;
- virtual Address createTempQueue(const std::string& baseName) = 0;
virtual Sender createSender(const Address& address) = 0;
virtual Receiver createReceiver(const Address& address) = 0;
+ virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0;
+ virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0;
virtual uint32_t available() = 0;
virtual uint32_t pendingAck() = 0;
private:
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index ce29099ef9..dd7166df46 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -354,6 +354,28 @@ QPID_AUTO_TEST_CASE(testSessionDispatch)
BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3"));
}
+QPID_AUTO_TEST_CASE(testNextReceiver)
+{
+ MultiQueueFixture fix;
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Receiver r = fix.session.createReceiver(fix.queues[i]);
+ r.setCapacity(10u);
+ r.start();//TODO: add Session::start
+ }
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Sender s = fix.session.createSender(fix.queues[i]);
+ Message msg((boost::format("Message_%1%") % (i+1)).str());
+ s.send(msg);
+ }
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Message msg;
+ BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+}
QPID_AUTO_TEST_CASE(testMapMessage)
{