diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-04 16:06:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-04 16:06:05 +0000 |
commit | 03f1df9ff7894a6d910120c82bba49e6193178ee (patch) | |
tree | e27792701ea80e1aa83d1c5730da71502374d25d /cpp/src | |
parent | c848c1a4e6be50176f10170c6422c5e4d5385770 (diff) | |
download | qpid-python-03f1df9ff7894a6d910120c82bba49e6193178ee.tar.gz |
Additional tests and fixes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581957 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumberSet.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumberSet.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/AccumulatedAckTest.cpp | 37 |
8 files changed, 66 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b3880d86e5..e6c7b28a49 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -198,7 +198,8 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* //TODO: implement mode SequenceNumberSet results; - transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results)); + RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, results); + transfers.processRanges(op); results = results.condense(); getProxy().getMessage().acquired(results); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 9cacb4ccf7..dc5407be99 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -31,12 +31,16 @@ #include "qpid/framing/InvocationVisitor.h" #include <boost/format.hpp> +#include <boost/bind.hpp> using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {} +SemanticHandler::SemanticHandler(SessionState& s) : + state(*this,s), session(s), + ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) + {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -81,13 +85,7 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran //ack messages: state.ackCumulative(mark.getValue()); } - if (range.size() % 2) { //must be even number - throw ConnectionException(530, "Received odd number of elements in ranged mark"); - } else { - for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - state.ackRange(*i, *(++i)); - } - } + range.processRanges(ackOp); } void SemanticHandler::sendCompletion() diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index dc7f21ac34..9380708ec5 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -33,6 +33,8 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceNumber.h" +#include <boost/function.hpp> + namespace qpid { namespace framing { @@ -51,6 +53,8 @@ class SemanticHandler : public DeliveryAdapter, public framing::AMQP_ServerOperations::ExecutionHandler { + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + SemanticState state; SessionState& session; // FIXME aconway 2007-09-20: Why are these on the handler rather than the @@ -59,6 +63,7 @@ class SemanticHandler : public DeliveryAdapter, framing::Window outgoing; sys::Mutex outLock; MessageBuilder msgBuilder; + RangedOperation ackOp; enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK}; TrackId getTrack(const framing::AMQFrame& frame); diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp index 219a68b96c..bf53bf0cd6 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.cpp +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -22,12 +22,15 @@ #include <assert.h> #include <iostream> +#include <boost/bind.hpp> using std::list; using std::max; using std::min; using namespace qpid::framing; +AccumulatedAck::AccumulatedAck(SequenceNumber r) : mark(r) {} + void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ assert(first <= last); if (last < mark) return; @@ -103,9 +106,7 @@ void AccumulatedAck::collectRanges(SequenceNumberSet& set) const void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range) { update(mark, cumulative); - for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - update(*i, *(++i)); - } + range.processRanges(*this); } diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h index 1f66197e2a..a635d2ea04 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.h +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -58,13 +58,14 @@ namespace qpid { */ std::list<Range> ranges; - explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {} + explicit AccumulatedAck(SequenceNumber r = SequenceNumber()); void update(SequenceNumber firstTag, SequenceNumber lastTag); void consolidate(); void clear(); bool covers(SequenceNumber tag) const; void collectRanges(SequenceNumberSet& set) const; void update(const SequenceNumber cumulative, const SequenceNumberSet& range); + void operator()(SequenceNumber first, SequenceNumber last) { update(first, last); } }; std::ostream& operator<<(std::ostream&, const Range&); std::ostream& operator<<(std::ostream&, const AccumulatedAck&); diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp index f1c81e078b..b769befeb7 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.cpp +++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -60,6 +60,12 @@ SequenceNumberSet SequenceNumberSet::condense() const return result; } +void SequenceNumberSet::addRange(const SequenceNumber& start, const SequenceNumber& end) +{ + push_back(start); + push_back(end); +} + namespace qpid{ namespace framing{ diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h index f9d0cc1fd4..9091e7142e 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.h +++ b/cpp/src/qpid/framing/SequenceNumberSet.h @@ -41,17 +41,18 @@ public: void decode(Buffer& buffer); uint32_t encodedSize() const; SequenceNumberSet condense() const; + void addRange(const SequenceNumber& start, const SequenceNumber& end); template <class T> - void processRanges(T t) const + void processRanges(T& t) const { if (size() % 2) { //must be even number throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); } for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { - SequenceNumber first = i->getValue(); - SequenceNumber last = (++i)->getValue(); + SequenceNumber first = *(i); + SequenceNumber last = *(++i); t(first, last); } } diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp index 62245e463b..cbe44e6814 100644 --- a/cpp/src/tests/AccumulatedAckTest.cpp +++ b/cpp/src/tests/AccumulatedAckTest.cpp @@ -32,6 +32,7 @@ class AccumulatedAckTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(AccumulatedAckTest); CPPUNIT_TEST(testGeneral); CPPUNIT_TEST(testCovers); + CPPUNIT_TEST(testUpdateFromCompletionData); CPPUNIT_TEST(testCase1); CPPUNIT_TEST(testCase2); CPPUNIT_TEST(testCase3); @@ -39,6 +40,7 @@ class AccumulatedAckTest : public CppUnit::TestCase CPPUNIT_TEST(testConsolidation1); CPPUNIT_TEST(testConsolidation2); CPPUNIT_TEST(testConsolidation3); + CPPUNIT_TEST(testConsolidation4); CPPUNIT_TEST_SUITE_END(); public: @@ -97,6 +99,25 @@ public: CPPUNIT_ASSERT(!covers(ack, 10)); } + void testUpdateFromCompletionData() + { + AccumulatedAck ack(0); + SequenceNumber mark(2); + SequenceNumberSet ranges; + ranges.addRange(SequenceNumber(5), SequenceNumber(8)); + ranges.addRange(SequenceNumber(10), SequenceNumber(15)); + ranges.addRange(SequenceNumber(9), SequenceNumber(9)); + ranges.addRange(SequenceNumber(3), SequenceNumber(4)); + + ack.update(mark, ranges); + + for(int i = 0; i <= 15; i++) { + CPPUNIT_ASSERT(covers(ack, i)); + } + CPPUNIT_ASSERT(!covers(ack, 16)); + CPPUNIT_ASSERT_EQUAL((uint32_t) 15, ack.mark.getValue()); + } + void testCase1() { AccumulatedAck ack(3); @@ -205,6 +226,22 @@ public: CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size()); } + void testConsolidation4() + { + AccumulatedAck ack(0); + ack.update(SequenceNumber(0), SequenceNumber(2)); + ack.update(SequenceNumber(5), SequenceNumber(8)); + ack.update(SequenceNumber(10), SequenceNumber(15)); + ack.update(SequenceNumber(9), SequenceNumber(9)); + ack.update(SequenceNumber(3), SequenceNumber(4)); + + for(int i = 0; i <= 15; i++) { + CPPUNIT_ASSERT(covers(ack, i)); + } + CPPUNIT_ASSERT(!covers(ack, 16)); + CPPUNIT_ASSERT_EQUAL((uint32_t) 15, ack.mark.getValue()); + } + }; // Make this test suite a plugin. |