diff options
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 46 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 49 |
3 files changed, 75 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index c6ba73e43a..e46f11d98c 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -40,11 +40,7 @@ framing::AMQP_ClientProxy& SessionHandler::getProxy() { } SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : connection(c), channel(ch), ignoring(false) -{ - in = this; - out = &c.getOutput(); -} + : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), ignoring(false), channelHandler(*this) {} SessionHandler::~SessionHandler() {} @@ -53,7 +49,7 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::handle(AMQFrame& f) { +void SessionHandler::handleIn(AMQFrame& f) { // Note on channel states: a channel is open if session != 0. A // channel that is closed (session == 0) can be in the "ignoring" // state. This is a temporary state after we have sent a channel @@ -62,7 +58,7 @@ void SessionHandler::handle(AMQFrame& f) { // AMQMethodBody* m=f.getMethod(); try { - if (m && m->invoke(static_cast<Invocable*>(this))) + if (m && m->invoke(&channelHandler)) return; else if (session) session->in(f); @@ -82,6 +78,11 @@ void SessionHandler::handle(AMQFrame& f) { } } +void SessionHandler::handleOut(AMQFrame& f) { + f.setChannel(getChannel()); + out.next->handle(f); +} + void SessionHandler::assertOpen(const char* method) { if (!session) throw ChannelErrorException( @@ -99,21 +100,21 @@ void SessionHandler::assertClosed(const char* method) { << getChannel())); } -void SessionHandler::open(const string& /*outOfBand*/){ - assertClosed("open"); - session.reset(new Session(*this, 0)); - getProxy().getChannel().openOk(); +void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){ + parent.assertClosed("open"); + parent.session.reset(new Session(parent, 0)); + parent.getProxy().getChannel().openOk(); } // FIXME aconway 2007-08-31: flow is no longer in the spec. -void SessionHandler::flow(bool active){ - session->flow(active); - getProxy().getChannel().flowOk(active); +void SessionHandler::ChannelMethods::flow(bool active){ + parent.session->flow(active); + parent.getProxy().getChannel().flowOk(active); } -void SessionHandler::flowOk(bool /*active*/){} +void SessionHandler::ChannelMethods::flowOk(bool /*active*/){} -void SessionHandler::close(uint16_t replyCode, +void SessionHandler::ChannelMethods::close(uint16_t replyCode, const string& replyText, uint16_t classId, uint16_t methodId) { @@ -123,21 +124,21 @@ void SessionHandler::close(uint16_t replyCode, <<replyText << "," << "classid=" <<classId<< "," << "methodid=" <<methodId); - ignoring=false; - getProxy().getChannel().closeOk(); + parent.ignoring=false; + parent.getProxy().getChannel().closeOk(); // FIXME aconway 2007-08-31: sould reset session BEFORE // sending closeOK to avoid races. SessionHandler // needs its own private proxy, see getProxy() above. - session.reset(); + parent.session.reset(); // No need to remove from connection map, will be re-used // if channel is re-opened. } -void SessionHandler::closeOk(){ - ignoring=false; +void SessionHandler::ChannelMethods::closeOk(){ + parent.ignoring=false; } -void SessionHandler::ok() +void SessionHandler::ChannelMethods::ok() { //no specific action required, generic response handling should be //sufficient diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 62e4c8daf2..c7c172b01d 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -44,20 +44,12 @@ class Session; * * SessionHandlers can be stored in a map by value. */ -class SessionHandler : - public framing::FrameHandler::Chains, - private framing::FrameHandler, - private framing::AMQP_ServerOperations::ChannelHandler +class SessionHandler : public framing::FrameHandler::InOutHandler { public: SessionHandler(Connection&, framing::ChannelId); ~SessionHandler(); - /** Handle AMQP session methods, pass other frames to the session - * if there is one. Frames channel must be == getChannel() - */ - void handle(framing::AMQFrame&); - /** Returns 0 if not attached to a session */ Session* getSession() const { return session.get(); } @@ -65,28 +57,40 @@ class SessionHandler : Connection& getConnection() { return connection; } const Connection& getConnection() const { return connection; } + protected: + void handleIn(framing::AMQFrame&); + void handleOut(framing::AMQFrame&); + private: + // FIXME aconway 2007-08-31: Move to session methods. + struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler { + SessionHandler& parent; + + ChannelMethods(SessionHandler& p) : parent(p) {} + void open(const std::string& outOfBand); + void flow(bool active); + void flowOk(bool active); + void ok( ); + void ping( ); + void pong( ); + void resume( const std::string& channelId ); + void close(uint16_t replyCode, + const std::string& replyText, + uint16_t classId, uint16_t methodId); + void closeOk(); + }; + friend class ChannelMethods; + void assertOpen(const char* method); void assertClosed(const char* method); framing::AMQP_ClientProxy& getProxy(); - // FIXME aconway 2007-08-31: Replace channel commands with session. - void open(const std::string& outOfBand); - void flow(bool active); - void flowOk(bool active); - void ok( ); - void ping( ); - void pong( ); - void resume( const std::string& channelId ); - void close(uint16_t replyCode, const - std::string& replyText, uint16_t classId, uint16_t methodId); - void closeOk(); - Connection& connection; const framing::ChannelId channel; shared_ptr<Session> session; bool ignoring; + ChannelMethods channelHandler; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 3129e7d60e..3e55dff1bd 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -22,6 +22,7 @@ * */ #include "qpid/shared_ptr.h" +#include <boost/type_traits/remove_reference.hpp> #include <assert.h> namespace qpid { @@ -31,6 +32,7 @@ namespace framing { template <class T> struct Handler { typedef T HandledType; + typedef void handleFptr(T); Handler(Handler<T>* next_=0) : next(next_) {} virtual ~Handler() {} @@ -74,37 +76,40 @@ struct Handler { }; /** Adapt a member function of X as a Handler. - * MemFun<X, X::f> will copy x. - * MemFun<X&, X::f> will only take a reference to x. + * Only holds a reference to its target, not a copy. */ - template <class X, void(*M)(T)> - class MemFun : public Handler<T> { + template <class X, void (X::*F)(T)> + class MemFunRef : public Handler<T> { public: - MemFun(X x, Handler<T>* next=0) : Handler(next), object(x) {} - void handle(T t) { object.*M(t); } + MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(x) {} + void handle(T t) { (target.*F)(t); } + + /** Allow calling with -> syntax, compatible with Chains */ + MemFunRef* operator->() { return this; } + private: - X object; + X& target; }; - /** Support for implementing an in-out handler pair as a single class. - * Public interface is Handler<T>::Chains pair, but implementation - * overrides handleIn, handleOut functions in a single class. + /** Interface for a handler that implements a + * pair of in/out handle operations. + * @see InOutHandler */ - class InOutHandler { + class InOutHandlerInterface { public: - virtual ~InOutHandler() {} - - InOutHandler() : - in(*this, &InOutHandler::handleIn), - out(*this, &InOutHandler::handleOut) {} - - MemFun<InOutHandler, &InOutHandler::handleIn> in; - MemFun<InOutHandler, &InOutHandler::handleOut> out; - - protected: + virtual ~InOutHandlerInterface() {} virtual void handleIn(T) = 0; virtual void handleOut(T) = 0; - private: + }; + + /** Support for implementing an in-out handler pair as a single class. + * Public interface is Handler<T>::Chains pair, but implementation + * overrides handleIn, handleOut functions in a single class. + */ + struct InOutHandler : protected InOutHandlerInterface { + InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in; + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out; }; }; |