diff options
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 22 |
8 files changed, 120 insertions, 15 deletions
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 4e3f950ef3..d77ddf3e43 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -88,13 +88,14 @@ class Session : public qpid::client::Handle<SessionImpl> QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Sender createSender(const Address& address); QPID_CLIENT_EXTERN Sender createSender(const std::string& address); QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address); QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address); - - QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string()); private: friend class qpid::client::PrivateImplRef<Session>; }; diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 8e060c62d7..e66dc5915c 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) return process(&handler, timeout); } +bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) +{ + //if there is not already a received message, we must wait for one + if (received.empty() && !wait(timeout)) return false; + //else we have a message in received; return the corresponding destination + destination = received.front()->as<MessageTransferBody>()->getDestination(); + return true; +} + void IncomingMessages::accept() { acceptTracker.accept(session); @@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination) } /** - * Get a frameset from session queue, waiting for up to the specified - * duration and returning true if this could be achieved, false - * otherwise. If a destination is supplied, only return a message for - * that destination. In this case messages from other destinations - * will be held on a received queue. + * Get a frameset that is accepted by the specified handler from + * session queue, waiting for up to the specified duration and + * returning true if this could be achieved, false otherwise. Messages + * that are not accepted by the handler are pushed onto received queue + * for later retrieval. */ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) { @@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +bool IncomingMessages::wait(qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + received.push_back(content); + return true; + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + uint32_t IncomingMessages::pendingAccept() { return acceptTracker.acceptsPending(); diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index e84cd18892..2bc6dd49c4 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -70,8 +70,7 @@ class IncomingMessages void setSession(qpid::client::AsyncSession session); bool get(Handler& handler, qpid::sys::Duration timeout); - //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); - //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); void releaseAll(); void releasePending(const std::string& destination); @@ -90,6 +89,7 @@ class IncomingMessages AcceptTracker acceptTracker; bool process(Handler*, qpid::sys::Duration); + bool wait(qpid::sys::Duration); void retrieve(FrameSetPtr, qpid::messaging::Message*); }; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 101bc5ce0a..7f8e5f4e79 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -210,6 +210,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler } + +bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer) +{ + Receivers::const_iterator i = receivers.find(transfer.getDestination()); + if (i == receivers.end()) { + QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); + return false; + } else { + *receiver = i->second; + return true; + } +} + bool SessionImpl::accept(ReceiverImpl* receiver, qpid::messaging::Message* message, bool isDispatch, @@ -279,6 +292,37 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t } } +bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + std::string destination; + if (incoming.getNextDestination(destination, timeout)) { + Receivers::const_iterator i = receivers.find(destination); + if (i == receivers.end()) { + throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); + } else { + receiver = i->second; + } + return true; + } else { + return false; + } + } catch (TransportFailure&) { + reconnect(); + } + } +} + +qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout) +{ + qpid::messaging::Receiver receiver; + if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); + if (!receiver) throw qpid::Exception("Bad receiver returned!"); + return receiver; +} + uint32_t SessionImpl::available() { return get1<Available, uint32_t>((const std::string*) 0); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 9a7918d473..ec9a6162c1 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -73,6 +73,10 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Message fetch(qpid::sys::Duration timeout); bool dispatch(qpid::sys::Duration timeout); + bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout); + qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout); + + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); void receiverCancelled(const std::string& name); @@ -115,6 +119,7 @@ class SessionImpl : public qpid::messaging::SessionImpl bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); + bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); void commitImpl(); diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index b69b575b26..aa8e067168 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -65,11 +65,6 @@ Receiver Session::createReceiver(const std::string& address) return impl->createReceiver(Address(address)); } -Address Session::createTempQueue(const std::string& baseName) -{ - return impl->createTempQueue(baseName); -} - void Session::sync() { impl->sync(); @@ -94,6 +89,18 @@ bool Session::dispatch(qpid::sys::Duration timeout) { return impl->dispatch(timeout); } + +bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) +{ + return impl->nextReceiver(receiver, timeout); +} + + +Receiver Session::nextReceiver(qpid::sys::Duration timeout) +{ + return impl->nextReceiver(timeout); +} + uint32_t Session::available() { return impl->available(); } uint32_t Session::pendingAck() { return impl->pendingAck(); } diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index e48e7a4d02..cf95e22ae8 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -51,9 +51,10 @@ class SessionImpl : public virtual qpid::RefCounted virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; virtual Message fetch(qpid::sys::Duration timeout) = 0; virtual bool dispatch(qpid::sys::Duration timeout) = 0; - virtual Address createTempQueue(const std::string& baseName) = 0; virtual Sender createSender(const Address& address) = 0; virtual Receiver createReceiver(const Address& address) = 0; + virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0; + virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0; virtual uint32_t available() = 0; virtual uint32_t pendingAck() = 0; private: diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index ce29099ef9..dd7166df46 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -354,6 +354,28 @@ QPID_AUTO_TEST_CASE(testSessionDispatch) BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3")); } +QPID_AUTO_TEST_CASE(testNextReceiver) +{ + MultiQueueFixture fix; + + for (uint i = 0; i < fix.queues.size(); i++) { + Receiver r = fix.session.createReceiver(fix.queues[i]); + r.setCapacity(10u); + r.start();//TODO: add Session::start + } + + for (uint i = 0; i < fix.queues.size(); i++) { + Sender s = fix.session.createSender(fix.queues[i]); + Message msg((boost::format("Message_%1%") % (i+1)).str()); + s.send(msg); + } + + for (uint i = 0; i < fix.queues.size(); i++) { + Message msg; + BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + } +} QPID_AUTO_TEST_CASE(testMapMessage) { |