summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-29 23:27:40 +0000
committerAlan Conway <aconway@apache.org>2007-08-29 23:27:40 +0000
commite183227707d150b1f42e750df0e90cd7dac8744e (patch)
treea9156083c1890852c2d4013d4a856f9f28762946 /cpp/src
parent7422e57391a89bc2493cba18ca2ef0a84fec7baa (diff)
downloadqpid-python-e183227707d150b1f42e750df0e90cd7dac8744e.tar.gz
* src/qpid/broker/Session.h, .cpp: Session holds all state of a session including
handlers created for that session. Session is not directly associated with a channel. * src/qpid/broker/SessionAdapter.h, .cpp: SessionAdapter is bound to a channel managed by the Connection. It can be attached to and detatched from a Session. * src/qpid/broker/Connection.cpp, .h: Use SessionAdapter. * src/qpid/framing/Handler.h: Removed use of shared_ptr. Handlers belong either to a Session or a Connection and are destroyed with it. * src/qpid/framing/InputHandler.h, OutputHandler.h: Both now inherit from FrameHandler and can be used as FrameHandlers. Intermediate step to removing them entirely. * src/qpid/broker/ConnectionAdapter.h: * src/qpid/client/ConnectionHandler.h: * src/qpid/framing/ChannelAdapter.cpp, .h: Minor changes required by Handler changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am5
-rw-r--r--cpp/src/qpid/broker/Connection.cpp26
-rw-r--r--cpp/src/qpid/broker/Connection.h7
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp16
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h39
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp9
-rw-r--r--cpp/src/qpid/broker/Session.cpp40
-rw-r--r--cpp/src/qpid/broker/Session.h60
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp69
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h48
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h3
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp20
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h14
-rw-r--r--cpp/src/qpid/framing/Handler.h91
-rw-r--r--cpp/src/qpid/framing/InputHandler.h18
-rw-r--r--cpp/src/qpid/framing/OutputHandler.h18
-rw-r--r--cpp/src/tests/Makefile.am10
17 files changed, 294 insertions, 199 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index f1fee3e61d..505baa87dc 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -191,9 +191,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
- qpid/broker/SessionState.h \
- qpid/broker/SuspendedSessions.h \
- qpid/broker/SuspendedSessions.cpp \
+ qpid/broker/Session.h \
+ qpid/broker/Session.cpp \
qpid/broker/SessionAdapter.h \
qpid/broker/SessionAdapter.cpp \
qpid/broker/SemanticHandler.cpp \
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index f082c5cdb6..08d5ba0ab3 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -28,6 +28,8 @@
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
+#include <boost/utility/in_place_factory.hpp>
+
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -50,11 +52,12 @@ void Connection::received(framing::AMQFrame& frame){
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
+ // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr.
+ // OLD COMMENT:
// Assign handler to new shared_ptr, as it may be erased
// from the map by handle() if frame is a ChannelClose.
//
- FrameHandler::Chain handler=getChannel((frame.getChannel())).in;
- handler->handle(frame);
+ getChannel((frame.getChannel())).in(frame);
}
}
@@ -97,15 +100,16 @@ void Connection::closeChannel(uint16_t id) {
FrameHandler::Chains& Connection::getChannel(ChannelId id) {
- ChannelMap::iterator i = channels.find(id);
- if (i == channels.end()) {
- FrameHandler::Chains chains(
- new SemanticHandler(id, *this),
- new OutputHandlerFrameHandler(*out));
- broker.update(id, chains);
- i = channels.insert(ChannelMap::value_type(id, chains)).first;
- }
- return i->second;
+ // FIXME aconway 2007-08-29: Assuming session on construction,
+ // move this to SessionAdapter::open.
+ boost::optional<SessionAdapter>& ch = channels[id];
+ if (!ch) {
+ ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29:
+ assert(ch->getSession());
+ broker.update(id, *ch->getSession());
+ }
+ assert(ch->getSession());
+ return *ch->getSession();
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index b552267452..08beb0a3ea 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -38,6 +38,9 @@
#include "qpid/Exception.h"
#include "BrokerChannel.h"
#include "ConnectionAdapter.h"
+#include "SessionAdapter.h"
+
+#include <boost/optional.hpp>
namespace qpid {
namespace broker {
@@ -82,8 +85,8 @@ class Connection : public sys::ConnectionInputHandler,
void closed();
private:
- typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
-
+ // Use boost::optional to allow default-constructed uninitialized entries in the map.
+ typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
framing::ProtocolVersion version;
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index 175f57df7d..7672daed10 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -66,7 +66,7 @@ framing::ProtocolVersion ConnectionAdapter::getVersion() const
void ConnectionAdapter::handle(framing::AMQFrame& frame)
{
- getHandlers().in->handle(frame);
+ getHandlers().in(frame);
}
ConnectionAdapter::ConnectionAdapter(Connection& connection)
@@ -74,27 +74,27 @@ ConnectionAdapter::ConnectionAdapter(Connection& connection)
handler = std::auto_ptr<Handler>(new Handler(connection, *this));
}
-Handler::Handler(Connection& c, ConnectionAdapter& a) :
+ConnectionAdapter::Handler:: Handler(Connection& c, ConnectionAdapter& a) :
proxy(a), client(proxy.getConnection()), connection(c) {}
-void Handler::startOk(const FieldTable& /*clientProperties*/,
+void ConnectionAdapter::Handler::startOk(const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/)
{
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
}
-void Handler::secureOk(const string& /*response*/){}
+void ConnectionAdapter::Handler::secureOk(const string& /*response*/){}
-void Handler::tuneOk(uint16_t /*channelmax*/,
+void ConnectionAdapter::Handler::tuneOk(uint16_t /*channelmax*/,
uint32_t framemax, uint16_t heartbeat)
{
connection.setFrameMax(framemax);
connection.setHeartbeat(heartbeat);
}
-void Handler::open(const string& /*virtualHost*/,
+void ConnectionAdapter::Handler::open(const string& /*virtualHost*/,
const string& /*capabilities*/, bool /*insist*/)
{
string knownhosts;
@@ -102,13 +102,13 @@ void Handler::open(const string& /*virtualHost*/,
}
-void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
+void ConnectionAdapter::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
client.closeOk();
connection.getOutput().close();
}
-void Handler::closeOk(){
+void ConnectionAdapter::Handler::closeOk(){
connection.getOutput().close();
}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 9aa3d130e8..e3102faf59 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -35,12 +35,29 @@ namespace qpid {
namespace broker {
class Connection;
-struct Handler;
class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations
{
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+ {
+ framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Connection client;
+ Connection& connection;
+
+ Handler(Connection& connection, ConnectionAdapter& adapter);
+ void startOk(const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const std::string& response);
+ void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void open(const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(uint16_t replyCode, const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk();
+ };
std::auto_ptr<Handler> handler;
-public:
+ public:
ConnectionAdapter(Connection& connection);
void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
@@ -74,24 +91,6 @@ public:
framing::ProtocolVersion getVersion() const;
};
-struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
-{
- framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Connection client;
- Connection& connection;
-
- Handler(Connection& connection, ConnectionAdapter& adapter);
- void startOk(const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const std::string& response);
- void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
- void open(const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(uint16_t replyCode, const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk();
-};
}}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 09ab8ec465..b259aa6b8f 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -131,10 +131,7 @@ void MessageDelivery::deliver(Message::shared_ptr msg,
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
t->sendMethod(msg, channel, id);
- boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out;
- //send header
- msg->sendHeader(*handler, channel.getId(), framesize);
-
- //send content
- msg->sendContent(*handler, channel.getId(), framesize);
+ FrameHandler& handler = channel.getHandlers().out;
+ msg->sendHeader(handler, channel.getId(), framesize);
+ msg->sendContent(handler, channel.getId(), framesize);
}
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
new file mode 100644
index 0000000000..2940c8cccb
--- /dev/null
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Session.h"
+#include "SemanticHandler.h"
+#include "SessionAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+Session::Session(SessionAdapter& a, uint32_t t)
+ : adapter(&a), timeout(t)
+{
+ assert(adapter);
+ // FIXME aconway 2007-08-29: handler to get Session, not connection.
+ handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection()));
+ in = &handlers[0];
+ out = &adapter->getConnection().getOutput();
+}
+
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
new file mode 100644
index 0000000000..927d197390
--- /dev/null
+++ b/cpp/src/qpid/broker/Session.h
@@ -0,0 +1,60 @@
+#ifndef QPID_BROKER_SESSION_H
+#define QPID_BROKER_SESSION_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SessionAdapter;
+
+/**
+ * Session holds the state of an open session, whether attached to a
+ * channel or suspended. It also holds the handler chains associated
+ * with the session.
+ */
+class Session : public framing::FrameHandler::Chains,
+ private boost::noncopyable
+{
+ public:
+ Session(SessionAdapter&, uint32_t timeout);
+
+ /** Returns 0 if this session is not currently attached */
+ SessionAdapter* getAdapter() { return adapter; }
+ const SessionAdapter* getAdapter() const { return adapter; }
+
+ uint32_t getTimeout() const { return timeout; }
+
+ private:
+ SessionAdapter* adapter;
+ uint32_t timeout;
+ boost::ptr_vector<framing::FrameHandler> handlers;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2c471ff098..44245f9689 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -24,69 +24,22 @@ namespace qpid {
namespace broker {
using namespace framing;
-SessionAdapter::~SessionAdapter() {
+SessionAdapter::SessionAdapter(Connection& c, ChannelId ch)
+ : connection(c), channel(ch)
+{
+ // FIXME aconway 2007-08-29: When we handle session commands,
+ // do this on open.
+ session.reset(new Session(*this, 0));
}
-SessionAdapter::SessionAdapter() {
- // FIXME aconway 2007-08-27: Implement
-}
-
-void SessionAdapter::visit(const SessionOpenBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-void SessionAdapter::visit(const SessionAckBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionAttachedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
+SessionAdapter::~SessionAdapter() {}
-void SessionAdapter::visit(const SessionCloseBody&) {
- // FIXME aconway 2007-08-27: Implement
+void SessionAdapter::handle(AMQFrame& f) {
+ // FIXME aconway 2007-08-29: handle session commands here, forward
+ // other frames.
+ session->in(f);
}
-void SessionAdapter::visit(const SessionClosedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionDetachedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionFlowBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionFlowOkBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionHighWaterMarkBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionResumeBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionSolicitAckBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionSuspendBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index a190a7f2b7..237e2c8b64 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -22,43 +22,45 @@
*
*/
-#include "qpid/framing/FrameDefaultVisitor.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/broker/SuspendedSessions.h"
+#include "qpid/broker/Session.h"
+#include "qpid/framing/amqp_types.h"
namespace qpid {
namespace broker {
+class Connection;
+class Session;
+
/**
- * Session Handler: Handles frames arriving for a session.
- * Implements AMQP session class commands, forwards other traffic
- * to the next handler in the chain.
+ * A SessionAdapter is associated with each active channel. It
+ * receives incoming frames, handles session commands and manages the
+ * association between the channel and a session.
+ *
+ * SessionAdapters can be stored in a map by value.
*/
-class SessionAdapter : public framing::FrameVisitorHandler
+class SessionAdapter : public framing::FrameHandler
{
public:
- SessionAdapter();
+ SessionAdapter(Connection&, framing::ChannelId);
~SessionAdapter();
- protected:
- void visit(const framing::SessionAckBody&);
- void visit(const framing::SessionAttachedBody&);
- void visit(const framing::SessionCloseBody&);
- void visit(const framing::SessionClosedBody&);
- void visit(const framing::SessionDetachedBody&);
- void visit(const framing::SessionFlowBody&);
- void visit(const framing::SessionFlowOkBody&);
- void visit(const framing::SessionHighWaterMarkBody&);
- void visit(const framing::SessionOpenBody&);
- void visit(const framing::SessionResumeBody&);
- void visit(const framing::SessionSolicitAckBody&);
- void visit(const framing::SessionSuspendBody&);
+ /** Handle AMQP session methods, pass other frames to the session
+ * if there is one. Frames channel must be == getChannel()
+ */
+ void handle(framing::AMQFrame&);
+
+ /** Returns 0 if not attached to a session */
+ Session* getSession() const { return session.get(); }
- using FrameDefaultVisitor::visit;
+ framing::ChannelId getChannel() const { return channel; }
+ Connection& getConnection() { return connection; }
+ const Connection& getConnection() const { return connection; }
private:
- SessionState state;
- SuspendedSessions* suspended;
+ Connection& connection;
+ const framing::ChannelId channel;
+ shared_ptr<Session> session;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index d05ae1428b..e409f0f2a9 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -54,7 +54,7 @@ class ConnectionHandler : private StateManager,
{
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
-
+
void handle(framing::AMQMethodBody* method);
void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
@@ -62,6 +62,7 @@ class ConnectionHandler : private StateManager,
void fail(const std::string& message);
public:
+ using InputHandler::handle;
typedef boost::function<void()> CloseListener;
typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 86b60d896b..027679228a 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -31,28 +31,24 @@ using boost::format;
namespace qpid {
namespace framing {
-/** Framehandler that feeds into the channel. */
-struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler {
- ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {}
- void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); }
- ChannelAdapter& channel;
-};
+ChannelAdapter::Handler::Handler(ChannelAdapter& c) : parent(c) {}
+void ChannelAdapter::Handler::handle(AMQFrame& f) { parent.handleBody(f.getBody()); }
-void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
+ChannelAdapter::ChannelAdapter() : handler(*this), id(0) {}
+
+void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
{
assertChannelNotOpen();
id = i;
version = v;
-
- handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this));
- handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
+ handlers.reset(&handler, &out);
}
void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
AMQFrame frame(getId(), body);
- handlers.out->handle(frame);
+ handlers.out(frame);
}
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
@@ -73,4 +69,6 @@ void ChannelAdapter::assertChannelNotOpen() const {
504, format("Channel %d is already open.") % getId());
}
+void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); }
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 729f5e7b47..82f7115001 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -55,7 +55,7 @@ class ChannelAdapter : protected BodyHandler {
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter() : id(0) {}
+ ChannelAdapter();
virtual ~ChannelAdapter() {}
/** Initialize the channel adapter. */
@@ -69,7 +69,8 @@ class ChannelAdapter : protected BodyHandler {
virtual void send(const AMQBody& body);
virtual bool isOpen() const = 0;
-
+
+ void handle(AMQFrame& f);
protected:
void assertMethodOk(AMQMethodBody& method) const;
void assertChannelOpen() const;
@@ -78,9 +79,12 @@ class ChannelAdapter : protected BodyHandler {
virtual void handleMethod(AMQMethodBody*) = 0;
private:
- class ChannelAdapterHandler;
- friend class ChannelAdapterHandler;
-
+ struct Handler : public FrameHandler {
+ Handler(ChannelAdapter&);
+ void handle(AMQFrame&);
+ ChannelAdapter& parent;
+ };
+ Handler handler;
ChannelId id;
ProtocolVersion version;
FrameHandler::Chains handlers;
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 2f09911325..be49570f9b 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -27,33 +27,90 @@
namespace qpid {
namespace framing {
-/** Interface for handler for values of type T.
- * Handlers can be linked into chains via the next pointer.
- */
-template <class T> struct Handler {
- typedef T ParamType;
- typedef shared_ptr<Handler> Chain;
+/** Generic handler that can be linked into chains. */
+template <class T>
+struct Handler {
+ typedef T HandledType;
+
+ Handler(Handler<T>* next_=0) : next(next_) {}
+ virtual ~Handler() {}
+ virtual void handle(T) = 0;
+
+ /** Allow functor syntax for calling handle */
+ void operator()(T t) { handle(t); }
+
- /** Handler chains for incoming and outgoing traffic. */
+ /** Pointer to next handler in a linked list. */
+ Handler<T>* next;
+
+ /** A Chain is a handler that forwards to a modifiable
+ * linked list of handlers.
+ */
+ struct Chain : public Handler<T> {
+ Chain(Handler<T>* first) : Handler(first) {}
+ void operator=(Handler<T>* h) { next = h; }
+ void handle(T t) { (*next)(t); }
+ // TODO aconway 2007-08-29: chain modifier ops here.
+ };
+
+ /** In/out pair of handler chains. */
struct Chains {
- Chains() {}
- Chains(Chain i, Chain o) : in(i), out(o) {}
- Chains(Handler* i, Handler* o) : in(i), out(o) {}
+ Chains(Handler<T>* in_=0, Handler<T>* out_=0) : in(in_), out(out_) {}
+ void reset(Handler<T>* in_=0, Handler<T>* out_=0) { in = in_; out = out_; }
Chain in;
Chain out;
};
- Handler() {}
- Handler(Chain next_) : next(next_) {}
- virtual ~Handler() {}
+ /** Adapt any void(T) functor as a Handler.
+ * Functor<F>(f) will copy f.
+ * Functor<F&>(f) will only take a reference to x.
+ */
+ template <class F> class Functor : public Handler<T> {
+ public:
+ Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {}
+ void handle(T t) { functor(t); }
+ private:
+ F functor;
+ };
- virtual void handle(T) = 0;
+ /** Adapt a member function of X as a Handler.
+ * MemFun<X, X::f> will copy x.
+ * MemFun<X&, X::f> will only take a reference to x.
+ */
+ template <class X, void(*M)(T)>
+ class MemFun : public Handler<T> {
+ public:
+ MemFun(X x, Handler<T>* next=0) : Handler(next), object(x) {}
+ void handle(T t) { object.*M(t); }
+ private:
+ X object;
+ };
+
+ /** Support for implementing an in-out handler pair as a single class.
+ * Public interface is Handler<T>::Chains pair, but implementation
+ * overrides handleIn, handleOut functions in a single class.
+ */
+ class InOutHandler {
+ public:
+ virtual ~InOutHandler() {}
+
+ InOutHandler() :
+ in(*this, &InOutHandler::handleIn),
+ out(*this, &InOutHandler::handleOut) {}
+
+ MemFun<InOutHandler, &InOutHandler::handleIn> in;
+ MemFun<InOutHandler, &InOutHandler::handleOut> out;
+
+ protected:
+ virtual void handleIn(T) = 0;
+ virtual void handleOut(T) = 0;
+ private:
+ };
- /** Next handler. Public so chains can be modified by altering next. */
- Chain next;
};
-}}
+}}
#endif /*!QPID_FRAMING_HANDLER_H*/
+//
diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h
index 48a96803da..99e4e774e1 100644
--- a/cpp/src/qpid/framing/InputHandler.h
+++ b/cpp/src/qpid/framing/InputHandler.h
@@ -27,24 +27,12 @@
namespace qpid {
namespace framing {
-class InputHandler : private boost::noncopyable {
+// FIXME aconway 2007-08-29: Eliminate, replace with FrameHandler.
+class InputHandler : public FrameHandler {
public:
virtual ~InputHandler() {}
virtual void received(AMQFrame&) = 0;
-};
-
-/** FrameHandler that delegates to an InputHandler */
-struct InputHandlerFrameHandler : public FrameHandler {
- InputHandlerFrameHandler(InputHandler& in_) : in(in_) {}
- void handle(ParamType frame) { in.received(frame); }
- InputHandler& in;
-};
-
-/** InputHandler that delegates to a FrameHandler */
-struct FrameHandlerInputHandler : public InputHandler {
- FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
- void received(AMQFrame& frame) { handler->handle(frame); }
- FrameHandler::Chain handler;
+ void handle(AMQFrame& f) { received(f); }
};
}}
diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h
index 89917ac3df..925ff88b12 100644
--- a/cpp/src/qpid/framing/OutputHandler.h
+++ b/cpp/src/qpid/framing/OutputHandler.h
@@ -27,24 +27,12 @@
namespace qpid {
namespace framing {
-class OutputHandler : private boost::noncopyable {
+// FIXME aconway 2007-08-29: Replace with FrameHandler.
+class OutputHandler : public FrameHandler {
public:
virtual ~OutputHandler() {}
virtual void send(AMQFrame&) = 0;
-};
-
-/** OutputHandler that delegates to a FrameHandler */
-struct FrameHandlerOutputHandler : public OutputHandler {
- FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
- void received(AMQFrame& frame) { handler->handle(frame); }
- FrameHandler::Chain handler;
-};
-
-/** FrameHandler that delegates to an OutputHandler */
-struct OutputHandlerFrameHandler : public FrameHandler {
- OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {}
- void handle(ParamType frame) { out.send(frame); }
- OutputHandler& out;
+ void handle(AMQFrame& f) { send(f); }
};
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 7ff6a843a9..545eb965c4 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -19,10 +19,12 @@ CLEANFILES=
#
# Unit test programs.
#
-TESTS+=Session
-check_PROGRAMS+=Session
-Session_SOURCES=Session.cpp
-Session_LDADD=-lboost_unit_test_framework $(lib_broker)
+
+# FIXME aconway 2007-08-29: enable when session is reinstated.
+# TESTS+=Session
+# check_PROGRAMS+=Session
+# Session_SOURCES=Session.cpp
+# Session_LDADD=-lboost_unit_test_framework $(lib_broker)
TESTS+=Blob
check_PROGRAMS+=Blob