diff options
Diffstat (limited to 'cpp/src/qpid/client/CompletionTracker.cpp')
-rw-r--r-- | cpp/src/qpid/client/CompletionTracker.cpp | 74 |
1 files changed, 65 insertions, 9 deletions
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 996971dbd2..46a7384ac2 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -20,45 +20,101 @@ */ #include "CompletionTracker.h" +#include <algorithm> using qpid::client::CompletionTracker; using namespace qpid::framing; using namespace boost; +namespace +{ +const std::string empty; +} + CompletionTracker::CompletionTracker() {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} +void CompletionTracker::close() +{ + sys::Mutex::ScopedLock l(lock); + while (!listeners.empty()) { + Record r(listeners.front()); + { + sys::Mutex::ScopedUnlock u(lock); + r.completed(); + } + listeners.pop_front(); + } +} void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; - while (!listeners.empty() && !(listeners.front().first > mark)) { - Listener f(listeners.front().second); + while (!listeners.empty() && !(listeners.front().id > mark)) { + Record r(listeners.front()); { sys::Mutex::ScopedUnlock u(lock); - f(); + r.completed(); } - listeners.pop(); + listeners.pop_front(); + } +} + +void CompletionTracker::received(const SequenceNumber& id, const std::string& result) +{ + sys::Mutex::ScopedLock l(lock); + Listeners::iterator i = seek(id); + if (i != listeners.end() && i->id == id) { + i->received(result); + listeners.erase(i); } } -void CompletionTracker::listen(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) { - if (!add(point, listener)) { + if (!add(Record(point, listener))) { listener(); } } -bool CompletionTracker::add(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) +{ + if (!add(Record(point, listener))) { + listener(empty); + } +} + +bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (point < mark) { + if (record.id < mark) { return false; } else { - listeners.push(make_pair(point, listener)); + //insert at the correct position + Listeners::iterator i = seek(record.id); + if (i == listeners.end()) i = listeners.begin(); + listeners.insert(i, record); + return true; } } +CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) +{ + Listeners::iterator i = listeners.begin(); + while (i != listeners.end() && i->id < point) i++; + return i; +} + +void CompletionTracker::Record::completed() +{ + if (f) f(); + else if(g) g(empty);//won't get a result if command is now complete +} + +void CompletionTracker::Record::received(const std::string& result) +{ + if (g) g(result); +} |