summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp53
1 files changed, 48 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index b0a16674e1..d22208368b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -81,12 +81,31 @@ struct MatchAndTrack
}
}
};
+
+struct Match
+{
+ const std::string destination;
+ uint32_t matched;
+
+ Match(const std::string& d) : destination(d), matched(0) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ++matched;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
}
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
}
bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@ void IncomingMessages::releaseAll()
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+uint32_t IncomingMessages::pendingAccept()
+{
+ return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+ return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+ //return the count of received messages
+ return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+
+ //count all messages for this destination from received list
+ return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}