diff options
author | Gordon Sim <gsim@apache.org> | 2008-02-25 16:56:29 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-02-25 16:56:29 +0000 |
commit | 773cc35a38cd34095f8800259ee7a2165a817053 (patch) | |
tree | 24c6b26d43ae2f95a7ad7be695bbe57ddbc4b641 /cpp | |
parent | f4e2d3de5df7bbaae41053191638734a991fd21c (diff) | |
download | qpid-python-773cc35a38cd34095f8800259ee7a2165a817053.tar.gz |
Some refactoring of the 0-10 codepath (being migrated to final spec) that primarily colocates the current session and execution layers to facilitate implementing the new session layer that will now encompass this behaviour.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@630934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
28 files changed, 786 insertions, 118 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index b44aaa7e5e..df0b867b00 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -166,6 +166,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PreviewConnection.cpp \ qpid/broker/PreviewConnectionHandler.cpp \ qpid/broker/PreviewSessionHandler.cpp \ + qpid/broker/PreviewSessionManager.cpp \ + qpid/broker/PreviewSessionState.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -270,6 +272,8 @@ nobase_include_HEADERS = \ qpid/broker/PreviewConnection.h \ qpid/broker/PreviewConnectionHandler.h \ qpid/broker/PreviewSessionHandler.h \ + qpid/broker/PreviewSessionManager.h \ + qpid/broker/PreviewSessionState.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0a0eb0a0df..9bfa868d9c 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) : store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), - sessionManager(conf.ack) + sessionManager(conf.ack), + previewSessionManager(conf.ack) { // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 55bc7644a5..153eabc6b3 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,6 +30,7 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" +#include "PreviewSessionManager.h" #include "Vhost.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } SessionManager& getSessionManager() { return sessionManager; } + PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; + PreviewSessionManager previewSessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index c8c1e12f28..ef2c51bb8d 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return session.getVersion();} + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} AccessHandler* getAccessHandler() { diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 410d400c9d..4c51e2a826 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -20,7 +20,7 @@ */ #include "SemanticState.h" -#include "SessionState.h" +#include "SessionContext.h" #include "ConnectionState.h" namespace qpid { @@ -35,13 +35,13 @@ class Broker; class HandlerImpl { protected: SemanticState& state; - SessionState& session; + SessionContext& session; HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } ConnectionState& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp index 19e6a235c4..36092bb7f6 100644 --- a/cpp/src/qpid/broker/PreviewSessionHandler.cpp +++ b/cpp/src/qpid/broker/PreviewSessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "PreviewSessionHandler.h" -#include "SessionState.h" +#include "PreviewSessionState.h" #include "PreviewConnection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -36,7 +36,7 @@ using namespace std; using namespace qpid::sys; PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const { void PreviewSessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); peerSession.attached(session->getId(), session->getTimeout()); } void PreviewSessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); + session = connection.broker.getPreviewSessionManager().resume(id); session->attach(*this); SequenceNumber seq = session->resuming(); peerSession.attached(session->getId(), session->getTimeout()); @@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) void PreviewSessionHandler::localSuspend() { if (session.get() && session->isAttached()) { session->detach(); - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } } @@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { + if (session->getState() == PreviewSessionState::RESUMING) { session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), @@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() { void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); } void PreviewSessionHandler::detached() { - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h index e1096ebf9f..4c517367d7 100644 --- a/cpp/src/qpid/broker/PreviewSessionHandler.h +++ b/cpp/src/qpid/broker/PreviewSessionHandler.h @@ -36,7 +36,7 @@ namespace qpid { namespace broker { class PreviewConnection; -class SessionState; +class PreviewSessionState; /** * A SessionHandler is associated with each active channel. It @@ -45,7 +45,7 @@ class SessionState; */ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand ~PreviewSessionHandler(); /** Returns 0 if not attached to a session */ - SessionState* getSession() { return session.get(); } - const SessionState* getSession() const { return session.get(); } + PreviewSessionState* getSession() { return session.get(); } + const PreviewSessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel.get(); } @@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; - std::auto_ptr<SessionState> session; + std::auto_ptr<PreviewSessionState> session; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionManager.cpp b/cpp/src/qpid/broker/PreviewSessionManager.cpp new file mode 100644 index 0000000000..ec73082817 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionManager.cpp @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewSessionManager.h" +#include "PreviewSessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" +#include "qpid/memory.h" + +#include <boost/bind.hpp> +#include <boost/range.hpp> + +#include <algorithm> +#include <functional> +#include <ostream> + +namespace qpid { +namespace broker { + +using namespace sys; +using namespace framing; + +PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} + +PreviewSessionManager::~PreviewSessionManager() {} + +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. +std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( + PreviewSessionHandler& h, uint32_t timeout_) +{ + Mutex::ScopedLock l(lock); + std::auto_ptr<PreviewSessionState> session( + new PreviewSessionState(this, &h, timeout_, ack)); + active.insert(session->getId()); + for_each(observers.begin(), observers.end(), + boost::bind(&Observer::opened, _1,boost::ref(*session))); + return session; +} + +void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { + Mutex::ScopedLock l(lock); + active.erase(session->getId()); + session->suspend(); + session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); + suspended.push_back(session.release()); // In expiry order + eraseExpired(); +} + +std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) +{ + Mutex::ScopedLock l(lock); + eraseExpired(); + if (active.find(id) != active.end()) + throw SessionBusyException( + QPID_MSG("Session already active: " << id)); + Suspended::iterator i = std::find_if( + suspended.begin(), suspended.end(), + boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1)) + ); + if (i == suspended.end()) + throw InvalidArgumentException( + QPID_MSG("No suspended session with id=" << id)); + active.insert(id); + std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); + return state; +} + +void PreviewSessionManager::erase(const framing::Uuid& id) +{ + Mutex::ScopedLock l(lock); + active.erase(id); +} + +void PreviewSessionManager::eraseExpired() { + // Called with lock held. + if (!suspended.empty()) { + Suspended::iterator keep = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2)); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } + } +} + +void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { + observers.push_back(o); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionManager.h b/cpp/src/qpid/broker/PreviewSessionManager.h new file mode 100644 index 0000000000..65ca49ec89 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionManager.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H +#define QPID_BROKER_PREVIEWSESSIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/Uuid.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Mutex.h> +#include <qpid/RefCounted.h> + +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <set> +#include <vector> +#include <memory> + +namespace qpid { +namespace broker { + +class PreviewSessionState; +class PreviewSessionHandler; + +/** + * Create and manage PreviewSessionState objects. + */ +class PreviewSessionManager : private boost::noncopyable { + public: + /** + * Observer notified of PreviewSessionManager events. + */ + struct Observer : public RefCounted { + virtual void opened(PreviewSessionState&) {} + }; + + PreviewSessionManager(uint32_t ack); + + ~PreviewSessionManager(); + + /** Open a new active session, caller takes ownership */ + std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_); + + /** Suspend a session, start it's timeout counter. + * The factory takes ownership. + */ + void suspend(std::auto_ptr<PreviewSessionState> session); + + /** Resume a suspended session. + *@throw Exception if timed out or non-existant. + */ + std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); + + /** Add an Observer. */ + void add(const intrusive_ptr<Observer>&); + + private: + typedef boost::ptr_vector<PreviewSessionState> Suspended; + typedef std::set<framing::Uuid> Active; + typedef std::vector<intrusive_ptr<Observer> > Observers; + + void erase(const framing::Uuid&); + void eraseExpired(); + + sys::Mutex lock; + Suspended suspended; + Active active; + uint32_t ack; + Observers observers; + + friend class PreviewSessionState; // removes deleted sessions from active set. +}; + + + +}} // namespace qpid::broker + + + + + +#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/ diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp new file mode 100644 index 0000000000..7188ffbf40 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewSessionState.h" +#include "PreviewSessionManager.h" +#include "PreviewSessionHandler.h" +#include "ConnectionState.h" +#include "Broker.h" +#include "SemanticHandler.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +using namespace framing; +using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +PreviewSessionState::PreviewSessionState( + PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack, timeout_ > 0), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + in.next = semanticHandler.get(); + out.next = &handler->out; + + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } +} + +PreviewSessionState::~PreviewSessionState() { + // Remove ID from active session list. + if (factory) + factory->erase(getId()); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +PreviewSessionHandler* PreviewSessionState::getHandler() { + return handler; +} + +AMQP_ClientProxy& PreviewSessionState::getProxy() { + assert(isAttached()); + return getHandler()->getProxy(); +} + +ConnectionState& PreviewSessionState::getConnection() { + assert(isAttached()); + return getHandler()->getConnection(); +} + +void PreviewSessionState::detach() { + getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + Mutex::ScopedLock l(lock); + handler = 0; out.next = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } +} + +void PreviewSessionState::attach(PreviewSessionHandler& h) { + { + Mutex::ScopedLock l(lock); + handler = &h; + out.next = &handler->out; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } + } + h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} + +void PreviewSessionState::activateOutput() +{ + Mutex::ScopedLock l(lock); + if (isAttached()) { + getConnection().outputTasks.activateOutput(); + } +} + //This class could be used as the callback for queue notifications + //if not attached, it can simply ignore the callback, else pass it + //on to the connection + +ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->detach(); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + /* + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + */ + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h new file mode 100644 index 0000000000..6e8523317c --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionState.h @@ -0,0 +1,124 @@ +#ifndef QPID_BROKER_PREVIEWSESSION_H +#define QPID_BROKER_PREVIEWSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" +#include "SessionContext.h" + +#include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> + +#include <set> +#include <vector> +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class SemanticHandler; +class PreviewSessionHandler; +class PreviewSessionManager; +class Broker; +class ConnectionState; + +/** + * Broker-side session state includes sessions handler chains, which may + * themselves have state. + */ +class PreviewSessionState : public framing::SessionState, + public SessionContext, + public framing::FrameHandler::Chains, + public management::Manageable +{ + public: + ~PreviewSessionState(); + bool isAttached() { return handler; } + + void detach(); + void attach(PreviewSessionHandler& handler); + + + PreviewSessionHandler* getHandler(); + + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + ConnectionState& getConnection(); + + uint32_t getTimeout() const { return timeout; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + + /** OutputControl **/ + void activateOutput(); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + + // Normally SessionManager creates sessions. + PreviewSessionState(PreviewSessionManager*, + PreviewSessionHandler* out, + uint32_t timeout, + uint32_t ackInterval); + + + private: + PreviewSessionManager* factory; + PreviewSessionHandler* handler; + framing::Uuid id; + uint32_t timeout; + sys::AbsTime expiry; // Used by SessionManager. + Broker& broker; + framing::ProtocolVersion version; + sys::Mutex lock; + boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; + + friend class PreviewSessionManager; +}; + + +inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { + return out << session.getId(); +} + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 2a79496144..fdde7ec18c 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,7 +22,6 @@ #include "SemanticHandler.h" #include "SemanticState.h" #include "SessionContext.h" -#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -37,9 +36,9 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : +SemanticHandler::SemanticHandler(SessionContext& s) : state(*this,s), session(s), - msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()), + msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()), ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) {} @@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - SessionContext* handler = session.getHandler(); - if (handler) { - uint32_t maxFrameSize = handler->getConnection().getFrameMax(); - MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize); - } else { - QPID_LOG(error, "Dropping message as session is no longer attached to a channel."); - } + uint32_t maxFrameSize = session.getConnection().getFrameMax(); + MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); return outgoing.hwm; } diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index d7f3ec8799..893a0cbded 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -46,7 +46,7 @@ class AMQHeaderBody; namespace broker { -class SessionState; +class SessionContext; class SemanticHandler : public DeliveryAdapter, public framing::FrameHandler, @@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter, typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; SemanticState state; - SessionState& session; + SessionContext& session; // TODO aconway 2007-09-20: Why are these on the handler rather than the // state? IncomingExecutionContext incoming; @@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter, framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } //Connection& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } public: - SemanticHandler(SessionState& session); + SemanticHandler(SessionContext& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 7b4035604f..c6c0ae5440 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -19,7 +19,7 @@ * */ -#include "SessionState.h" +#include "SessionContext.h" #include "BrokerAdapter.h" #include "Queue.h" #include "Connection.h" @@ -56,7 +56,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; -SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) +SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), deliveryAdapter(da), prefetchSize(0), @@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (parent->getSession().isAttached() && accept(msg.payload)) { - allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } - return true; - } else { - QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); - return false; + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } + return true; } bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) @@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(getSession().getBroker(), queue); + Queue::tryAutoDelete(getSession().getConnection().getBroker(), queue); } } } @@ -584,7 +579,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { if (name.empty()) { throw NotAllowedException(QPID_MSG("No queue name specified.")); } else { - queue = session.getBroker().getQueues().find(name); + queue = session.getConnection().getBroker().getQueues().find(name); if (!queue) throw NotFoundException(QPID_MSG("Queue not found: "<<name)); } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 7fc6e4167c..cc9c0e1e9b 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -45,7 +45,7 @@ namespace qpid { namespace broker { -class SessionState; +class SessionContext; /** * SemanticState holds the L3 and L4 state of an open session, whether @@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains, typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; - SessionState& session; + SessionContext& session; DeliveryAdapter& deliveryAdapter; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains, void cancel(ConsumerImpl&); public: - SemanticState(DeliveryAdapter&, SessionState&); + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); - SessionState& getSession() { return session; } + SessionContext& getSession() { return session; } /** * Get named queue, never returns 0. diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index a27b43cf65..63dd02cede 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/OutputControl.h" #include "ConnectionState.h" @@ -33,17 +34,12 @@ namespace qpid { namespace broker { -class SessionContext : public framing::FrameHandler::InOutHandler +class SessionContext : public sys::OutputControl { public: - SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {} virtual ~SessionContext(){} virtual ConnectionState& getConnection() = 0; - virtual const ConnectionState& getConnection() const = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; - virtual const framing::AMQP_ClientProxy& getProxy() const = 0; - virtual void detach() = 0; - virtual framing::ChannelId getChannel() const = 0; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 5a72bfb12d..e6bc463a82 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -28,7 +28,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" -#include "SessionContext.h" +#include "qpid/framing/SequenceNumber.h" #include <boost/noncopyable.hpp> @@ -36,16 +36,18 @@ namespace qpid { namespace broker { class Connection; +class ConnectionState; class SessionState; /** * A SessionHandler is associated with each active channel. It - * receives incoming frames, handles session commands and manages the + * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::AMQP_ServerOperations::ExecutionHandler, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); void detached(); + //Execution methods: + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; - Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index aa7ac9a8bb..571d3365db 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -45,7 +45,7 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionContext& h, uint32_t timeout_) + SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 94956a83ed..7e8bd18f57 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -38,7 +38,7 @@ namespace qpid { namespace broker { class SessionState; -class SessionContext; +class SessionHandler; /** * Create and manage SessionState objects. @@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index b6c59cfb3b..573a567da6 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -19,12 +19,16 @@ * */ #include "SessionState.h" -#include "SessionManager.h" -#include "SessionContext.h" -#include "ConnectionState.h" #include "Broker.h" +#include "ConnectionState.h" +#include "MessageDelivery.h" #include "SemanticHandler.h" +#include "SessionManager.h" +#include "SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" + +#include <boost/bind.hpp> namespace qpid { namespace broker { @@ -37,17 +41,17 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) + semanticState(*this, *this), + adapter(semanticState), + msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) { - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.addOutputTask(&semanticState); Manageable* parent = broker.GetVhostObject (); @@ -76,7 +80,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionContext* SessionState::getHandler() { +SessionHandler* SessionState::getHandler() { return handler; } @@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() { } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; + handler = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); } } -void SessionState::attach(SessionContext& h) { +void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; - out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); @@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + h.getConnection().outputTasks.addOutputTask(&semanticState); } void SessionState::activateOutput() @@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } +void SessionState::handleCommand(framing::AMQMethodBody* method) +{ + SequenceNumber id = incoming.next(); + Invoker::Result invocation = invoke(adapter, *method); + incoming.complete(id); + + if (!invocation.wasHandled()) { + throw NotImplementedException("Not implemented"); + } else if (invocation.hasResult()) { + getProxy().getExecution().result(id.getValue(), invocation.getResult()); + } + if (method->isSync()) { + incoming.sync(id); + sendCompletion(); + } + //TODO: if window gets too large send unsolicited completion +} + +void SessionState::handleContent(AMQFrame& frame) +{ + intrusive_ptr<Message> msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(incoming.next()); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&getConnection()); + semanticState.handle(msg); + msgBuilder.end(); + incoming.track(msg); + if (msg->getFrames().getMethod()->isSync()) { + incoming.sync(msg->getCommandId()); + sendCompletion(); + } + } +} + +void SessionState::handle(AMQFrame& frame) +{ + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assmblies are non-content bearing and all content-bearing + //assmeblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + handleContent(frame); + } + +} + +DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); + return outgoing.hwm; +} + +void SessionState::sendCompletion() +{ + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); + getProxy().getExecution().complete(mark.getValue(), range); +} + +void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + semanticState.ackCumulative(mark.getValue()); + } + range.processRanges(ackOp); +} + +void SessionState::flush() +{ + incoming.flush(); + sendCompletion(); +} + +void SessionState::sync() +{ + incoming.sync(); + sendCompletion(); +} + +void SessionState::noop() +{ + incoming.noop(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 8a12e580b7..98c21a8ab5 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -27,10 +27,15 @@ #include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" +#include "BrokerAdapter.h" +#include "DeliveryAdapter.h" +#include "MessageBuilder.h" +#include "SessionContext.h" +#include "SemanticState.h" +#include "IncomingExecutionContext.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -47,8 +52,7 @@ class AMQP_ClientProxy; namespace broker { -class SemanticHandler; -class SessionContext; +class SessionHandler; class SessionManager; class Broker; class ConnectionState; @@ -58,8 +62,8 @@ class ConnectionState; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::Chains, - public sys::OutputControl, + public SessionContext, + public DeliveryAdapter, public management::Manageable { public: @@ -67,10 +71,10 @@ class SessionState : public framing::SessionState, bool isAttached() { return handler; } void detach(); - void attach(SessionContext& handler); + void attach(SessionHandler& handler); - SessionContext* getHandler(); + SessionHandler* getHandler(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,6 +89,19 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + void handle(framing::AMQFrame& frame); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); + + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void sync(); + void sendCompletion(); + + //delivery adapter methods: + DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t @@ -92,21 +109,32 @@ class SessionState : public framing::SessionState, // Normally SessionManager creates sessions. SessionState(SessionManager*, - SessionContext* out, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); private: + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + SessionManager* factory; - SessionContext* handler; + SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; framing::ProtocolVersion version; sys::Mutex lock; - boost::scoped_ptr<SemanticHandler> semanticHandler; + + SemanticState semanticState; + BrokerAdapter adapter; + MessageBuilder msgBuilder; + + //execution state + IncomingExecutionContext incoming; + framing::Window outgoing; + RangedOperation ackOp; + management::Session::shared_ptr mgmtObject; friend class SessionManager; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..5152aa2e43 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,7 @@ */ #include "Cluster.h" -#include "qpid/broker/SessionState.h" +#include "qpid/broker/PreviewSessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,18 +32,18 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::PreviewSessionState; namespace { // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + PreviewSessionState& session; Cluster& cluster; bool busy; Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} void handle(AMQFrame& f) { Mutex::ScopedLock l(lock); @@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) { c.next = h; } -struct SessionObserver : public broker::SessionManager::Observer { +struct SessionObserver : public broker::PreviewSessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void opened(PreviewSessionState& s) { // FIXME aconway 2008-01-29: IList for memory management. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index b62b2be5f1..733db8003d 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } + intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - intrusive_ptr<broker::SessionManager::Observer> observer; + intrusive_ptr<broker::PreviewSessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..0ea3953175 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); + broker->getPreviewSessionManager().add(cluster->getObserver()); } } }; diff --git a/cpp/src/qpid/framing/AMQP_HighestVersion.h b/cpp/src/qpid/framing/AMQP_HighestVersion.h index 1be8856c13..b15e14d6f6 100644 --- a/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -33,6 +33,7 @@ namespace qpid { namespace framing { static ProtocolVersion highestProtocolVersion(99, 0); +//static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ } /* namespace qpid */ diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h index b6ac897e96..86b99a83b0 100644 --- a/cpp/src/qpid/framing/Proxy.h +++ b/cpp/src/qpid/framing/Proxy.h @@ -39,6 +39,7 @@ class Proxy void send(const AMQBody&); ProtocolVersion getVersion() const; + FrameHandler& getHandler() { return out; } protected: FrameHandler& out; diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index 700aeef47c..715cdaec2a 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { BOOST_CHECK_THROW(session.close(), InternalErrorException); } -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } |