diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 53 |
1 files changed, 48 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index b0a16674e1..d22208368b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -81,12 +81,31 @@ struct MatchAndTrack } } }; + +struct Match +{ + const std::string destination; + uint32_t matched; + + Match(const std::string& d) : destination(d), matched(0) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ++matched; + return true; + } else { + return false; + } + } +}; } void IncomingMessages::setSession(qpid::client::AsyncSession s) { session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); + acceptTracker.reset(); } bool IncomingMessages::get(Handler& handler, Duration timeout) @@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) void IncomingMessages::accept() { - session.messageAccept(unaccepted); - unaccepted.clear(); + acceptTracker.accept(session); } void IncomingMessages::releaseAll() @@ -121,8 +139,7 @@ void IncomingMessages::releaseAll() GetAny handler; while (process(&handler, 0)) ; //now release all messages - session.messageRelease(unaccepted); - unaccepted.clear(); + acceptTracker.release(session); } void IncomingMessages::releasePending(const std::string& destination) @@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +uint32_t IncomingMessages::pendingAccept() +{ + return acceptTracker.acceptsPending(); +} +uint32_t IncomingMessages::pendingAccept(const std::string& destination) +{ + return acceptTracker.acceptsPending(destination); +} + +uint32_t IncomingMessages::available() +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + //return the count of received messages + return received.size(); +} + +uint32_t IncomingMessages::available(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + + //count all messages for this destination from received list + return std::for_each(received.begin(), received.end(), Match(destination)).matched; +} + void populate(qpid::messaging::Message& message, FrameSet& command); /** @@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { - unaccepted.add(command->getId()); + acceptTracker.delivered(transfer->getDestination(), command->getId()); } session.markCompleted(command->getId(), false, false); } |