diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/Exception.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/Exception.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/ExceptionHolder.h | 73 | ||||
-rw-r--r-- | cpp/src/qpid/SessionId.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/SessionId.h | 49 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 131 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 163 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 275 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.h | 116 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Proxy.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Proxy.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceSet.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceSet.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.h | 2 |
15 files changed, 763 insertions, 145 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index a69955c9dc..8176d92cac 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -34,13 +34,15 @@ std::string strError(int err) { } Exception::Exception(const std::string& msg) throw() : message(msg) { - QPID_LOG(debug, "Exception thrown: " << message); + QPID_LOG(debug, "Exception: " << message); } Exception::~Exception() throw() {} std::string Exception::getPrefix() const { return "Exception"; } +std::string Exception::getMessage() const { return message; } + const char* Exception::what() const throw() { if (whatStr.empty()) whatStr = getPrefix() + ": " + message; diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index e74fa79ed9..1be433f17a 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -43,10 +43,10 @@ class Exception : public std::exception public: explicit Exception(const std::string& message=std::string()) throw(); virtual ~Exception() throw(); - virtual const char* what() const throw(); + virtual const char* what() const throw(); // prefix: message + virtual std::string getMessage() const; // Unprefixed message + virtual std::string getPrefix() const; // Prefix - protected: - std::string getPrefix() const; private: std::string message; mutable std::string whatStr; diff --git a/cpp/src/qpid/ExceptionHolder.h b/cpp/src/qpid/ExceptionHolder.h new file mode 100644 index 0000000000..fed6308f19 --- /dev/null +++ b/cpp/src/qpid/ExceptionHolder.h @@ -0,0 +1,73 @@ +#ifndef QPID_EXCEPTIONHOLDER_H +#define QPID_EXCEPTIONHOLDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/memory.h" +#include <memory> + +namespace qpid { + +struct Raisable { + virtual ~Raisable() {}; + virtual void raise() const=0; + virtual std::string what() const=0; +}; + +/** + * Holder for exceptions. Allows the thread that notices an error condition to + * create an exception and store it to be thrown by another thread. + */ +class ExceptionHolder : public Raisable { + public: + ExceptionHolder() {} + ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {} + /** Take ownership of ex */ + template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); } + template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); } + + ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; } + template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } + template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; } + + void raise() const { if (wrapper.get()) wrapper->raise() ; } + std::string what() const { return wrapper->what(); } + bool empty() const { return !wrapper.get(); } + operator bool() const { return !empty(); } + void reset() { wrapper.reset(); } + + private: + template <class Ex> struct Wrapper : public Raisable { + Wrapper(Ex* ptr) : exception(ptr) {} + void raise() const { throw *exception; } + std::string what() const { return exception->what(); } + std::auto_ptr<Ex> exception; + }; + template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); } + std::auto_ptr<Raisable> wrapper; + +}; + + +} // namespace qpid + +#endif /*!QPID_EXCEPTIONHOLDER_H*/ diff --git a/cpp/src/qpid/SessionId.cpp b/cpp/src/qpid/SessionId.cpp new file mode 100644 index 0000000000..fce6619f5d --- /dev/null +++ b/cpp/src/qpid/SessionId.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionId.h" +#include <sstream> + +namespace qpid { + +SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} + +bool SessionId::operator<(const SessionId& id) const { + return userId < id.userId || (userId == id.userId && name < id.name); +} + +bool SessionId::operator==(const SessionId& id) const { + return id.name == name && id.userId == userId; +} + +std::ostream& operator<<(std::ostream& o, const SessionId& id) { + return o << id.getName() << "@" << id.getUserId(); +} + +std::string SessionId::str() const { + std::ostringstream o; + o << *this; + return o.str(); +} + +} // namespace qpid diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h new file mode 100644 index 0000000000..08553e8b1d --- /dev/null +++ b/cpp/src/qpid/SessionId.h @@ -0,0 +1,49 @@ +#ifndef QPID_SESSIONID_H +#define QPID_SESSIONID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/operators.hpp> +#include <string> + +namespace qpid { + +/** Identifier for a session */ +class SessionId : boost::totally_ordered1<SessionId> { + std::string userId; + std::string name; + public: + SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); + std::string getUserId() const { return userId; } + std::string getName() const { return name; } + bool operator<(const SessionId&) const ; + bool operator==(const SessionId& id) const; + // Convert to a string + std::string str() const; +}; + +std::ostream& operator<<(std::ostream&, const SessionId&); + + +} // namespace qpid + +#endif /*!QPID_SESSIONID_H*/ diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 64fdd17b8f..8905fb5f9d 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -19,21 +19,24 @@ * */ -// FIXME aconway 2008-04-24: Reminders for handler implementation. -// -// - execution.sync results must be communicated to SessionState::peerConfirmed. -// -// - #include "SessionState.h" #include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <numeric> namespace qpid { using framing::AMQFrame; using amqp_0_10::NotImplementedException; +using amqp_0_10::InvalidArgumentException; +using amqp_0_10::IllegalStateException; + +namespace { +bool isControl(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == 0; +} +} // namespace /** A point in the session - command id + offset */ void SessionPoint::advance(const AMQFrame& f) { @@ -60,103 +63,123 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } -SendState::SendState(size_t syncSize, size_t killSize) - : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {} +SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} + +const SessionPoint& SessionState::SendState::getCommandPoint() { + return sendPoint; +} + +bool SessionState::SendState::expected(const SessionPoint& point) { + if (point < replayPoint || sendPoint < point) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); + // FIXME aconway 2008-05-06: this is not strictly correct, we should keep + // an intermediate replay pointer into the replay list. + confirmed(point); // Drop commands prior to expected from replay. + return (!replayList.empty()); +} -void SendState::send(const AMQFrame& f) { - if (f.getMethod() && f.getMethod()->type() == 0) - return; // Don't replay control frames. +void SessionState::SendState::record(const AMQFrame& f) { + if (isControl(f)) return; // Ignore control frames. + session->stateful = true; replayList.push_back(f); unflushedSize += f.size(); + incomplete += sendPoint.command; sendPoint.advance(f); } -bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; } +bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } -void SendState::sendFlush() { +void SessionState::SendState::recordFlush() { assert(flushPoint <= sendPoint); flushPoint = sendPoint; unflushedSize = 0; } -void SendState::peerConfirmed(const SessionPoint& confirmed) { +void SessionState::SendState::confirmed(const SessionPoint& confirmed) { + if (confirmed > sendPoint) + throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); ReplayList::iterator i = replayList.begin(); - // Ignore peerConfirmed.offset, we don't support partial replay. while (i != replayList.end() && replayPoint.command < confirmed.command) { - assert(replayPoint <= flushPoint); replayPoint.advance(*i); assert(replayPoint <= sendPoint); - if (replayPoint > flushPoint) { - flushPoint.advance(*i); - assert(replayPoint <= flushPoint); + if (replayPoint > flushPoint) unflushedSize -= i->size(); - } ++i; } + if (replayPoint > flushPoint) flushPoint = replayPoint; replayList.erase(replayList.begin(), i); assert(replayPoint.offset == 0); } -void SendState::peerCompleted(const SequenceSet& commands) { +void SessionState::SendState::completed(const SequenceSet& commands) { if (commands.empty()) return; - sentCompleted += commands; + incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order // confirmation, so confirm only the first contiguous range of commands. - peerConfirmed(SessionPoint(commands.rangesBegin()->end())); + confirmed(SessionPoint(commands.rangesBegin()->end())); } -bool ReceiveState::hasState() { return stateful; } +SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} -void ReceiveState::setExpecting(const SessionPoint& point) { - if (!hasState()) // initializing a new session. - expecting = received = point; - else { // setting point in an existing session. - if (point > received) - throw NotImplementedException("command-point out of bounds."); - expecting = point; - } +void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { + if (session->hasState() && point > received) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); + expected = point; + if (expected > received) + received = expected; } -ReceiveState::ReceiveState() : stateful() {} - -bool ReceiveState::receive(const AMQFrame& f) { - stateful = true; - expecting.advance(f); - if (expecting > received) { - received = expecting; +bool SessionState::ReceiveState::record(const AMQFrame& f) { + if (isControl(f)) return true; // Ignore control frames. + session->stateful = true; + expected.advance(f); + if (expected > received) { + received = expected; return true; } + else { + QPID_LOG(debug, "Ignoring duplicate: " << f); return false; } - -void ReceiveState::localCompleted(SequenceNumber command) { - assert(command < received.command); // Can't complete what we haven't received. - receivedCompleted += command; } -void ReceiveState::peerKnownComplete(const SequenceSet& commands) { - receivedCompleted -= commands; +void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { + assert(command <= received.command); // Internal error to complete an unreceived command. + assert(firstIncomplete <= command); + if (cumulative) + unknownCompleted.add(firstIncomplete, command); + else + unknownCompleted += command; + firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); } -SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} - -bool SessionId::operator<(const SessionId& id) const { - return userId < id.userId || (userId == id.userId && name < id.name); +void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { + if (!commands.empty() && commands.back() > received.command) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); + unknownCompleted -= commands; } -bool SessionId::operator==(const SessionId& id) const { - return id.name == name && id.userId == userId; +SequenceNumber SessionState::ReceiveState::getCurrent() const { + SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. + return --current; } +// FIXME aconway 2008-05-02: implement sync & kill limits. SessionState::Configuration::Configuration() : replaySyncSize(std::numeric_limits<size_t>::max()), replayKillSize(std::numeric_limits<size_t>::max()) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : SendState(c.replaySyncSize, c.replayKillSize), - id(i), timeout(), config(c) {} + : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +{ + QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); +} + +bool SessionState::hasState() const { + return stateful; +} -void SessionState::clear() { *this = SessionState(id, config); } +SessionState::~SessionState() {} std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { return o << "(" << p.command.getValue() << "+" << p.offset << ")"; diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index b836534ee7..7957825dd3 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -22,9 +22,11 @@ * */ +#include <qpid/SessionId.h> #include <qpid/framing/SequenceNumber.h> #include <qpid/framing/SequenceSet.h> #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FrameHandler.h> #include <boost/operators.hpp> #include <vector> #include <iosfwd> @@ -49,118 +51,124 @@ struct SessionPoint : boost::totally_ordered1<SessionPoint> { std::ostream& operator<<(std::ostream&, const SessionPoint&); -/** The sending half of session state */ +/** + * Support for session idempotence barrier and resume as defined in + * AMQP 0-10. + * + * We only issue/use contiguous confirmations, out-of-order confirmation + * is ignored. Out of order completion is fully supported. + * + * Raises NotImplemented if the command point is set greater than the + * max currently received command data, either explicitly via + * session.command-point or implicitly via session.gap. + * + * Partial replay is not supported, replay always begins on a command + * boundary, and we never confirm partial commands. + * + * The SessionPoint data structure does store offsets so this class + * could be extended to support partial replay without + * source-incompatbile API changes. + */ +class SessionState { + public: + + /** State for commands sent. Records commands for replay, + * tracks confirmation and completion of sent commands. + */ class SendState { public: typedef std::vector<framing::AMQFrame> ReplayList; /** Record frame f for replay. Should not be called during replay. */ - void send(const framing::AMQFrame& f); + void record(const framing::AMQFrame& f); /** @return true if we should send flush for confirmed and completed commands. */ bool needFlush() const; /** Called when flush for confirmed and completed commands is sent to peer. */ - void sendFlush(); + void recordFlush(); - /** Called when the peer confirms up to commands. */ - void peerConfirmed(const SessionPoint& confirmed); + /** Called when the peer confirms up to comfirmed. */ + void confirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ - void peerCompleted(const SequenceSet& commands); + void completed(const SequenceSet& commands); - /** Get the replay list. @see getReplayPoint. */ - const ReplayList& getReplayList() const { return replayList; } - - /** - * The replay point is the point up to which all data has been - * confirmed. Partial replay is not supported, it will always - * have offset==0. - */ + /** Point from which we can replay. All data < replayPoint is confirmed. */ const SessionPoint& getReplayPoint() const { return replayPoint; } - const SessionPoint& getSendPoint() const { return sendPoint; } - const SequenceSet& getCompleted() const { return sentCompleted; } + /** Get the replay list, starting from getReplayPoint() */ + // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. + ReplayList& getReplayList() { return replayList; } - protected: - SendState(size_t replaySyncSize, size_t replayKillSize); + /** Point from which the next data will be sent. */ + const SessionPoint& getCommandPoint(); + + /** Set of outstanding incomplete commands */ + const SequenceSet& getIncomplete() const { return incomplete; } + + /** Peer expecting commands from this point. + *@return true if replay is required, sets replayPoint. + */ + bool expected(const SessionPoint& expected); private: - size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration. + SendState(SessionState& s); + + SessionState* session; // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point - SessionPoint sendPoint; // Send from this point SessionPoint flushPoint; // Point of last flush + SessionPoint sendPoint; // Send from this point ReplayList replayList; // Starts from replayPoint. size_t unflushedSize; // Un-flushed bytes in replay list. - SequenceSet sentCompleted; // Commands sent and acknowledged as completed. + SequenceSet incomplete; // Commands sent and not yet completed. + + friend class SessionState; }; -/** Receiving half of SessionState */ + /** State for commands received. + * Idempotence barrier for duplicate commands, tracks completion + * and of received commands. + */ class ReceiveState { public: - bool hasState(); - /** Set the command point. */ - void setExpecting(const SessionPoint& point); + void setCommandPoint(const SessionPoint& point); /** Returns true if frame should be be processed, false if it is a duplicate. */ - bool receive(const framing::AMQFrame& f); + bool record(const framing::AMQFrame& f); /** Command completed locally */ - void localCompleted(SequenceNumber command); + void completed(SequenceNumber command, bool cumulative=false); /** Peer has indicated commands are known completed */ - void peerKnownComplete(const SequenceSet& commands); + void knownCompleted(const SequenceSet& commands); + + /** Get the incoming command point */ + const SessionPoint& getExpected() const { return expected; } - /** Recieved, completed and possibly not known by peer to be completed */ - const SequenceSet& getReceivedCompleted() const { return receivedCompleted; } - const SessionPoint& getExpecting() const { return expecting; } + /** Get the received high-water-mark, may be > getExpected() during replay */ const SessionPoint& getReceived() const { return received; } - protected: - ReceiveState(); + /** Completed commands that the peer may not know about */ + const SequenceSet& getUnknownComplete() const { return unknownCompleted; } + + /** ID of the command currently being handled. */ + SequenceNumber getCurrent() const; private: - bool stateful; // True if session has state. - SessionPoint expecting; // Expecting from here - SessionPoint received; // Received to here. Invariant: expecting <= received. - SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer -}; + ReceiveState(SessionState&); -/** Identifier for a session */ -class SessionId : boost::totally_ordered1<SessionId> { - std::string userId; - std::string name; - public: - SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); - std::string getUserId() const { return userId; } - std::string getName() const { return name; } - bool operator<(const SessionId&) const ; - bool operator==(const SessionId& id) const; -}; + SessionState* session; + SessionPoint expected; // Expected from here + SessionPoint received; // Received to here. Invariant: expected <= received. + SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. + SequenceNumber firstIncomplete; // First incomplete command. + friend class SessionState; + }; -/** - * Support for session idempotence barrier and resume as defined in - * AMQP 0-10. - * - * We only issue/use contiguous confirmations, out-of-order confirmation - * is ignored. Out of order completion is fully supported. - * - * Raises NotImplemented if the command point is set greater than the - * max currently received command data, either explicitly via - * session.command-point or implicitly via session.gap. - * - * Partial replay is not supported, replay always begins on a command - * boundary, and we never confirm partial commands. - * - * The SessionPoint data structure does store offsets so this class - * could be extended to support partial replay without - * source-incompatbile API changes. - */ -class SessionState : public SendState, public ReceiveState { - public: struct Configuration { Configuration(); size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes @@ -169,19 +177,32 @@ class SessionState : public SendState, public ReceiveState { SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + virtual ~SessionState(); + const SessionId& getId() const { return id; } uint32_t getTimeout() const { return timeout; } void setTimeout(uint32_t seconds) { timeout = seconds; } - /** Clear all state except Id. */ - void clear(); + bool operator==(const SessionId& other) const { return id == other; } + bool operator==(const SessionState& other) const { return id == other.id; } + + SendState sender; ///< State for commands sent + ReceiveState receiver; ///< State for commands received + + bool hasState() const; private: SessionId id; uint32_t timeout; Configuration config; + bool stateful; + + friend class SendState; + friend class ReceiveState; }; +inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } + } // namespace qpid diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp new file mode 100644 index 0000000000..3fb2579e8c --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "SessionHandler.h" +#include "qpid/SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/log/Statement.h" + + +#include <boost/bind.hpp> + +namespace qpid { +namespace amqp_0_10 { +using namespace framing; +using namespace std; + +SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {} + +SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch) + : channel(ch, &out), peer(channel), ignoring(false) {} + +SessionHandler::~SessionHandler() {} + +namespace { +bool isSessionControl(AMQMethodBody* m) { + return m && + m->amqpClassId() == SESSION_CLASS_ID; +} +bool isSessionDetachedControl(AMQMethodBody* m) { + return isSessionControl(m) && + m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; +} +} // namespace + +void SessionHandler::checkAttached() { + if (!getState()) + throw NotAttachedException( + QPID_MSG("Channel " << channel.get() << " is not attached")); + assert(getInHandler()); + assert(channel.next); +} + +void SessionHandler::invoke(const AMQMethodBody& m) { + framing::invoke(*this, m); +} + +void SessionHandler::handleIn(AMQFrame& f) { + // Note on channel states: a channel is attached if session != 0 + AMQMethodBody* m = f.getBody()->getMethod(); + try { + if (ignoring && !isSessionDetachedControl(m)) + return; + else if (isSessionControl(m)) + invoke(*m); + else { + checkAttached(); + if (!receiveReady) + throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); + if (!getState()->receiver.record(f)) + return; // Ignore duplicates. + getInHandler()->handle(f); + } + } + catch(const ChannelException& e){ + QPID_LOG(error, "Channel exception: " << e.what()); + if (getState()) + peer.detached(getState()->getId().getName(), e.code); + channelException(e.code, e.getMessage()); + } + catch(const ConnectionException& e) { + QPID_LOG(error, "Connection exception: " << e.what()); + connectionException(e.code, e.getMessage()); + } + catch(const std::exception& e) { + QPID_LOG(error, "Unexpected exception: " << e.what()); + connectionException(connection::FRAMING_ERROR, e.what()); + } +} + +void SessionHandler::handleOut(AMQFrame& f) { + checkAttached(); + if (!sendReady) + throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); + getState()->sender.record(f); + if (getState()->sender.needFlush()) { + peer.flush(false, true, true); + getState()->sender.recordFlush(); + } + channel.handle(f); +} + +void SessionHandler::checkName(const std::string& name) { + checkAttached(); + if (name != getState()->getId().getName()) + throw InvalidArgumentException( + QPID_MSG("Incorrect session name: " << name + << ", expecting: " << getState()->getId().getName())); +} + +void SessionHandler::attach(const std::string& name, bool force) { + if (getState() && name == getState()->getId().getName()) + return; // Idempotent + if (getState()) + throw SessionBusyException( + QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); + setState(name, force); + QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); + peer.attached(name); + if (getState()->hasState()) + peer.flush(true, true, true); + else + sendCommandPoint(); +} + +void SessionHandler::attached(const std::string& name) { + checkName(name); +} + +void SessionHandler::detach(const std::string& name) { + checkName(name); + peer.detached(name, session::NORMAL); + handleDetach(); +} + +void SessionHandler::detached(const std::string& name, uint8_t code) { + checkName(name); + ignoring = false; + if (code != session::NORMAL) + channelException(code, "session.detached from peer."); + else { + handleDetach(); + } +} + +void SessionHandler::handleDetach() { + sendReady = receiveReady = false; +} + +void SessionHandler::requestTimeout(uint32_t t) { + checkAttached(); + getState()->setTimeout(t); + peer.timeout(t); +} + +void SessionHandler::timeout(uint32_t t) { + checkAttached(); + getState()->setTimeout(t); +} + +void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { + checkAttached(); + getState()->receiver.setCommandPoint(SessionPoint(id, offset)); + if (!receiveReady) { + receiveReady = true; + readyToReceive(); + } +} + +void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { + checkAttached(); + if (commands.empty() && getState()->hasState()) + throw IllegalStateException( + QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); + getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front()); + if (!sendReady) // send command point if not already sent + sendCommandPoint(); +} + +void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { + checkAttached(); + // Ignore non-contiguous confirmations. + if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) { + getState()->sender.confirmed(commands.rangesBegin()->last()); + } +} + +void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) { + checkAttached(); + getState()->sender.completed(commands); + if (!commands.empty()) + peer.knownCompleted(commands); // Always send a timely reply +} + +void SessionHandler::knownCompleted(const SequenceSet& commands) { + checkAttached(); + getState()->receiver.knownCompleted(commands); +} + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { + checkAttached(); + if (expected) { + SequenceSet expectSet; + if (getState()->hasState()) + expectSet.add(getState()->receiver.getExpected().command); + peer.expected(expectSet, Array()); + } + if (confirmed) { + SequenceSet confirmSet; + if (!getState()->receiver.getUnknownComplete().empty()) + confirmSet.add(getState()->receiver.getUnknownComplete().front(), + getState()->receiver.getReceived().command); + peer.confirmed(confirmSet, Array()); + } + if (completed) + peer.completed(getState()->receiver.getUnknownComplete(), true); +} + +void SessionHandler::gap(const SequenceSet& /*commands*/) { + throw NotImplementedException("session.gap not supported"); +} + +void SessionHandler::sendDetach() +{ + checkAttached(); + ignoring = true; + peer.detach(getState()->getId().getName()); +} + +void SessionHandler::sendCompletion() { + checkAttached(); + peer.completed(getState()->receiver.getUnknownComplete(), true); +} + +void SessionHandler::sendAttach(bool force) { + checkAttached(); + peer.attach(getState()->getId().getName(), force); + if (getState()->hasState()) + peer.flush(true, true, true); + else + sendCommandPoint(); +} + +void SessionHandler::sendCommandPoint() { + SessionPoint point(getState()->sender.getCommandPoint()); + peer.commandPoint(point.command, point.offset); + if (!sendReady) { + sendReady = true; + readyToSend(); + } +} + +void SessionHandler::sendTimeout(uint32_t t) { + checkAttached(); + peer.requestTimeout(t); +} + +void SessionHandler::sendFlush() { + peer.flush(false, true, true); +} + +bool SessionHandler::ready() const { + return sendReady && receiveReady; +} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h new file mode 100644 index 0000000000..85577ebafc --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -0,0 +1,116 @@ +#ifndef QPID_AMQP_0_10_SESSIONHANDLER_H +#define QPID_AMQP_0_10_SESSIONHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/AMQP_AllProxy.h" +#include "qpid/framing/AMQP_AllOperations.h" + +namespace qpid { + +class SessionState; + +namespace amqp_0_10 { + +/** + * Base SessionHandler with logic common to both client and broker. + * + * A SessionHandler is associated with a channel and can be attached + * to a session state. + */ + +class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, + public framing::FrameHandler::InOutHandler +{ + public: + typedef framing::AMQP_AllProxy::Session Peer; + + SessionHandler(); + SessionHandler(framing::FrameHandler& out, uint16_t channel); + ~SessionHandler(); + + void setChannel(uint16_t ch) { channel = ch; } + uint16_t getChannel() const { return channel.get(); } + + void setOutHandler(framing::FrameHandler& h) { channel.next = &h; } + + virtual SessionState* getState() = 0; + virtual framing::FrameHandler* getInHandler() = 0; + + // Non-protocol methods, called locally to initiate some action. + void sendDetach(); + void sendCompletion(); + void sendAttach(bool force); + void sendTimeout(uint32_t t); + void sendFlush(); + void sendCommandPoint(); + + /** True if the handler is ready to send and receive */ + bool ready() const; + + // Protocol methods + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t code); + + void requestTimeout(uint32_t t); + void timeout(uint32_t t); + + void commandPoint(const framing::SequenceNumber& id, uint64_t offset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + protected: + virtual void invoke(const framing::AMQMethodBody& m); + + virtual void setState(const std::string& sessionName, bool force) = 0; + virtual void channelException(uint16_t code, const std::string& msg) = 0; + virtual void connectionException(uint16_t code, const std::string& msg) = 0; + + + // Notification of events + virtual void readyToSend() {} + virtual void readyToReceive() {} + virtual void handleDetach(); + + virtual void handleIn(framing::AMQFrame&); + virtual void handleOut(framing::AMQFrame&); + + void checkAttached(); + void checkName(const std::string& name); + + framing::ChannelHandler channel; + Peer peer; + bool ignoring; + bool sendReady, receiveReady; + // FIXME aconway 2008-05-07: move handler-related functions from SessionState. + +}; +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_SESSIONHANDLER_H*/ diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index fbf3c0b7ca..b93869be85 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -82,14 +82,14 @@ struct Handler { template <class X, void (X::*F)(T)> class MemFunRef : public Handler<T> { public: - MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(x) {} - void handle(T t) { (target.*F)(t); } + MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} + void handle(T t) { (target->*F)(t); } /** Allow calling with -> syntax, compatible with Chains */ MemFunRef* operator->() { return this; } private: - X& target; + X* target; }; /** Interface for a handler that implements a diff --git a/cpp/src/qpid/framing/Proxy.cpp b/cpp/src/qpid/framing/Proxy.cpp index b47060028f..6b37fb368d 100644 --- a/cpp/src/qpid/framing/Proxy.cpp +++ b/cpp/src/qpid/framing/Proxy.cpp @@ -22,16 +22,21 @@ namespace qpid { namespace framing { +Proxy::Proxy(FrameHandler& h) : out(&h) {} + Proxy::~Proxy() {} void Proxy::send(const AMQBody& b) { AMQFrame f(b); - out.handle(f); + out->handle(f); } - ProtocolVersion Proxy::getVersion() const { return ProtocolVersion(); } +FrameHandler& Proxy::getHandler() { return *out; } + +void Proxy::setHandler(FrameHandler& f) { out=&f; } + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h index 86b99a83b0..3dc082097a 100644 --- a/cpp/src/qpid/framing/Proxy.h +++ b/cpp/src/qpid/framing/Proxy.h @@ -33,16 +33,18 @@ class AMQBody; class Proxy { public: - Proxy(FrameHandler& h) : out(h) {} + Proxy(FrameHandler& h); virtual ~Proxy(); void send(const AMQBody&); ProtocolVersion getVersion() const; - FrameHandler& getHandler() { return out; } - protected: - FrameHandler& out; + FrameHandler& getHandler(); + void setHandler(FrameHandler&); + + private: + FrameHandler* out; }; }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp index cdf890b7f8..9ba55b2fa8 100644 --- a/cpp/src/qpid/framing/SequenceSet.cpp +++ b/cpp/src/qpid/framing/SequenceSet.cpp @@ -84,7 +84,7 @@ void SequenceSet::remove(const SequenceNumber& s) { *this -= s; } struct RangePrinter { std::ostream& out; RangePrinter(std::ostream& o) : out(o) {} - void operator()(SequenceNumber i, SequenceNumber j) { + void operator()(SequenceNumber i, SequenceNumber j) const { out << "[" << i.getValue() << "," << j.getValue() << "] "; } }; diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h index 029a26818e..99e7cb4b21 100644 --- a/cpp/src/qpid/framing/SequenceSet.h +++ b/cpp/src/qpid/framing/SequenceSet.h @@ -34,6 +34,8 @@ class SequenceSet : public RangeSet<SequenceNumber> { explicit SequenceSet(const RangeSet<SequenceNumber>& r) : RangeSet<SequenceNumber>(r) {} explicit SequenceSet(const SequenceNumber& s) { add(s); } + SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); } + void encode(Buffer& buffer) const; void decode(Buffer& buffer); @@ -41,17 +43,20 @@ class SequenceSet : public RangeSet<SequenceNumber> { bool contains(const SequenceNumber& s) const; void add(const SequenceNumber& s); - void add(const SequenceNumber& start, const SequenceNumber& end); + void add(const SequenceNumber& start, const SequenceNumber& finish); // Closed range void add(const SequenceSet& set); void remove(const SequenceNumber& s); - void remove(const SequenceNumber& start, const SequenceNumber& end); + void remove(const SequenceNumber& start, const SequenceNumber& finish); // Closed range void remove(const SequenceSet& set); - template <class T> T for_each(T& t) const { - for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) { + template <class T> void for_each(T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) t(i->first(), i->last()); } - return t; + + template <class T> void for_each(const T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) + t(i->first(), i->last()); } friend std::ostream& operator<<(std::ostream&, const SequenceSet&); diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h index 4b3f704dda..1df62b3138 100644 --- a/cpp/src/qpid/framing/SessionState.h +++ b/cpp/src/qpid/framing/SessionState.h @@ -70,7 +70,7 @@ class SessionState SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } - State getState() const { return state; } + State getState() { return state; } /** Received incoming L3 frame. * @return SequenceNumber if an ack should be sent, empty otherwise. |