diff options
author | Alan Conway <aconway@apache.org> | 2007-08-29 23:27:40 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-29 23:27:40 +0000 |
commit | e183227707d150b1f42e750df0e90cd7dac8744e (patch) | |
tree | a9156083c1890852c2d4013d4a856f9f28762946 /cpp/src | |
parent | 7422e57391a89bc2493cba18ca2ef0a84fec7baa (diff) | |
download | qpid-python-e183227707d150b1f42e750df0e90cd7dac8744e.tar.gz |
* src/qpid/broker/Session.h, .cpp: Session holds all state of a session including
handlers created for that session. Session is not directly associated with a channel.
* src/qpid/broker/SessionAdapter.h, .cpp: SessionAdapter is bound to a channel
managed by the Connection. It can be attached to and detatched from a Session.
* src/qpid/broker/Connection.cpp, .h: Use SessionAdapter.
* src/qpid/framing/Handler.h: Removed use of shared_ptr. Handlers belong
either to a Session or a Connection and are destroyed with it.
* src/qpid/framing/InputHandler.h, OutputHandler.h: Both now inherit from
FrameHandler and can be used as FrameHandlers. Intermediate step to removing
them entirely.
* src/qpid/broker/ConnectionAdapter.h:
* src/qpid/client/ConnectionHandler.h:
* src/qpid/framing/ChannelAdapter.cpp, .h:
Minor changes required by Handler changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.h | 39 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Session.h | 60 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 69 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 48 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 91 | ||||
-rw-r--r-- | cpp/src/qpid/framing/InputHandler.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/framing/OutputHandler.h | 18 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 10 |
17 files changed, 294 insertions, 199 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f1fee3e61d..505baa87dc 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -191,9 +191,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ - qpid/broker/SessionState.h \ - qpid/broker/SuspendedSessions.h \ - qpid/broker/SuspendedSessions.cpp \ + qpid/broker/Session.h \ + qpid/broker/Session.cpp \ qpid/broker/SessionAdapter.h \ qpid/broker/SessionAdapter.cpp \ qpid/broker/SemanticHandler.cpp \ diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index f082c5cdb6..08d5ba0ab3 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -28,6 +28,8 @@ #include "BrokerAdapter.h" #include "SemanticHandler.h" +#include <boost/utility/in_place_factory.hpp> + using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -50,11 +52,12 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { + // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr. + // OLD COMMENT: // Assign handler to new shared_ptr, as it may be erased // from the map by handle() if frame is a ChannelClose. // - FrameHandler::Chain handler=getChannel((frame.getChannel())).in; - handler->handle(frame); + getChannel((frame.getChannel())).in(frame); } } @@ -97,15 +100,16 @@ void Connection::closeChannel(uint16_t id) { FrameHandler::Chains& Connection::getChannel(ChannelId id) { - ChannelMap::iterator i = channels.find(id); - if (i == channels.end()) { - FrameHandler::Chains chains( - new SemanticHandler(id, *this), - new OutputHandlerFrameHandler(*out)); - broker.update(id, chains); - i = channels.insert(ChannelMap::value_type(id, chains)).first; - } - return i->second; + // FIXME aconway 2007-08-29: Assuming session on construction, + // move this to SessionAdapter::open. + boost::optional<SessionAdapter>& ch = channels[id]; + if (!ch) { + ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29: + assert(ch->getSession()); + broker.update(id, *ch->getSession()); + } + assert(ch->getSession()); + return *ch->getSession(); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index b552267452..08beb0a3ea 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -38,6 +38,9 @@ #include "qpid/Exception.h" #include "BrokerChannel.h" #include "ConnectionAdapter.h" +#include "SessionAdapter.h" + +#include <boost/optional.hpp> namespace qpid { namespace broker { @@ -82,8 +85,8 @@ class Connection : public sys::ConnectionInputHandler, void closed(); private: - typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - + // Use boost::optional to allow default-constructed uninitialized entries in the map. + typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; framing::ProtocolVersion version; diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index 175f57df7d..7672daed10 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -66,7 +66,7 @@ framing::ProtocolVersion ConnectionAdapter::getVersion() const void ConnectionAdapter::handle(framing::AMQFrame& frame) { - getHandlers().in->handle(frame); + getHandlers().in(frame); } ConnectionAdapter::ConnectionAdapter(Connection& connection) @@ -74,27 +74,27 @@ ConnectionAdapter::ConnectionAdapter(Connection& connection) handler = std::auto_ptr<Handler>(new Handler(connection, *this)); } -Handler::Handler(Connection& c, ConnectionAdapter& a) : +ConnectionAdapter::Handler:: Handler(Connection& c, ConnectionAdapter& a) : proxy(a), client(proxy.getConnection()), connection(c) {} -void Handler::startOk(const FieldTable& /*clientProperties*/, +void ConnectionAdapter::Handler::startOk(const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/) { client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } -void Handler::secureOk(const string& /*response*/){} +void ConnectionAdapter::Handler::secureOk(const string& /*response*/){} -void Handler::tuneOk(uint16_t /*channelmax*/, +void ConnectionAdapter::Handler::tuneOk(uint16_t /*channelmax*/, uint32_t framemax, uint16_t heartbeat) { connection.setFrameMax(framemax); connection.setHeartbeat(heartbeat); } -void Handler::open(const string& /*virtualHost*/, +void ConnectionAdapter::Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; @@ -102,13 +102,13 @@ void Handler::open(const string& /*virtualHost*/, } -void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, +void ConnectionAdapter::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { client.closeOk(); connection.getOutput().close(); } -void Handler::closeOk(){ +void ConnectionAdapter::Handler::closeOk(){ connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 9aa3d130e8..e3102faf59 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -35,12 +35,29 @@ namespace qpid { namespace broker { class Connection; -struct Handler; class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations { + struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler + { + framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Connection client; + Connection& connection; + + Handler(Connection& connection, ConnectionAdapter& adapter); + void startOk(const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(const std::string& response); + void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void open(const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(uint16_t replyCode, const std::string& replyText, + uint16_t classId, uint16_t methodId); + void closeOk(); + }; std::auto_ptr<Handler> handler; -public: + public: ConnectionAdapter(Connection& connection); void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); @@ -74,24 +91,6 @@ public: framing::ProtocolVersion getVersion() const; }; -struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler -{ - framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Connection client; - Connection& connection; - - Handler(Connection& connection, ConnectionAdapter& adapter); - void startOk(const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const std::string& response); - void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); - void open(const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(uint16_t replyCode, const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(); -}; }} diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 09ab8ec465..b259aa6b8f 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -131,10 +131,7 @@ void MessageDelivery::deliver(Message::shared_ptr msg, boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); t->sendMethod(msg, channel, id); - boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out; - //send header - msg->sendHeader(*handler, channel.getId(), framesize); - - //send content - msg->sendContent(*handler, channel.getId(), framesize); + FrameHandler& handler = channel.getHandlers().out; + msg->sendHeader(handler, channel.getId(), framesize); + msg->sendContent(handler, channel.getId(), framesize); } diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp new file mode 100644 index 0000000000..2940c8cccb --- /dev/null +++ b/cpp/src/qpid/broker/Session.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Session.h" +#include "SemanticHandler.h" +#include "SessionAdapter.h" + +namespace qpid { +namespace broker { + +Session::Session(SessionAdapter& a, uint32_t t) + : adapter(&a), timeout(t) +{ + assert(adapter); + // FIXME aconway 2007-08-29: handler to get Session, not connection. + handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection())); + in = &handlers[0]; + out = &adapter->getConnection().getOutput(); +} + + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h new file mode 100644 index 0000000000..927d197390 --- /dev/null +++ b/cpp/src/qpid/broker/Session.h @@ -0,0 +1,60 @@ +#ifndef QPID_BROKER_SESSION_H +#define QPID_BROKER_SESSION_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" + +#include <boost/ptr_container/ptr_vector.hpp> + +namespace qpid { +namespace broker { + +class SessionAdapter; + +/** + * Session holds the state of an open session, whether attached to a + * channel or suspended. It also holds the handler chains associated + * with the session. + */ +class Session : public framing::FrameHandler::Chains, + private boost::noncopyable +{ + public: + Session(SessionAdapter&, uint32_t timeout); + + /** Returns 0 if this session is not currently attached */ + SessionAdapter* getAdapter() { return adapter; } + const SessionAdapter* getAdapter() const { return adapter; } + + uint32_t getTimeout() const { return timeout; } + + private: + SessionAdapter* adapter; + uint32_t timeout; + boost::ptr_vector<framing::FrameHandler> handlers; +}; + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2c471ff098..44245f9689 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -24,69 +24,22 @@ namespace qpid { namespace broker { using namespace framing; -SessionAdapter::~SessionAdapter() { +SessionAdapter::SessionAdapter(Connection& c, ChannelId ch) + : connection(c), channel(ch) +{ + // FIXME aconway 2007-08-29: When we handle session commands, + // do this on open. + session.reset(new Session(*this, 0)); } -SessionAdapter::SessionAdapter() { - // FIXME aconway 2007-08-27: Implement -} - -void SessionAdapter::visit(const SessionOpenBody&) { - // FIXME aconway 2007-08-27: Implement -} - -void SessionAdapter::visit(const SessionAckBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionAttachedBody&) { - // FIXME aconway 2007-08-27: Implement -} +SessionAdapter::~SessionAdapter() {} -void SessionAdapter::visit(const SessionCloseBody&) { - // FIXME aconway 2007-08-27: Implement +void SessionAdapter::handle(AMQFrame& f) { + // FIXME aconway 2007-08-29: handle session commands here, forward + // other frames. + session->in(f); } -void SessionAdapter::visit(const SessionClosedBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionDetachedBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionFlowBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionFlowOkBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionHighWaterMarkBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionResumeBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionSolicitAckBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionSuspendBody&) { - // FIXME aconway 2007-08-27: Implement -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index a190a7f2b7..237e2c8b64 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -22,43 +22,45 @@ * */ -#include "qpid/framing/FrameDefaultVisitor.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/broker/SuspendedSessions.h" +#include "qpid/broker/Session.h" +#include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { +class Connection; +class Session; + /** - * Session Handler: Handles frames arriving for a session. - * Implements AMQP session class commands, forwards other traffic - * to the next handler in the chain. + * A SessionAdapter is associated with each active channel. It + * receives incoming frames, handles session commands and manages the + * association between the channel and a session. + * + * SessionAdapters can be stored in a map by value. */ -class SessionAdapter : public framing::FrameVisitorHandler +class SessionAdapter : public framing::FrameHandler { public: - SessionAdapter(); + SessionAdapter(Connection&, framing::ChannelId); ~SessionAdapter(); - protected: - void visit(const framing::SessionAckBody&); - void visit(const framing::SessionAttachedBody&); - void visit(const framing::SessionCloseBody&); - void visit(const framing::SessionClosedBody&); - void visit(const framing::SessionDetachedBody&); - void visit(const framing::SessionFlowBody&); - void visit(const framing::SessionFlowOkBody&); - void visit(const framing::SessionHighWaterMarkBody&); - void visit(const framing::SessionOpenBody&); - void visit(const framing::SessionResumeBody&); - void visit(const framing::SessionSolicitAckBody&); - void visit(const framing::SessionSuspendBody&); + /** Handle AMQP session methods, pass other frames to the session + * if there is one. Frames channel must be == getChannel() + */ + void handle(framing::AMQFrame&); + + /** Returns 0 if not attached to a session */ + Session* getSession() const { return session.get(); } - using FrameDefaultVisitor::visit; + framing::ChannelId getChannel() const { return channel; } + Connection& getConnection() { return connection; } + const Connection& getConnection() const { return connection; } private: - SessionState state; - SuspendedSessions* suspended; + Connection& connection; + const framing::ChannelId channel; + shared_ptr<Session> session; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index d05ae1428b..e409f0f2a9 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -54,7 +54,7 @@ class ConnectionHandler : private StateManager, { enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set<int> ESTABLISHED; - + void handle(framing::AMQMethodBody* method); void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); @@ -62,6 +62,7 @@ class ConnectionHandler : private StateManager, void fail(const std::string& message); public: + using InputHandler::handle; typedef boost::function<void()> CloseListener; typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 86b60d896b..027679228a 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -31,28 +31,24 @@ using boost::format; namespace qpid { namespace framing { -/** Framehandler that feeds into the channel. */ -struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler { - ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {} - void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); } - ChannelAdapter& channel; -}; +ChannelAdapter::Handler::Handler(ChannelAdapter& c) : parent(c) {} +void ChannelAdapter::Handler::handle(AMQFrame& f) { parent.handleBody(f.getBody()); } -void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) +ChannelAdapter::ChannelAdapter() : handler(*this), id(0) {} + +void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) { assertChannelNotOpen(); id = i; version = v; - - handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this)); - handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); + handlers.reset(&handler, &out); } void ChannelAdapter::send(const AMQBody& body) { assertChannelOpen(); AMQFrame frame(getId(), body); - handlers.out->handle(frame); + handlers.out(frame); } void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { @@ -73,4 +69,6 @@ void ChannelAdapter::assertChannelNotOpen() const { 504, format("Channel %d is already open.") % getId()); } +void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); } + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 729f5e7b47..82f7115001 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -55,7 +55,7 @@ class ChannelAdapter : protected BodyHandler { /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter() : id(0) {} + ChannelAdapter(); virtual ~ChannelAdapter() {} /** Initialize the channel adapter. */ @@ -69,7 +69,8 @@ class ChannelAdapter : protected BodyHandler { virtual void send(const AMQBody& body); virtual bool isOpen() const = 0; - + + void handle(AMQFrame& f); protected: void assertMethodOk(AMQMethodBody& method) const; void assertChannelOpen() const; @@ -78,9 +79,12 @@ class ChannelAdapter : protected BodyHandler { virtual void handleMethod(AMQMethodBody*) = 0; private: - class ChannelAdapterHandler; - friend class ChannelAdapterHandler; - + struct Handler : public FrameHandler { + Handler(ChannelAdapter&); + void handle(AMQFrame&); + ChannelAdapter& parent; + }; + Handler handler; ChannelId id; ProtocolVersion version; FrameHandler::Chains handlers; diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 2f09911325..be49570f9b 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -27,33 +27,90 @@ namespace qpid { namespace framing { -/** Interface for handler for values of type T. - * Handlers can be linked into chains via the next pointer. - */ -template <class T> struct Handler { - typedef T ParamType; - typedef shared_ptr<Handler> Chain; +/** Generic handler that can be linked into chains. */ +template <class T> +struct Handler { + typedef T HandledType; + + Handler(Handler<T>* next_=0) : next(next_) {} + virtual ~Handler() {} + virtual void handle(T) = 0; + + /** Allow functor syntax for calling handle */ + void operator()(T t) { handle(t); } + - /** Handler chains for incoming and outgoing traffic. */ + /** Pointer to next handler in a linked list. */ + Handler<T>* next; + + /** A Chain is a handler that forwards to a modifiable + * linked list of handlers. + */ + struct Chain : public Handler<T> { + Chain(Handler<T>* first) : Handler(first) {} + void operator=(Handler<T>* h) { next = h; } + void handle(T t) { (*next)(t); } + // TODO aconway 2007-08-29: chain modifier ops here. + }; + + /** In/out pair of handler chains. */ struct Chains { - Chains() {} - Chains(Chain i, Chain o) : in(i), out(o) {} - Chains(Handler* i, Handler* o) : in(i), out(o) {} + Chains(Handler<T>* in_=0, Handler<T>* out_=0) : in(in_), out(out_) {} + void reset(Handler<T>* in_=0, Handler<T>* out_=0) { in = in_; out = out_; } Chain in; Chain out; }; - Handler() {} - Handler(Chain next_) : next(next_) {} - virtual ~Handler() {} + /** Adapt any void(T) functor as a Handler. + * Functor<F>(f) will copy f. + * Functor<F&>(f) will only take a reference to x. + */ + template <class F> class Functor : public Handler<T> { + public: + Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {} + void handle(T t) { functor(t); } + private: + F functor; + }; - virtual void handle(T) = 0; + /** Adapt a member function of X as a Handler. + * MemFun<X, X::f> will copy x. + * MemFun<X&, X::f> will only take a reference to x. + */ + template <class X, void(*M)(T)> + class MemFun : public Handler<T> { + public: + MemFun(X x, Handler<T>* next=0) : Handler(next), object(x) {} + void handle(T t) { object.*M(t); } + private: + X object; + }; + + /** Support for implementing an in-out handler pair as a single class. + * Public interface is Handler<T>::Chains pair, but implementation + * overrides handleIn, handleOut functions in a single class. + */ + class InOutHandler { + public: + virtual ~InOutHandler() {} + + InOutHandler() : + in(*this, &InOutHandler::handleIn), + out(*this, &InOutHandler::handleOut) {} + + MemFun<InOutHandler, &InOutHandler::handleIn> in; + MemFun<InOutHandler, &InOutHandler::handleOut> out; + + protected: + virtual void handleIn(T) = 0; + virtual void handleOut(T) = 0; + private: + }; - /** Next handler. Public so chains can be modified by altering next. */ - Chain next; }; -}} +}} #endif /*!QPID_FRAMING_HANDLER_H*/ +// diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h index 48a96803da..99e4e774e1 100644 --- a/cpp/src/qpid/framing/InputHandler.h +++ b/cpp/src/qpid/framing/InputHandler.h @@ -27,24 +27,12 @@ namespace qpid { namespace framing { -class InputHandler : private boost::noncopyable { +// FIXME aconway 2007-08-29: Eliminate, replace with FrameHandler. +class InputHandler : public FrameHandler { public: virtual ~InputHandler() {} virtual void received(AMQFrame&) = 0; -}; - -/** FrameHandler that delegates to an InputHandler */ -struct InputHandlerFrameHandler : public FrameHandler { - InputHandlerFrameHandler(InputHandler& in_) : in(in_) {} - void handle(ParamType frame) { in.received(frame); } - InputHandler& in; -}; - -/** InputHandler that delegates to a FrameHandler */ -struct FrameHandlerInputHandler : public InputHandler { - FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {} - void received(AMQFrame& frame) { handler->handle(frame); } - FrameHandler::Chain handler; + void handle(AMQFrame& f) { received(f); } }; }} diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h index 89917ac3df..925ff88b12 100644 --- a/cpp/src/qpid/framing/OutputHandler.h +++ b/cpp/src/qpid/framing/OutputHandler.h @@ -27,24 +27,12 @@ namespace qpid { namespace framing { -class OutputHandler : private boost::noncopyable { +// FIXME aconway 2007-08-29: Replace with FrameHandler. +class OutputHandler : public FrameHandler { public: virtual ~OutputHandler() {} virtual void send(AMQFrame&) = 0; -}; - -/** OutputHandler that delegates to a FrameHandler */ -struct FrameHandlerOutputHandler : public OutputHandler { - FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {} - void received(AMQFrame& frame) { handler->handle(frame); } - FrameHandler::Chain handler; -}; - -/** FrameHandler that delegates to an OutputHandler */ -struct OutputHandlerFrameHandler : public FrameHandler { - OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {} - void handle(ParamType frame) { out.send(frame); } - OutputHandler& out; + void handle(AMQFrame& f) { send(f); } }; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 7ff6a843a9..545eb965c4 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -19,10 +19,12 @@ CLEANFILES= # # Unit test programs. # -TESTS+=Session -check_PROGRAMS+=Session -Session_SOURCES=Session.cpp -Session_LDADD=-lboost_unit_test_framework $(lib_broker) + +# FIXME aconway 2007-08-29: enable when session is reinstated. +# TESTS+=Session +# check_PROGRAMS+=Session +# Session_SOURCES=Session.cpp +# Session_LDADD=-lboost_unit_test_framework $(lib_broker) TESTS+=Blob check_PROGRAMS+=Blob |