diff options
author | Alan Conway <aconway@apache.org> | 2007-09-28 16:21:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-09-28 16:21:34 +0000 |
commit | 8b82aef0397d65de0c7278476e4f409fcc636306 (patch) | |
tree | a25d9bbb01203335bc1450a5e5ed0c29074913ae /cpp/src | |
parent | f689c47486b4cfc7655e37da2b232fe27be1cc42 (diff) | |
download | qpid-python-8b82aef0397d65de0c7278476e4f409fcc636306.tar.gz |
* src/tests/ClientSessionTest.cpp: Suspend/resume tests.
* broker/SessionManager.cpp, broker/SessionHandler.cpp:
Implement suspend/resume
* client/ScopedAssociation.h, SessionCore.h, SessionHandler.h:
Simplified relationships.
- Removed ScopedAssociation.
- SessionHandler: is now a member of SessionCore.
- SessionCore: shared_ptr ownership by Session(s) and ConnectionImpl.
- Using framing::FrameHandler interfaces.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580403 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/Exception.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/ScopedAssociation.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 54 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionHandler.cpp | 121 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionHandler.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/log/Statement.h | 11 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 24 | ||||
-rw-r--r-- | cpp/src/tests/InProcessBroker.h | 4 |
21 files changed, 336 insertions, 258 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 06e857bc4f..aab13d81b4 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -310,7 +310,6 @@ nobase_include_HEADERS = \ qpid/client/MessageListener.h \ qpid/client/MessageQueue.h \ qpid/client/Response.h \ - qpid/client/ScopedAssociation.h \ qpid/client/SessionCore.h \ qpid/client/SessionHandler.h \ qpid/client/StateManager.h \ diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index 7e34e49bef..11051d1a2e 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -32,7 +32,7 @@ std::string strError(int err) { } static void ctorLog(const std::exception* e) { - QPID_LOG(trace, "Exception constructor " << typeid(e).name() << ": " << e->what()); + QPID_LOG(trace, "Exception: " << e->what()); } Exception::Exception() throw() { ctorLog(this); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 11f5545144..776634e04e 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -78,7 +78,6 @@ class Connection : public sys::ConnectionInputHandler, void idleIn(); void closed(); - // FIXME aconway 2007-08-30: When does closeChannel close the session? void closeChannel(framing::ChannelId channel); private: diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index ecbffed465..d7308572f9 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -35,7 +35,7 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch), proxy(out), ignoring(false) {} -SessionHandler::~SessionHandler() { } +SessionHandler::~SessionHandler() {} namespace { ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } @@ -78,18 +78,15 @@ void SessionHandler::handleOut(AMQFrame& f) { void SessionHandler::assertOpen(const char* method) { if (!session.get()) throw ChannelErrorException( - QPID_MSG(""<<method<<" failed: No session for channel " + QPID_MSG(method << " failed: No session for channel " << getChannel())); } void SessionHandler::assertClosed(const char* method) { - // FIXME aconway 2007-08-31: Should raise channel-busy, need - // to update spec. if (session.get()) - throw PreconditionFailedException( - QPID_MSG(""<<method<<" failed: " - << channel << " already open on channel " - << getChannel())); + throw ChannelBusyException( + QPID_MSG(method << " failed: channel " << channel + << " is already open.")); } void SessionHandler::open(uint32_t detachedLifetime) { @@ -100,6 +97,12 @@ void SessionHandler::open(uint32_t detachedLifetime) { getProxy().getSession().attached(session->getId(), session->getTimeout()); } +void SessionHandler::resume(const Uuid& id) { + assertClosed("resume"); + session = connection.broker.getSessionManager().resume(*this, id); + getProxy().getSession().attached(session->getId(), session->getTimeout()); +} + void SessionHandler::flow(bool /*active*/) { // FIXME aconway 2007-09-19: Removed in 0-10, remove assert(0); throw NotImplementedException(); @@ -115,26 +118,23 @@ void SessionHandler::close() { ignoring=false; session.reset(); getProxy().getSession().closed(REPLY_SUCCESS, "ok"); - // No need to remove from connection map, will be re-used - // if channel is re-opened. + assert(&connection.getChannel(channel) == this); + connection.closeChannel(channel); } void SessionHandler::closed(uint16_t replyCode, const string& replyText) { - // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids - // to text names. QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); ignoring=false; session.reset(); - // No need to remove from connection map, will be re-used - // if channel is re-opened. -} - -void SessionHandler::resume(const Uuid& /*sessionId*/) { - assert(0); throw NotImplementedException(); } void SessionHandler::suspend() { - assert(0); throw NotImplementedException(); + assertOpen("suspend"); + connection.broker.getSessionManager().suspend(session); + assert(!session.get()); + getProxy().getSession().detached(); + assert(&connection.getChannel(channel) == this); + connection.closeChannel(channel); } void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 20dd29bc31..e422e50657 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -22,19 +22,22 @@ #include "SessionManager.h" #include "SessionState.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" #include "qpid/memory.h" #include <boost/bind.hpp> +#include <boost/range.hpp> #include <algorithm> #include <functional> +#include <ostream> namespace qpid { namespace broker { using namespace sys; using namespace framing; -using std::make_pair; SessionManager::SessionManager() {} @@ -51,12 +54,16 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); - session->expiry = AbsTime(now(),session->getTimeout()); + active.erase(session->getId()); + session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + session->handler = 0; suspended.push_back(session.release()); // In expiry order eraseExpired(); } -std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) { +std::auto_ptr<SessionState> SessionManager::resume( + SessionHandler& sh, const Uuid& id) +{ Mutex::ScopedLock l(lock); eraseExpired(); if (active.find(id) != active.end()) @@ -70,15 +77,20 @@ std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) { throw InvalidArgumentException( QPID_MSG("No suspended session with id=" << id)); active.insert(id); - return make_auto_ptr(suspended.release(i).release()); + std::auto_ptr<SessionState> state(suspended.release(i).release()); + state->handler = &sh; + return state; } void SessionManager::eraseExpired() { // Called with lock held. - Suspended::iterator i = std::lower_bound( - suspended.begin(), suspended.end(), now(), - boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - suspended.erase(suspended.begin(), i); + if (!suspended.empty()) { + Suspended::iterator keep = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 8225f04798..bbe969de6c 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -47,8 +47,7 @@ class SessionManager : private boost::noncopyable { SessionManager(); ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open( - SessionHandler& h, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. @@ -58,7 +57,7 @@ class SessionManager : private boost::noncopyable { /** Resume a suspended session. *@throw Exception if timed out or non-existant. */ - std::auto_ptr<SessionState> resume(const framing::Uuid& id); + std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&); private: typedef boost::ptr_vector<SessionState> Suspended; @@ -69,7 +68,7 @@ class SessionManager : private boost::noncopyable { Active active; void eraseExpired(); - friend class SessionState; // removes deleted sessions from active set. + friend class SessionState; // removes deleted sessions from active set. }; diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 58944c5968..d152937692 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -32,6 +32,7 @@ #include <set> #include <vector> +#include <ostream> namespace qpid { @@ -79,7 +80,7 @@ class SessionState : public framing::FrameHandler::Chains, uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } - + private: /** Only SessionManager can open sessions */ SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_); @@ -96,6 +97,11 @@ class SessionState : public framing::FrameHandler::Chains, friend class SessionManager; }; + +inline std::ostream& operator<<(std::ostream& out, const SessionState& session) { + return out << session.getId(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index cef076527f..2d8cbb2ddb 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -25,7 +25,7 @@ #include "Connection.h" #include "Channel.h" #include "Message.h" -#include "ScopedAssociation.h" +#include "SessionCore.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -70,16 +70,22 @@ void Connection::openChannel(Channel& channel) { channel.open(newSession()); } -Session Connection::newSession() { - ChannelId id = ++channelIdCounter; - SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); - session->open(); - return Session(assoc); +Session Connection::newSession(uint32_t detachedLifetime) { + shared_ptr<SessionCore> core( + new SessionCore(*impl, ++channelIdCounter, max_frame_size)); + impl->addSession(core); + core->open(detachedLifetime); + return Session(core); } -void Connection::close() -{ +void Connection::resume(Session& session) { + shared_ptr<SessionCore> core=session.impl; + core->setChannel(++channelIdCounter); + impl->addSession(core); + core->resume(*impl); +} + +void Connection::close() { impl->close(); } diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index f5d6a387a9..4a9a68e8b3 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -28,7 +28,7 @@ #include "ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" - +#include "qpid/framing/Uuid.h" namespace qpid { @@ -122,7 +122,25 @@ class Connection */ void openChannel(Channel&); - Session newSession(); + /** + * Create a new session on this connection. Sessions allow + * multiple streams of work to be multiplexed over the same + * connection. + * + *@param detachedLifetime: A session may be detached from its + * channel, either by calling Session::suspend() or because of a + * network failure. The session state is perserved for + * detachedLifetime seconds to allow a call to resume(). After + * that the broker may discard the session state. Default is 0, + * meaning the session cannot be resumed. + */ + Session newSession(uint32_t detachedLifetime=0); + + /** + * Resume a suspendded session. A session may be resumed + * on a different connection to the one that created it. + */ + void resume(Session& session); }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 8ab60cff50..43576d2273 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,11 @@ * under the License. * */ +#include "qpid/framing/reply_exceptions.h" + #include "ConnectionImpl.h" +#include "SessionCore.h" + #include <boost/bind.hpp> #include <boost/format.hpp> @@ -26,7 +30,8 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false) +ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) + : connector(c), isClosed(false) { handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, connector, _1); @@ -37,22 +42,13 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), i connector->setShutdownHandler(this); } -void ConnectionImpl::allocated(SessionCore::shared_ptr session) -{ - Mutex::ScopedLock l(lock); - if (sessions.find(session->getId()) != sessions.end()) { - throw Exception("Id already in use."); - } - sessions[session->getId()] = session; -} - -void ConnectionImpl::released(SessionCore::shared_ptr session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); - SessionMap::iterator i = sessions.find(session->getId()); - if (i != sessions.end()) { - sessions.erase(i); - } + boost::shared_ptr<SessionCore>& s = sessions[session->getChannel()]; + if (s) + throw ChannelBusyException(); + s = session; } void ConnectionImpl::handle(framing::AMQFrame& frame) @@ -62,7 +58,14 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { - find(frame.getChannel())->handle(frame); + boost::shared_ptr<SessionCore> s; + { + Mutex::ScopedLock l(lock); + s = sessions[frame.getChannel()]; + } + if (!s) + throw ChannelErrorException(); + s->in(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -117,23 +120,12 @@ void ConnectionImpl::signalClose(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - Mutex::ScopedUnlock u(lock); i->second->closed(code, text); } sessions.clear(); isClosed = true; } -SessionCore::shared_ptr ConnectionImpl::find(uint16_t id) -{ - Mutex::ScopedLock l(lock); - SessionMap::iterator i = sessions.find(id); - if (i == sessions.end()) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - return i->second; -} - void ConnectionImpl::assertNotClosed() { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index fc786ba643..975beaa101 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -30,17 +30,18 @@ #include "qpid/sys/TimeoutHandler.h" #include "ConnectionHandler.h" #include "Connector.h" -#include "SessionCore.h" namespace qpid { namespace client { +class SessionCore; + class ConnectionImpl : public framing::FrameHandler, - public sys::TimeoutHandler, - public sys::ShutdownHandler + public sys::TimeoutHandler, + public sys::ShutdownHandler { - typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap; + typedef std::map<uint16_t, boost::shared_ptr<SessionCore> > SessionMap; SessionMap sessions; ConnectionHandler handler; boost::shared_ptr<Connector> connector; @@ -56,14 +57,12 @@ class ConnectionImpl : public framing::FrameHandler, void shutdown(); void signalClose(uint16_t, const std::string&); void assertNotClosed(); - SessionCore::shared_ptr find(uint16_t); - public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); - void allocated(SessionCore::shared_ptr); - void released(SessionCore::shared_ptr); + void addSession(const boost::shared_ptr<SessionCore>&); + void open(const std::string& host, int port = 5672, const std::string& uid = "guest", const std::string& pwd = "guest", diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 4e0ee05da2..7e4926bc25 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -170,7 +170,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: if(l) { completion.listenForResult(id, l); } - AMQFrame frame(0/*channel will be filled in be channel handler*/, command); + AMQFrame frame(0/*channel will be filled in by channel handler*/, command); if (hasContent) { frame.setEof(false); } diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index a3a3cde390..427c39b61f 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -38,7 +38,7 @@ namespace client { class ExecutionHandler : private framing::AMQP_ServerOperations::ExecutionHandler, - public ChainableFrameHandler, + public framing::FrameHandler, public Execution { framing::SequenceNumber incomingCounter; @@ -66,9 +66,14 @@ class ExecutionHandler : public: typedef CompletionTracker::ResultListener ResultListener; + // Allow other classes to set the out handler. + framing::FrameHandler::Chain out; + ExecutionHandler(uint64_t maxFrameSize = 65536); + // Incoming handler. void handle(framing::AMQFrame& frame); + framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, ResultListener=ResultListener()); diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h deleted file mode 100644 index 861a28c0f8..0000000000 --- a/cpp/src/qpid/client/ScopedAssociation.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ScopedAssociation_ -#define _ScopedAssociation_ - -#include "ConnectionImpl.h" -#include "SessionCore.h" - -namespace qpid { -namespace client { - -struct ScopedAssociation -{ - typedef boost::shared_ptr<ScopedAssociation> shared_ptr; - - SessionCore::shared_ptr session; - ConnectionImpl::shared_ptr connection; - - ScopedAssociation() {} - - ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c) - { - connection->allocated(session); - } - - ~ScopedAssociation() - { - if (connection && session) connection->released(session); - } -}; - - -}} - -#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index f093e12594..3f8f9244ef 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -20,27 +20,25 @@ */ #include "SessionCore.h" -#include <boost/bind.hpp> +#include "qpid/framing/constants.h" #include "Future.h" #include "FutureResponse.h" #include "FutureResult.h" +#include <boost/bind.hpp> + using namespace qpid::client; using namespace qpid::framing; -SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out, - uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false) +SessionCore::SessionCore(FrameHandler& out_, uint16_t ch, uint64_t maxFrameSize) + : channel(ch), l2(*this), l3(maxFrameSize), uuid(false), sync(false) { - l2.out = boost::bind(&FrameHandler::handle, out, _1); - l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1); - l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1); - l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2); + l2.next = &l3; + l3.out = &out; + out.next = &out_; } -void SessionCore::open() -{ - l2.open(id); -} +SessionCore::~SessionCore() {} ExecutionHandler& SessionCore::getExecution() { @@ -50,6 +48,7 @@ ExecutionHandler& SessionCore::getExecution() FrameSet::shared_ptr SessionCore::get() { + checkClosed(); return l3.getDemux().getDefault().pop(); } @@ -63,38 +62,55 @@ bool SessionCore::isSync() return sync; } -void SessionCore::close() -{ - l2.close(); - stop(); +namespace { +struct ClosedOnExit { + SessionCore& core; + int code; + std::string text; + ClosedOnExit(SessionCore& s, int c, const std::string& t) + : core(s), code(c), text(t) {} + ~ClosedOnExit() { core.closed(code, text); } +}; } -void SessionCore::stop() +void SessionCore::close() { - l3.getDemux().close(); - l3.getCompletionTracker().close(); + checkClosed(); + ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user."); + l2.close(); } -void SessionCore::handle(AMQFrame& frame) -{ - l2.incoming(frame); +void SessionCore::suspend() { + checkClosed(); + ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended"); + l2.suspend(); } void SessionCore::closed(uint16_t code, const std::string& text) { - stop(); - - isClosed = true; + out.next = 0; reason.code = code; reason.text = text; + l2.closed(); + l3.getDemux().close(); + l3.getCompletionTracker().close(); } -void SessionCore::checkClosed() +void SessionCore::checkClosed() const { - if (isClosed) { - //TODO: could actually have been a connection exception + // TODO: could have been a connection exception + if(out.next == 0) throw ChannelException(reason.code, reason.text); - } +} + +void SessionCore::open(uint32_t detachedLifetime) { + assert(out.next); + l2.open(detachedLifetime); +} + +void SessionCore::resume(FrameHandler& out_) { + out.next = &out_; + l2.resume(); } Future SessionCore::send(const AMQBody& command) @@ -131,3 +147,15 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content) //send method impl: return Future(l3.send(command, content)); } + +void SessionCore::handleIn(AMQFrame& frame) { + l2.handle(frame); +} + +void SessionCore::handleOut(AMQFrame& frame) +{ + checkClosed(); + frame.setChannel(channel); + out.next->handle(frame); +} + diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 5b15a607b3..b717914206 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -28,6 +28,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" +#include "qpid/framing/Uuid.h" #include "SessionHandler.h" #include "ExecutionHandler.h" @@ -36,7 +37,12 @@ namespace client { class Future; -class SessionCore : public framing::FrameHandler +/** + * Session implementation, sets up handler chains. + * Attaches to a SessionHandler when active, detaches + * when closed. + */ +class SessionCore : public framing::FrameHandler::InOutHandler { struct Reason { @@ -44,33 +50,49 @@ class SessionCore : public framing::FrameHandler std::string text; }; - ExecutionHandler l3; + uint16_t channel; SessionHandler l2; - const uint16_t id; + ExecutionHandler l3; + framing::Uuid uuid; bool sync; - bool isClosed; Reason reason; + + protected: + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + public: + typedef shared_ptr<SessionCore> shared_ptr; -public: - typedef boost::shared_ptr<SessionCore> shared_ptr; + SessionCore(framing::FrameHandler& out, uint16_t channel, uint64_t maxFrameSize); + ~SessionCore(); - SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize); framing::FrameSet::shared_ptr get(); - uint16_t getId() const { return id; } - void setSync(bool); - bool isSync(); - void open(); + + framing::Uuid getId() const { return uuid; } + void setId(const framing::Uuid& id) { uuid= id; } + + uint16_t getChannel() const { assert(channel); return channel; } + void setChannel(uint16_t ch) { assert(ch); channel=ch; } + + void open(uint32_t detachedLifetime); + + /** Closed by client code */ void close(); - void stop(); + + /** Closed by peer */ void closed(uint16_t code, const std::string& text); - void checkClosed(); + + void resume(framing::FrameHandler& out); + void suspend(); + + void setSync(bool); + bool isSync(); ExecutionHandler& getExecution(); + void checkClosed() const; Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); - - //for incoming frames: - void handle(framing::AMQFrame& frame); }; } diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp index 93e628ab34..d3b04e5356 100644 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ b/cpp/src/qpid/client/SessionHandler.cpp @@ -22,31 +22,44 @@ #include "SessionHandler.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/client/SessionCore.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {} +namespace { +// TODO aconway 2007-09-28: hack till we have multi-version support. +ProtocolVersion version; +} + +SessionHandler::SessionHandler(SessionCore& parent) + : StateManager(CLOSED), core(parent) {} + +SessionHandler::~SessionHandler() {} -void SessionHandler::incoming(AMQFrame& frame) +void SessionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (getState() == OPEN) { - SessionClosedBody* closeBody= + core.checkClosed(); + SessionClosedBody* closedBody= dynamic_cast<SessionClosedBody*>(body->getMethod()); - if (closeBody) { - setState(CLOSED_BY_PEER); - code = closeBody->getReplyCode(); - text = closeBody->getReplyText(); - if (onClose) { - onClose(closeBody->getReplyCode(), closeBody->getReplyText()); - } + if (closedBody) { + closed(); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } else { try { - in(frame); - }catch(ChannelException& e){ - closed(e.code, e.toString()); + next->handle(frame); + } + catch(const ChannelException& e){ + QPID_LOG(error, "Channel exception:" << e.what()); + closed(); + AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); + core.out(f); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } } } else { @@ -57,69 +70,63 @@ void SessionHandler::incoming(AMQFrame& frame) } } -void SessionHandler::outgoing(AMQFrame& frame) -{ - if (getState() == OPEN) { - frame.setChannel(id); - out(frame); - } else if (getState() == CLOSED) { - throw Exception(QPID_MSG("Channel not open, can't send " << frame)); - } else if (getState() == CLOSED_BY_PEER) { - throw ChannelException(code, text); - } -} - -void SessionHandler::open(uint16_t _id) +void SessionHandler::attach(const AMQMethodBody& command) { - id = _id; - setState(OPENING); - // FIXME aconway 2007-09-19: Need to get this from API. - AMQFrame f(id, SessionOpenBody(version, 0)); - out(f); - + AMQFrame f(0, command); + core.out(f); std::set<int> states; states.insert(OPEN); - states.insert(CLOSED_BY_PEER); + states.insert(CLOSED); waitFor(states); - if (getState() != OPEN) { - throw Exception("Failed to open channel."); - } + if (getState() != OPEN) + throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel())); +} + +void SessionHandler::open(uint32_t detachedLifetime) { + attach(SessionOpenBody(version, detachedLifetime)); } -void SessionHandler::close() +void SessionHandler::resume() { + attach(SessionResumeBody(version, core.getId())); +} + +void SessionHandler::detach(const AMQMethodBody& command) { setState(CLOSING); - AMQFrame f(id, SessionCloseBody(version)); - out(f); + AMQFrame f(0, command); + core.out(f); waitFor(CLOSED); } -void SessionHandler::closed(uint16_t code, const std::string& msg) -{ - setState(CLOSED); - AMQFrame f(id, SessionClosedBody(version, code, msg)); - out(f); -} +void SessionHandler::close() { detach(SessionCloseBody(version)); } +void SessionHandler::suspend() { detach(SessionSuspendBody(version)); } +void SessionHandler::closed() { setState(CLOSED); } void SessionHandler::handleMethod(AMQMethodBody* method) { switch (getState()) { - case OPENING: - if (method->isA<SessionAttachedBody>()) { - setState(OPEN); - } else { - throw ConnectionException(504, "Channel not opened."); - } - break; + case OPENING: { + SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method); + if (attached) { + core.setId(attached->getSessionId()); + setState(OPEN); + } else + throw ChannelErrorException(); + break; + } case CLOSING: - if (method->isA<SessionClosedBody>()) { - setState(CLOSED); - } //else just ignore it + if (method->isA<SessionClosedBody>() || + method->isA<SessionDetachedBody>()) + closed(); break; + case CLOSED: - throw ConnectionException(504, "Channel is closed."); + throw ChannelErrorException(); + default: - throw Exception("Unexpected state encountered in SessionHandler!"); + assert(0); + throw InternalErrorException(QPID_MSG("Internal Error.")); } } + diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h index e71d527406..994b8402de 100644 --- a/cpp/src/qpid/client/SessionHandler.h +++ b/cpp/src/qpid/client/SessionHandler.h @@ -22,36 +22,40 @@ #define _SessionHandler_ #include "StateManager.h" -#include "ChainableFrameHandler.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/Uuid.h" +#include "qpid/shared_ptr.h" namespace qpid { namespace client { +class SessionCore; -class SessionHandler : private StateManager, public ChainableFrameHandler +/** + * Handles incoming session (L2) commands. + */ +class SessionHandler : public framing::FrameHandler, + private StateManager { - enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER}; - framing::ProtocolVersion version; - uint16_t id; + enum STATES {OPENING, OPEN, CLOSING, CLOSED}; + SessionCore& core; - uint16_t code; - std::string text; - void handleMethod(framing::AMQMethodBody* method); - void closed(uint16_t code, const std::string& msg); - -public: - typedef boost::function<void(uint16_t, const std::string&)> CloseListener; - - SessionHandler(); + void attach(const framing::AMQMethodBody&); + void detach(const framing::AMQMethodBody&); + + public: + SessionHandler(SessionCore& parent); + ~SessionHandler(); - void incoming(framing::AMQFrame& frame); - void outgoing(framing::AMQFrame& frame); + /** Incoming from broker */ + void handle(framing::AMQFrame&); - void open(uint16_t id); + void open(uint32_t detachedLifetime); + void resume(); void close(); - - CloseListener onClose; + void closed(); + void suspend(); }; }} diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h index 563da3716c..4eb4d1e7d8 100644 --- a/cpp/src/qpid/log/Statement.h +++ b/cpp/src/qpid/log/Statement.h @@ -113,7 +113,16 @@ inline std::ostream& noop(std::ostream& s) { return s; } stmt_.log(QPID_LOG_STRINGSTREAM(message)); \ } while(0) - +/** + * Macro for complicated logging logic that can't fit in a simple QPID_LOG + * statement. For example: + * @code + * QPID_IF_LOG(debug) { + * message = do_complicated_stuff; + * QPID_LOG(debug, message); + * } + */ +#define QPID_IF_LOG(level) }} // namespace qpid::log diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 1d59fbed33..2495a06fa4 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -60,6 +60,8 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_TEST(testQueueQuery); CPPUNIT_TEST(testTransfer); CPPUNIT_TEST(testDispatcher); + CPPUNIT_TEST(testSuspendResume); + CPPUNIT_TEST(testSuspendResumeErrors); CPPUNIT_TEST_SUITE_END(); boost::shared_ptr<Connector> broker; @@ -139,6 +141,28 @@ public: } void testSuspendResume() { + session = connection.newSession(60); + session.suspend(); + try { + session.exchangeQuery_(name="amq.fanout"); + CPPUNIT_FAIL("Expected session suspended exception"); + } catch(...) {} + connection.resume(session); + session.exchangeQuery_(name="amq.fanout"); + // FIXME aconway 2007-09-25: build up session state and confirm + //it survives the resume + } + + void testSuspendResumeErrors() { + session.suspend(); // session has 0 timeout. + try { + session.exchangeQuery_(name="amq.fanout"); + CPPUNIT_FAIL("Expected suspended session exception"); + } catch(...) {} + try { + connection.resume(session); + CPPUNIT_FAIL("Expected no such session exception."); + } catch(...) {} } }; diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 531ebd8fa7..2a9f12771b 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -24,6 +24,7 @@ #include "qpid/broker/Connection.h" #include "qpid/client/Connector.h" #include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" #include <vector> #include <iostream> @@ -101,7 +102,8 @@ class InProcessBroker : public client::Connector { ) : sender(sender_), conversation(conversation_), in(ih) {} void send(framing::AMQFrame& frame) { - //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl; + QPID_LOG(debug, + (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame); conversation.push_back(TaggedFrame(sender, frame)); in->received(frame); } |