summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/SessionState.cpp287
1 files changed, 287 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp
new file mode 100644
index 0000000000..e5019604d2
--- /dev/null
+++ b/qpid/cpp/src/qpid/SessionState.cpp
@@ -0,0 +1,287 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/SessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+#include <numeric>
+
+namespace qpid {
+using framing::AMQFrame;
+using framing::NotImplementedException;
+using framing::InvalidArgumentException;
+using framing::IllegalStateException;
+using framing::ResourceLimitExceededException;
+using framing::InternalErrorException;
+using framing::FramingErrorException;
+
+namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND;
+}
+} // namespace
+
+SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), offset(o) {}
+
+// TODO aconway 2008-05-22: Do complete frame sequence validity check here,
+// currently duplicated betwen broker and client session impl.
+//
+void SessionPoint::advance(const AMQFrame& f) {
+ if (isControl(f)) return; // Ignore controls.
+ if (f.isFirstSegment() && f.isFirstFrame()) {
+ if (offset != 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command start frame."));
+ if (!isCommand(f))
+ throw FramingErrorException(
+ QPID_MSG("Command start frame has invalid type" << f.getBody()->type()));
+ if (f.isLastSegment() && f.isLastFrame())
+ ++command; // Single-frame command.
+ else
+ offset += f.encodedSize();
+ }
+ else { // continuation frame for partial command
+ if (offset == 0)
+ throw FramingErrorException(QPID_MSG("Unexpected command continuation frame."));
+ if (f.isLastSegment() && f.isLastFrame()) {
+ ++command;
+ offset = 0;
+ }
+ else {
+ // TODO aconway 2008-04-24: if we go to support for partial
+ // command replay, then it may be better to record the unframed
+ // data size in a command point rather than the framed size so
+ // that the relationship of fragment offsets to the replay
+ // list can be computed more easily.
+ //
+ offset += f.encodedSize();
+ }
+ }
+}
+
+bool SessionPoint::operator<(const SessionPoint& x) const {
+ return command < x.command || (command == x.command && offset < x.offset);
+}
+
+bool SessionPoint::operator==(const SessionPoint& x) const {
+ return command == x.command && offset == x.offset;
+}
+
+
+SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSinceKnownCompleted() {}
+
+SessionState::ReceiveState::ReceiveState() : bytesSinceKnownCompleted() {}
+
+uint32_t SessionState::getTimeout() const { return timeout; }
+void SessionState::setTimeout(uint32_t seconds) { timeout = seconds; }
+
+SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
+SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
+SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
+
+SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) {
+ if (expect < sender.replayPoint || sender.sendPoint < expect)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range."));
+ QPID_LOG(debug, getId() << ": sender expected point moved to " << expect);
+ ReplayList::iterator i = sender.replayList.begin();
+ SessionPoint p = sender.replayPoint;
+ while (i != sender.replayList.end() && p.command < expect.command)
+ p.advance(*i++);
+ assert(p.command == expect.command);
+ return boost::make_iterator_range(i, sender.replayList.end());
+}
+
+void SessionState::senderRecord(const AMQFrame& f) {
+ if (isControl(f)) return; // Ignore control frames.
+ QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody());
+
+ stateful = true;
+ if (timeout) sender.replayList.push_back(f);
+ sender.unflushedSize += f.encodedSize();
+ sender.bytesSinceKnownCompleted += f.encodedSize();
+ sender.replaySize += f.encodedSize();
+ sender.incomplete += sender.sendPoint.command;
+ sender.sendPoint.advance(f);
+ if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
+ throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
+}
+
+static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536;
+
+bool SessionState::senderNeedFlush() const {
+ return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit);
+}
+
+void SessionState::senderRecordFlush() {
+ sender.flushPoint = sender.sendPoint;
+ sender.unflushedSize = 0;
+}
+
+bool SessionState::senderNeedKnownCompleted() const {
+ return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordKnownCompleted() {
+ sender.bytesSinceKnownCompleted = 0;
+}
+
+void SessionState::senderConfirmed(const SessionPoint& confirmed) {
+ if (confirmed > sender.sendPoint)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint));
+ QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
+ ReplayList::iterator i = sender.replayList.begin();
+ while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
+ sender.replayPoint.advance(*i);
+ assert(sender.replayPoint <= sender.sendPoint);
+ sender.replaySize -= i->encodedSize();
+ if (sender.replayPoint > sender.flushPoint)
+ sender.unflushedSize -= i->encodedSize();
+ ++i;
+ }
+ if (sender.replayPoint > sender.flushPoint)
+ sender.flushPoint = sender.replayPoint;
+ sender.replayList.erase(sender.replayList.begin(), i);
+ assert(sender.replayPoint.offset == 0);
+}
+
+void SessionState::senderCompleted(const SequenceSet& commands) {
+ if (commands.empty()) return;
+ QPID_LOG(debug, getId() << ": sender marked completed: " << commands);
+ sender.incomplete -= commands;
+ // Completion implies confirmation but we don't handle out-of-order
+ // confirmation, so confirm up to the end of the first contiguous range of commands.
+ senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
+}
+
+void SessionState::receiverSetCommandPoint(const SessionPoint& point) {
+ if (hasState() && point > receiver.received)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range."));
+ QPID_LOG(debug, getId() << ": receiver command-point set to: " << point);
+ receiver.expected = point;
+ if (receiver.expected > receiver.received)
+ receiver.received = receiver.expected;
+}
+
+bool SessionState::receiverRecord(const AMQFrame& f) {
+ if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges
+ if (isControl(f)) return true; // Ignore control frames.
+ stateful = true;
+ receiver.expected.advance(f);
+ receiver.bytesSinceKnownCompleted += f.encodedSize();
+ bool firstTime = receiver.expected > receiver.received;
+ if (firstTime) {
+ receiver.received = receiver.expected;
+ receiver.incomplete += receiverGetCurrent();
+ }
+ QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody());
+ if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame.");
+ return firstTime;
+}
+
+void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ if (receiverTrackingDisabled) return; //Very nasty hack for push bridges
+ assert(receiver.incomplete.contains(command)); // Internal error to complete command twice.
+ SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
+ SequenceNumber last = command;
+ receiver.unknownCompleted.add(first, last);
+ receiver.incomplete.remove(first, last);
+ QPID_LOG(debug, getId() << ": receiver marked completed: " << command
+ << " incomplete: " << receiver.incomplete
+ << " unknown-completed: " << receiver.unknownCompleted);
+}
+
+void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
+ if (!commands.empty() && commands.back() > receiver.received.command)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands."));
+ receiver.bytesSinceKnownCompleted=0;
+ receiver.unknownCompleted -= commands;
+ QPID_LOG(debug, getId() << ": receiver known completed: " << commands << " unknown: " << receiver.unknownCompleted);
+}
+
+bool SessionState::receiverNeedKnownCompleted() const {
+ return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit);
+}
+
+const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
+const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
+const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
+const SequenceSet& SessionState::receiverGetIncomplete() const { return receiver.incomplete; }
+
+SequenceNumber SessionState::receiverGetCurrent() const {
+ SequenceNumber current = receiver.expected.command;
+ if (receiver.expected.offset == 0)
+ --current;
+ return current;
+}
+
+SessionState::Configuration::Configuration(size_t flush, size_t hard) :
+ replayFlushLimit(flush), replayHardLimit(hard) {}
+
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+ : id(i), timeout(0), config(c), stateful(false), receiverTrackingDisabled(false)
+{
+ QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
+}
+
+bool SessionState::hasState() const { return stateful; }
+
+SessionState::~SessionState() {}
+
+std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
+ return o << "(" << p.command.getValue() << "+" << p.offset << ")";
+}
+
+void SessionState::setState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete
+)
+{
+ sender.replayPoint = replayStart;
+ sender.flushPoint = sendCommandPoint;
+ sender.sendPoint = sendCommandPoint;
+ sender.unflushedSize = 0;
+ sender.replaySize = 0; // Replay list will be updated separately.
+ sender.incomplete = sentIncomplete;
+ sender.bytesSinceKnownCompleted = 0;
+
+ receiver.expected = expected;
+ receiver.received = received;
+ receiver.unknownCompleted = unknownCompleted;
+ receiver.incomplete = receivedIncomplete;
+ receiver.bytesSinceKnownCompleted = 0;
+}
+
+void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; }
+void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; }
+
+} // namespace qpid