diff options
author | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
commit | 9acf66d9c848809e3308e50998d38a0183b038a4 (patch) | |
tree | b8ca2a996000b38a71cbb098f171af9ae4c540cc /qpid/cpp/src/tests/SessionState.cpp | |
parent | fdb31574f9cdb3474b4984fb0776f02ea4e32433 (diff) | |
download | qpid-python-9acf66d9c848809e3308e50998d38a0183b038a4.tar.gz |
Session state as per AMQP 0-10 specification.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651997 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/tests/SessionState.cpp | 254 |
1 files changed, 229 insertions, 25 deletions
diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp index 318bfbbddd..752d6d3e75 100644 --- a/qpid/cpp/src/tests/SessionState.cpp +++ b/qpid/cpp/src/tests/SessionState.cpp @@ -16,17 +16,33 @@ * */ -#include "qpid/framing/SessionState.h" +#include "unit_test.h" + +#include "qpid/framing/SessionState.h" // FIXME aconway 2008-04-23: preview code to remove. +#include "qpid/SessionState.h" #include "qpid/Exception.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SessionFlushBody.h" #include <boost/bind.hpp> -#include "unit_test.h" +#include <algorithm> +#include <functional> +#include <numeric> QPID_AUTO_TEST_SUITE(SessionStateTestSuite) using namespace std; -using namespace qpid::framing; using namespace boost; +using namespace qpid::framing; + +// ================================================================ +// Utility functions. + +// Apply f to [begin, end) and accumulate the result +template <class Iter, class T, class F> +T applyAccumulate(Iter begin, Iter end, T seed, const F& f) { + return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2))); +} // Create a frame with a one-char string. AMQFrame& frame(char s) { @@ -35,13 +51,215 @@ AMQFrame& frame(char s) { return frame; } -// Extract the one-char string from a frame. -char charFromFrame(const AMQFrame& f) { - const AMQContentBody* b=dynamic_cast<const AMQContentBody*>(f.getBody()); - BOOST_REQUIRE(b && b->getData().size() > 0); - return b->getData()[0]; +// Simple string representation of a frame. +string str(const AMQFrame& f) { + if (f.getMethod()) return "C"; // Command or Control + const AMQContentBody* c = dynamic_cast<const AMQContentBody*>(f.getBody()); + if (c) return c->getData(); // Return data for content frames. + return "H"; // Must be a header. +} +// Make a string from a range of frames. +string str(const vector<AMQFrame>& frames) { + string (*strFrame)(const AMQFrame&) = str; + return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame)); +} +// Make a transfer command frame. +AMQFrame transferFrame(bool hasContent) { + AMQFrame t(in_place<MessageTransferBody>()); + t.setFirstFrame(); + t.setLastFrame(); + t.setFirstSegment(); + t.setLastSegment(!hasContent); + return t; +} +// Make a content frame +AMQFrame contentFrame(string content, bool isLast=true) { + AMQFrame f(in_place<AMQContentBody>(content)); + f.setFirstFrame(); + f.setLastFrame(); + f.setLastSegment(isLast); + return f; +} +AMQFrame contentFrameChar(char content, bool isLast=true) { + return contentFrame(string(1, content), isLast); +} + +// Send frame & return size of frame. +size_t send(qpid::SessionState& s, const AMQFrame& f) { s.send(f); return f.size(); } +// Send transfer command with no content. +size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); } +// Send transfer frame with single content frame. +size_t transfer1(qpid::SessionState& s, string content) { + return send(s,transferFrame(true)) + send(s,contentFrame(content)); +} +size_t transfer1Char(qpid::SessionState& s, char content) { + return transfer1(s, string(1,content)); +} + +// Send transfer frame with multiple single-byte content frames. +size_t transferN(qpid::SessionState& s, string content) { + size_t size=send(s, transferFrame(!content.empty())); + if (!content.empty()) { + char last = content[content.size()-1]; + content.resize(content.size()-1); + size += applyAccumulate(content.begin(), content.end(), 0, + bind(&send, ref(s), + bind(contentFrameChar, _1, false))); + size += send(s, contentFrameChar(last, true)); + } + return size; +} + +// Send multiple transfers with single-byte content. +size_t transfers(qpid::SessionState& s, string content) { + return applyAccumulate(content.begin(), content.end(), 0, + bind(transfer1Char, ref(s), _1)); +} + +size_t contentFrameSize(size_t n=1) { return AMQFrame(in_place<AMQContentBody>()).size() + n; } +size_t transferFrameSize() { return AMQFrame(in_place<MessageTransferBody>()).size(); } + +// ==== qpid::SessionState test classes + +using qpid::SessionId; +using qpid::SessionPoint; + + +QPID_AUTO_TEST_CASE(testSendGetReplyList) { + qpid::SessionState s; + transfer1(s, "abc"); + transfers(s, "def"); + transferN(s, "xyz"); + BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); + // Ignore controls. + s.send(AMQFrame(in_place<SessionFlushBody>())); + BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); +} + +QPID_AUTO_TEST_CASE(testNeedFlush) { + qpid::SessionState::Configuration c; + // sync after 2 1-byte transfers or equivalent bytes. + c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + qpid::SessionState s(SessionId(), c); + transfers(s, "a"); + BOOST_CHECK(!s.needFlush()); + transfers(s, "b"); + BOOST_CHECK(s.needFlush()); + s.sendFlush(); + BOOST_CHECK(!s.needFlush()); + transfers(s, "c"); + BOOST_CHECK(!s.needFlush()); + transfers(s, "d"); + BOOST_CHECK(s.needFlush()); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); } +QPID_AUTO_TEST_CASE(testPeerConfirmed) { + qpid::SessionState::Configuration c; + // sync after 2 1-byte transfers or equivalent bytes. + c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + qpid::SessionState s(SessionId(), c); + transfers(s, "ab"); + BOOST_CHECK(s.needFlush()); + transfers(s, "cd"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); + s.peerConfirmed(SessionPoint(3)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cd"); + BOOST_CHECK(!s.needFlush()); + + // Never go backwards. + s.peerConfirmed(SessionPoint(2)); + s.peerConfirmed(SessionPoint(3)); + + // Multi-frame transfer. + transfer1(s, "efg"); + transfers(s, "xy"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CdCefgCxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(4)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CefgCxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(5)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(6)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cy"); + BOOST_CHECK(!s.needFlush()); +} + +QPID_AUTO_TEST_CASE(testPeerCompleted) { + qpid::SessionState s; + // Completion implies confirmation + transfers(s, "abc"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCc"); + SequenceSet set(SequenceSet() + 0 + 1); + s.peerCompleted(set); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cc"); + + transfers(s, "def"); + // We dont do out-of-order confirmation, so this will only confirm up to 3: + set = SequenceSet(SequenceSet() + 2 + 3 + 5); + s.peerCompleted(set); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CeCf"); +} + +QPID_AUTO_TEST_CASE(testReceive) { + // Advance expecting/received correctly + qpid::SessionState s; + BOOST_CHECK(!s.hasState()); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(0)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(0)); + + BOOST_CHECK(s.receive(transferFrame(false))); + BOOST_CHECK(s.hasState()); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(1)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(1)); + + BOOST_CHECK(s.receive(transferFrame(true))); + SessionPoint point = SessionPoint(1, transferFrameSize()); + BOOST_CHECK_EQUAL(s.getExpecting(), point); + BOOST_CHECK_EQUAL(s.getReceived(), point); + BOOST_CHECK(s.receive(contentFrame("", false))); + point.offset += contentFrameSize(0); + BOOST_CHECK_EQUAL(s.getExpecting(), point); + BOOST_CHECK_EQUAL(s.getReceived(), point); + BOOST_CHECK(s.receive(contentFrame("", true))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); + + // Idempotence barrier, rewind expecting & receive some duplicates. + s.setExpecting(SessionPoint(1)); + BOOST_CHECK(!s.receive(transferFrame(false))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); + BOOST_CHECK(s.receive(transferFrame(false))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(3)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(3)); +} + +QPID_AUTO_TEST_CASE(testCompleted) { + // completed & unknownCompleted + qpid::SessionState s; + s.receive(transferFrame(false)); + s.receive(transferFrame(false)); + s.receive(transferFrame(false)); + s.localCompleted(1); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+1)); + s.localCompleted(0); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), + SequenceSet(SequenceSet() + SequenceSet::Range(0,2))); + s.peerKnownComplete(SequenceSet(SequenceSet()+1)); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+2)); +} + +// ================================================================ +// FIXME aconway 2008-04-23: Below here is old preview framing::SessionState test, remove with preview code. + +using namespace qpid::framing; + // Sent chars as frames void sent(SessionState& session, const std::string& frames) { for_each(frames.begin(), frames.end(), @@ -54,26 +272,12 @@ void received(SessionState& session, const std::string& frames) { bind(&SessionState::received, ref(session), bind(frame, _1))); } -// Make a string from a ReplayRange. -std::string replayChars(const SessionState::Replay& frames) { - string result(frames.size(), ' '); - transform(frames.begin(), frames.end(), result.begin(), - bind(&charFromFrame, _1)); - return result; -} - -namespace qpid { -namespace framing { - bool operator==(const AMQFrame& a, const AMQFrame& b) { const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody()); const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody()); return ab && bb && ab->getData() == bb->getData(); } -}} // namespace qpid::framing - - QPID_AUTO_TEST_CASE(testSent) { // Test that we send solicit-ack at the right interval. AMQContentBody f; @@ -101,21 +305,21 @@ QPID_AUTO_TEST_CASE(testReplay) { sent(session, "abc"); session.suspend(); session.resuming(); session.receivedAck(-1); - BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc"); + BOOST_CHECK_EQUAL(str(session.replay()), "abc"); // Replay with acks session.receivedAck(0); // ack a. session.suspend(); session.resuming(); session.receivedAck(1); // ack b. - BOOST_CHECK_EQUAL(replayChars(session.replay()), "c"); + BOOST_CHECK_EQUAL(str(session.replay()), "c"); // Replay after further frames. sent(session, "def"); session.suspend(); session.resuming(); session.receivedAck(3); - BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef"); + BOOST_CHECK_EQUAL(str(session.replay()), "ef"); // Bad ack, too high try { |