diff options
author | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-04-27 18:32:26 +0000 |
commit | 9acf66d9c848809e3308e50998d38a0183b038a4 (patch) | |
tree | b8ca2a996000b38a71cbb098f171af9ae4c540cc /qpid/cpp/src | |
parent | fdb31574f9cdb3474b4984fb0776f02ea4e32433 (diff) | |
download | qpid-python-9acf66d9c848809e3308e50998d38a0183b038a4.tar.gz |
Session state as per AMQP 0-10 specification.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651997 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/SessionState.cpp | 165 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/SessionState.h | 188 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/SequenceSet.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/SessionState.cpp | 254 |
5 files changed, 586 insertions, 25 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index ce36a33933..a64f70abd8 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -138,6 +138,8 @@ libqpidcommon_la_SOURCES = \ $(rgen_framing_srcs) \ $(platform_src) \ qpid/Serializer.h \ + qpid/SessionState.cpp \ + qpid/SessionState.h \ qpid/framing/AccumulatedAck.cpp \ qpid/framing/AMQBody.cpp \ qpid/framing/AMQMethodBody.cpp \ diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp new file mode 100644 index 0000000000..64fdd17b8f --- /dev/null +++ b/qpid/cpp/src/qpid/SessionState.cpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// FIXME aconway 2008-04-24: Reminders for handler implementation. +// +// - execution.sync results must be communicated to SessionState::peerConfirmed. +// +// + +#include "SessionState.h" +#include "qpid/amqp_0_10/exceptions.h" +#include "qpid/framing/AMQMethodBody.h" +#include <boost/bind.hpp> +#include <numeric> + +namespace qpid { +using framing::AMQFrame; +using amqp_0_10::NotImplementedException; + +/** A point in the session - command id + offset */ +void SessionPoint::advance(const AMQFrame& f) { + if (f.isLastSegment() && f.isLastFrame()) { + ++command; + offset = 0; + } + else { + // TODO aconway 2008-04-24: if we go to support for partial + // command replay, then it may be better to record the unframed + // data size in a command point rather than the framed size so + // that the relationship of fragment offsets to the replay + // list can be computed more easily. + // + offset += f.size(); + } +} + +bool SessionPoint::operator<(const SessionPoint& x) const { + return command < x.command || (command == x.command && offset < x.offset); +} + +bool SessionPoint::operator==(const SessionPoint& x) const { + return command == x.command && offset == x.offset; +} + +SendState::SendState(size_t syncSize, size_t killSize) + : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {} + +void SendState::send(const AMQFrame& f) { + if (f.getMethod() && f.getMethod()->type() == 0) + return; // Don't replay control frames. + replayList.push_back(f); + unflushedSize += f.size(); + sendPoint.advance(f); +} + +bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; } + +void SendState::sendFlush() { + assert(flushPoint <= sendPoint); + flushPoint = sendPoint; + unflushedSize = 0; +} + +void SendState::peerConfirmed(const SessionPoint& confirmed) { + ReplayList::iterator i = replayList.begin(); + // Ignore peerConfirmed.offset, we don't support partial replay. + while (i != replayList.end() && replayPoint.command < confirmed.command) { + assert(replayPoint <= flushPoint); + replayPoint.advance(*i); + assert(replayPoint <= sendPoint); + if (replayPoint > flushPoint) { + flushPoint.advance(*i); + assert(replayPoint <= flushPoint); + unflushedSize -= i->size(); + } + ++i; + } + replayList.erase(replayList.begin(), i); + assert(replayPoint.offset == 0); +} + +void SendState::peerCompleted(const SequenceSet& commands) { + if (commands.empty()) return; + sentCompleted += commands; + // Completion implies confirmation but we don't handle out-of-order + // confirmation, so confirm only the first contiguous range of commands. + peerConfirmed(SessionPoint(commands.rangesBegin()->end())); +} + +bool ReceiveState::hasState() { return stateful; } + +void ReceiveState::setExpecting(const SessionPoint& point) { + if (!hasState()) // initializing a new session. + expecting = received = point; + else { // setting point in an existing session. + if (point > received) + throw NotImplementedException("command-point out of bounds."); + expecting = point; + } +} + +ReceiveState::ReceiveState() : stateful() {} + +bool ReceiveState::receive(const AMQFrame& f) { + stateful = true; + expecting.advance(f); + if (expecting > received) { + received = expecting; + return true; + } + return false; +} + +void ReceiveState::localCompleted(SequenceNumber command) { + assert(command < received.command); // Can't complete what we haven't received. + receivedCompleted += command; +} + +void ReceiveState::peerKnownComplete(const SequenceSet& commands) { + receivedCompleted -= commands; +} + +SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} + +bool SessionId::operator<(const SessionId& id) const { + return userId < id.userId || (userId == id.userId && name < id.name); +} + +bool SessionId::operator==(const SessionId& id) const { + return id.name == name && id.userId == userId; +} + +SessionState::Configuration::Configuration() + : replaySyncSize(std::numeric_limits<size_t>::max()), + replayKillSize(std::numeric_limits<size_t>::max()) {} + +SessionState::SessionState(const SessionId& i, const Configuration& c) + : SendState(c.replaySyncSize, c.replayKillSize), + id(i), timeout(), config(c) {} + +void SessionState::clear() { *this = SessionState(id, config); } + +std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { + return o << "(" << p.command.getValue() << "+" << p.offset << ")"; +} + +} // namespace qpid diff --git a/qpid/cpp/src/qpid/SessionState.h b/qpid/cpp/src/qpid/SessionState.h new file mode 100644 index 0000000000..b836534ee7 --- /dev/null +++ b/qpid/cpp/src/qpid/SessionState.h @@ -0,0 +1,188 @@ +#ifndef QPID_SESSIONSTATE_H +#define QPID_SESSIONSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/SequenceNumber.h> +#include <qpid/framing/SequenceSet.h> +#include <qpid/framing/AMQFrame.h> +#include <boost/operators.hpp> +#include <vector> +#include <iosfwd> + +namespace qpid { +using framing::SequenceNumber; +using framing::SequenceSet; + +/** A point in the session. Points to command id + offset */ +struct SessionPoint : boost::totally_ordered1<SessionPoint> { + SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {} + + SequenceNumber command; + uint64_t offset; + + /** Advance past frame f */ + void advance(const framing::AMQFrame& f); + + bool operator<(const SessionPoint&) const; + bool operator==(const SessionPoint&) const; +}; + +std::ostream& operator<<(std::ostream&, const SessionPoint&); + +/** The sending half of session state */ +class SendState { + public: + typedef std::vector<framing::AMQFrame> ReplayList; + + /** Record frame f for replay. Should not be called during replay. */ + void send(const framing::AMQFrame& f); + + /** @return true if we should send flush for confirmed and completed commands. */ + bool needFlush() const; + + /** Called when flush for confirmed and completed commands is sent to peer. */ + void sendFlush(); + + /** Called when the peer confirms up to commands. */ + void peerConfirmed(const SessionPoint& confirmed); + + /** Called when the peer indicates commands completed */ + void peerCompleted(const SequenceSet& commands); + + /** Get the replay list. @see getReplayPoint. */ + const ReplayList& getReplayList() const { return replayList; } + + /** + * The replay point is the point up to which all data has been + * confirmed. Partial replay is not supported, it will always + * have offset==0. + */ + const SessionPoint& getReplayPoint() const { return replayPoint; } + + const SessionPoint& getSendPoint() const { return sendPoint; } + const SequenceSet& getCompleted() const { return sentCompleted; } + + protected: + SendState(size_t replaySyncSize, size_t replayKillSize); + + private: + size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration. + // invariant: replayPoint <= flushPoint <= sendPoint + SessionPoint replayPoint; // Can replay from this point + SessionPoint sendPoint; // Send from this point + SessionPoint flushPoint; // Point of last flush + ReplayList replayList; // Starts from replayPoint. + size_t unflushedSize; // Un-flushed bytes in replay list. + SequenceSet sentCompleted; // Commands sent and acknowledged as completed. +}; + +/** Receiving half of SessionState */ +class ReceiveState { + public: + bool hasState(); + + /** Set the command point. */ + void setExpecting(const SessionPoint& point); + + /** Returns true if frame should be be processed, false if it is a duplicate. */ + bool receive(const framing::AMQFrame& f); + + /** Command completed locally */ + void localCompleted(SequenceNumber command); + + /** Peer has indicated commands are known completed */ + void peerKnownComplete(const SequenceSet& commands); + + /** Recieved, completed and possibly not known by peer to be completed */ + const SequenceSet& getReceivedCompleted() const { return receivedCompleted; } + const SessionPoint& getExpecting() const { return expecting; } + const SessionPoint& getReceived() const { return received; } + + protected: + ReceiveState(); + + private: + bool stateful; // True if session has state. + SessionPoint expecting; // Expecting from here + SessionPoint received; // Received to here. Invariant: expecting <= received. + SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer +}; + +/** Identifier for a session */ +class SessionId : boost::totally_ordered1<SessionId> { + std::string userId; + std::string name; + public: + SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); + std::string getUserId() const { return userId; } + std::string getName() const { return name; } + bool operator<(const SessionId&) const ; + bool operator==(const SessionId& id) const; +}; + + +/** + * Support for session idempotence barrier and resume as defined in + * AMQP 0-10. + * + * We only issue/use contiguous confirmations, out-of-order confirmation + * is ignored. Out of order completion is fully supported. + * + * Raises NotImplemented if the command point is set greater than the + * max currently received command data, either explicitly via + * session.command-point or implicitly via session.gap. + * + * Partial replay is not supported, replay always begins on a command + * boundary, and we never confirm partial commands. + * + * The SessionPoint data structure does store offsets so this class + * could be extended to support partial replay without + * source-incompatbile API changes. + */ +class SessionState : public SendState, public ReceiveState { + public: + struct Configuration { + Configuration(); + size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes + size_t replayKillSize; // Kill session if replay list grows beyond N bytes. + }; + + SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + + const SessionId& getId() const { return id; } + uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t seconds) { timeout = seconds; } + + /** Clear all state except Id. */ + void clear(); + + private: + SessionId id; + uint32_t timeout; + Configuration config; +}; + +} // namespace qpid + + +#endif /*!QPID_SESSIONSTATE_H*/ diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h index f934bb40bb..029a26818e 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.h +++ b/qpid/cpp/src/qpid/framing/SequenceSet.h @@ -31,6 +31,8 @@ class Buffer; class SequenceSet : public RangeSet<SequenceNumber> { public: SequenceSet() {} + explicit SequenceSet(const RangeSet<SequenceNumber>& r) + : RangeSet<SequenceNumber>(r) {} explicit SequenceSet(const SequenceNumber& s) { add(s); } void encode(Buffer& buffer) const; diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp index 318bfbbddd..752d6d3e75 100644 --- a/qpid/cpp/src/tests/SessionState.cpp +++ b/qpid/cpp/src/tests/SessionState.cpp @@ -16,17 +16,33 @@ * */ -#include "qpid/framing/SessionState.h" +#include "unit_test.h" + +#include "qpid/framing/SessionState.h" // FIXME aconway 2008-04-23: preview code to remove. +#include "qpid/SessionState.h" #include "qpid/Exception.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SessionFlushBody.h" #include <boost/bind.hpp> -#include "unit_test.h" +#include <algorithm> +#include <functional> +#include <numeric> QPID_AUTO_TEST_SUITE(SessionStateTestSuite) using namespace std; -using namespace qpid::framing; using namespace boost; +using namespace qpid::framing; + +// ================================================================ +// Utility functions. + +// Apply f to [begin, end) and accumulate the result +template <class Iter, class T, class F> +T applyAccumulate(Iter begin, Iter end, T seed, const F& f) { + return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2))); +} // Create a frame with a one-char string. AMQFrame& frame(char s) { @@ -35,13 +51,215 @@ AMQFrame& frame(char s) { return frame; } -// Extract the one-char string from a frame. -char charFromFrame(const AMQFrame& f) { - const AMQContentBody* b=dynamic_cast<const AMQContentBody*>(f.getBody()); - BOOST_REQUIRE(b && b->getData().size() > 0); - return b->getData()[0]; +// Simple string representation of a frame. +string str(const AMQFrame& f) { + if (f.getMethod()) return "C"; // Command or Control + const AMQContentBody* c = dynamic_cast<const AMQContentBody*>(f.getBody()); + if (c) return c->getData(); // Return data for content frames. + return "H"; // Must be a header. +} +// Make a string from a range of frames. +string str(const vector<AMQFrame>& frames) { + string (*strFrame)(const AMQFrame&) = str; + return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame)); +} +// Make a transfer command frame. +AMQFrame transferFrame(bool hasContent) { + AMQFrame t(in_place<MessageTransferBody>()); + t.setFirstFrame(); + t.setLastFrame(); + t.setFirstSegment(); + t.setLastSegment(!hasContent); + return t; +} +// Make a content frame +AMQFrame contentFrame(string content, bool isLast=true) { + AMQFrame f(in_place<AMQContentBody>(content)); + f.setFirstFrame(); + f.setLastFrame(); + f.setLastSegment(isLast); + return f; +} +AMQFrame contentFrameChar(char content, bool isLast=true) { + return contentFrame(string(1, content), isLast); +} + +// Send frame & return size of frame. +size_t send(qpid::SessionState& s, const AMQFrame& f) { s.send(f); return f.size(); } +// Send transfer command with no content. +size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); } +// Send transfer frame with single content frame. +size_t transfer1(qpid::SessionState& s, string content) { + return send(s,transferFrame(true)) + send(s,contentFrame(content)); +} +size_t transfer1Char(qpid::SessionState& s, char content) { + return transfer1(s, string(1,content)); +} + +// Send transfer frame with multiple single-byte content frames. +size_t transferN(qpid::SessionState& s, string content) { + size_t size=send(s, transferFrame(!content.empty())); + if (!content.empty()) { + char last = content[content.size()-1]; + content.resize(content.size()-1); + size += applyAccumulate(content.begin(), content.end(), 0, + bind(&send, ref(s), + bind(contentFrameChar, _1, false))); + size += send(s, contentFrameChar(last, true)); + } + return size; +} + +// Send multiple transfers with single-byte content. +size_t transfers(qpid::SessionState& s, string content) { + return applyAccumulate(content.begin(), content.end(), 0, + bind(transfer1Char, ref(s), _1)); +} + +size_t contentFrameSize(size_t n=1) { return AMQFrame(in_place<AMQContentBody>()).size() + n; } +size_t transferFrameSize() { return AMQFrame(in_place<MessageTransferBody>()).size(); } + +// ==== qpid::SessionState test classes + +using qpid::SessionId; +using qpid::SessionPoint; + + +QPID_AUTO_TEST_CASE(testSendGetReplyList) { + qpid::SessionState s; + transfer1(s, "abc"); + transfers(s, "def"); + transferN(s, "xyz"); + BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); + // Ignore controls. + s.send(AMQFrame(in_place<SessionFlushBody>())); + BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); +} + +QPID_AUTO_TEST_CASE(testNeedFlush) { + qpid::SessionState::Configuration c; + // sync after 2 1-byte transfers or equivalent bytes. + c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + qpid::SessionState s(SessionId(), c); + transfers(s, "a"); + BOOST_CHECK(!s.needFlush()); + transfers(s, "b"); + BOOST_CHECK(s.needFlush()); + s.sendFlush(); + BOOST_CHECK(!s.needFlush()); + transfers(s, "c"); + BOOST_CHECK(!s.needFlush()); + transfers(s, "d"); + BOOST_CHECK(s.needFlush()); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); } +QPID_AUTO_TEST_CASE(testPeerConfirmed) { + qpid::SessionState::Configuration c; + // sync after 2 1-byte transfers or equivalent bytes. + c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + qpid::SessionState s(SessionId(), c); + transfers(s, "ab"); + BOOST_CHECK(s.needFlush()); + transfers(s, "cd"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); + s.peerConfirmed(SessionPoint(3)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cd"); + BOOST_CHECK(!s.needFlush()); + + // Never go backwards. + s.peerConfirmed(SessionPoint(2)); + s.peerConfirmed(SessionPoint(3)); + + // Multi-frame transfer. + transfer1(s, "efg"); + transfers(s, "xy"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CdCefgCxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(4)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CefgCxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(5)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CxCy"); + BOOST_CHECK(s.needFlush()); + + s.peerConfirmed(SessionPoint(6)); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cy"); + BOOST_CHECK(!s.needFlush()); +} + +QPID_AUTO_TEST_CASE(testPeerCompleted) { + qpid::SessionState s; + // Completion implies confirmation + transfers(s, "abc"); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCc"); + SequenceSet set(SequenceSet() + 0 + 1); + s.peerCompleted(set); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cc"); + + transfers(s, "def"); + // We dont do out-of-order confirmation, so this will only confirm up to 3: + set = SequenceSet(SequenceSet() + 2 + 3 + 5); + s.peerCompleted(set); + BOOST_CHECK_EQUAL(str(s.getReplayList()), "CeCf"); +} + +QPID_AUTO_TEST_CASE(testReceive) { + // Advance expecting/received correctly + qpid::SessionState s; + BOOST_CHECK(!s.hasState()); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(0)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(0)); + + BOOST_CHECK(s.receive(transferFrame(false))); + BOOST_CHECK(s.hasState()); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(1)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(1)); + + BOOST_CHECK(s.receive(transferFrame(true))); + SessionPoint point = SessionPoint(1, transferFrameSize()); + BOOST_CHECK_EQUAL(s.getExpecting(), point); + BOOST_CHECK_EQUAL(s.getReceived(), point); + BOOST_CHECK(s.receive(contentFrame("", false))); + point.offset += contentFrameSize(0); + BOOST_CHECK_EQUAL(s.getExpecting(), point); + BOOST_CHECK_EQUAL(s.getReceived(), point); + BOOST_CHECK(s.receive(contentFrame("", true))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); + + // Idempotence barrier, rewind expecting & receive some duplicates. + s.setExpecting(SessionPoint(1)); + BOOST_CHECK(!s.receive(transferFrame(false))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); + BOOST_CHECK(s.receive(transferFrame(false))); + BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(3)); + BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(3)); +} + +QPID_AUTO_TEST_CASE(testCompleted) { + // completed & unknownCompleted + qpid::SessionState s; + s.receive(transferFrame(false)); + s.receive(transferFrame(false)); + s.receive(transferFrame(false)); + s.localCompleted(1); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+1)); + s.localCompleted(0); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), + SequenceSet(SequenceSet() + SequenceSet::Range(0,2))); + s.peerKnownComplete(SequenceSet(SequenceSet()+1)); + BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+2)); +} + +// ================================================================ +// FIXME aconway 2008-04-23: Below here is old preview framing::SessionState test, remove with preview code. + +using namespace qpid::framing; + // Sent chars as frames void sent(SessionState& session, const std::string& frames) { for_each(frames.begin(), frames.end(), @@ -54,26 +272,12 @@ void received(SessionState& session, const std::string& frames) { bind(&SessionState::received, ref(session), bind(frame, _1))); } -// Make a string from a ReplayRange. -std::string replayChars(const SessionState::Replay& frames) { - string result(frames.size(), ' '); - transform(frames.begin(), frames.end(), result.begin(), - bind(&charFromFrame, _1)); - return result; -} - -namespace qpid { -namespace framing { - bool operator==(const AMQFrame& a, const AMQFrame& b) { const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody()); const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody()); return ab && bb && ab->getData() == bb->getData(); } -}} // namespace qpid::framing - - QPID_AUTO_TEST_CASE(testSent) { // Test that we send solicit-ack at the right interval. AMQContentBody f; @@ -101,21 +305,21 @@ QPID_AUTO_TEST_CASE(testReplay) { sent(session, "abc"); session.suspend(); session.resuming(); session.receivedAck(-1); - BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc"); + BOOST_CHECK_EQUAL(str(session.replay()), "abc"); // Replay with acks session.receivedAck(0); // ack a. session.suspend(); session.resuming(); session.receivedAck(1); // ack b. - BOOST_CHECK_EQUAL(replayChars(session.replay()), "c"); + BOOST_CHECK_EQUAL(str(session.replay()), "c"); // Replay after further frames. sent(session, "def"); session.suspend(); session.resuming(); session.receivedAck(3); - BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef"); + BOOST_CHECK_EQUAL(str(session.replay()), "ef"); // Bad ack, too high try { |