diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-06 16:45:27 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-06 16:45:27 +0000 |
commit | f5371ed7a94deb0c261d1ca8b5185d8c94cdf325 (patch) | |
tree | 142d6b52c36c4b50e0079d296f6ce5be5a58f364 /cpp | |
parent | 357557ff3f16b3dc0e6dacfab00e2d0a84f5c8fb (diff) | |
download | qpid-python-f5371ed7a94deb0c261d1ca8b5185d8c94cdf325.tar.gz |
* fix bug causing last message to occasionally be lost on replay
* make presence of gaps an error condition in the resuming_receiver example
* add ability to apply functor to replay buffer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/examples/failover/resuming_receiver.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/MessageReplayTracker.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/MessageReplayTracker.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/MessageReplayTracker.cpp | 20 |
4 files changed, 40 insertions, 5 deletions
diff --git a/cpp/examples/failover/resuming_receiver.cpp b/cpp/examples/failover/resuming_receiver.cpp index 6ac67d8edb..3c1df92ed1 100644 --- a/cpp/examples/failover/resuming_receiver.cpp +++ b/cpp/examples/failover/resuming_receiver.cpp @@ -41,14 +41,16 @@ class Listener : public MessageListener, public FailoverManager::Command Listener(); void received(Message& message); void execute(AsyncSession& session, bool isRetry); + void check(); private: Subscription subscription; uint count; uint skipped; uint lastSn; + bool gaps; }; -Listener::Listener() : count(0), skipped(0), lastSn(0) {} +Listener::Listener() : count(0), skipped(0), lastSn(0), gaps(false) {} void Listener::received(Message & message) { @@ -62,7 +64,8 @@ void Listener::received(Message & message) uint sn = message.getHeaders().getAsInt("sn"); if (lastSn < sn) { if (sn - lastSn > 1) { - std::cout << "Warning: gap in sequence between " << lastSn << " and " << sn << std::endl; + std::cout << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + gaps = true; } lastSn = sn; ++count; @@ -72,6 +75,11 @@ void Listener::received(Message & message) } } +void Listener::check() +{ + if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); +} + void Listener::execute(AsyncSession& session, bool isRetry) { if (isRetry) { @@ -94,6 +102,7 @@ int main(int argc, char ** argv) try { connection.execute(listener); connection.close(); + listener.check(); std::cout << "Completed without error." << std::endl; return 0; } catch(const std::exception& error) { diff --git a/cpp/src/qpid/client/MessageReplayTracker.cpp b/cpp/src/qpid/client/MessageReplayTracker.cpp index 3c36b03b34..9ffbb76837 100644 --- a/cpp/src/qpid/client/MessageReplayTracker.cpp +++ b/cpp/src/qpid/client/MessageReplayTracker.cpp @@ -28,8 +28,8 @@ MessageReplayTracker::MessageReplayTracker(uint f) : flushInterval(f), count(0) void MessageReplayTracker::send(const Message& message, const std::string& destination) { - ReplayRecord record(message, destination); - record.send(*this); + buffer.push_back(ReplayRecord(message, destination)); + buffer.back().send(*this); if (flushInterval && ++count >= flushInterval) { checkCompletion(); if (!buffer.empty()) session.flush(); @@ -70,7 +70,6 @@ MessageReplayTracker::ReplayRecord::ReplayRecord(const Message& m, const std::st void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker) { status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message); - tracker.buffer.push_back(*this); } bool MessageReplayTracker::ReplayRecord::isComplete() diff --git a/cpp/src/qpid/client/MessageReplayTracker.h b/cpp/src/qpid/client/MessageReplayTracker.h index 40324de4e9..45b16fb704 100644 --- a/cpp/src/qpid/client/MessageReplayTracker.h +++ b/cpp/src/qpid/client/MessageReplayTracker.h @@ -44,6 +44,13 @@ class MessageReplayTracker void setFlushInterval(uint interval); uint getFlushInterval(); void checkCompletion(); + + template <class F> void foreach(F& f) { + for (std::list<ReplayRecord>::const_iterator i = buffer.begin(); i != buffer.end(); i++) { + f(i->message); + } + } + private: struct ReplayRecord { diff --git a/cpp/src/tests/MessageReplayTracker.cpp b/cpp/src/tests/MessageReplayTracker.cpp index ba0174345f..a5121cdeb7 100644 --- a/cpp/src/tests/MessageReplayTracker.cpp +++ b/cpp/src/tests/MessageReplayTracker.cpp @@ -29,6 +29,23 @@ using namespace qpid::client; using namespace qpid::sys; using std::string; +class ReplayBufferChecker +{ + public: + + ReplayBufferChecker(uint from, uint to) : end(to), i(from) {} + + void operator()(const Message& m) + { + if (i > end) BOOST_FAIL("Extra message found: " + m.getData()); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i++)).str(), m.getData()); + } + private: + const uint end; + uint i; + +}; + QPID_AUTO_TEST_CASE(testReplay) { ProxySessionFixture fix; @@ -40,6 +57,9 @@ QPID_AUTO_TEST_CASE(testReplay) Message message((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); tracker.send(message); } + ReplayBufferChecker checker(1, 10); + tracker.foreach(checker); + tracker.replay(fix.session); for (uint j = 0; j < 2; j++) {//each message should have been sent twice for (uint i = 0; i < 5; i++) { |