diff options
author | Alan Conway <aconway@apache.org> | 2008-03-18 21:31:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-03-18 21:31:08 +0000 |
commit | 36e23bcefbf0a6893370cb041bd05a662f0b2758 (patch) | |
tree | 601d29d88e873ac4d58da3cdb2753f02b64998bc /cpp/src | |
parent | eac0911169b24e708637572fe6b5a8283b3f49e0 (diff) | |
download | qpid-python-36e23bcefbf0a6893370cb041bd05a662f0b2758.tar.gz |
Make AsyncIOAcceptor multi-protocol:
- ConnectionCodec interface replaces ConnectionInputHandle, moves encoding/decoding out of AsyncIOAcceptor.
- ConnectionCodec::Factory replaces ConnectionInputHandlerFactory
- Acceptor creates version-specific ConnectionCodec based on protocol header.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@638590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
28 files changed, 517 insertions, 267 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 0513414f3f..afe98b4b0f 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -131,7 +131,6 @@ libqpidcommon_la_SOURCES = \ qpid/framing/FieldValue.cpp \ qpid/framing/FramingContent.cpp \ qpid/framing/FrameSet.cpp \ - qpid/framing/InitiationHandler.cpp \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ qpid/framing/SessionState.cpp \ @@ -168,6 +167,8 @@ libqpidcommon_la_SOURCES = \ libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams libqpidbroker_la_SOURCES = \ $(mgen_broker_cpp) \ + qpid/amqp_0_10/Connection.h \ + qpid/amqp_0_10/Connection.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/SessionAdapter.cpp \ @@ -177,6 +178,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PersistableMessage.cpp \ qpid/broker/Bridge.cpp \ qpid/broker/PreviewConnection.cpp \ + qpid/broker/PreviewConnectionCodec.cpp \ qpid/broker/PreviewConnectionHandler.cpp \ qpid/broker/PreviewSessionHandler.cpp \ qpid/broker/PreviewSessionManager.cpp \ @@ -286,6 +288,7 @@ nobase_include_HEADERS = \ qpid/broker/BrokerSingleton.h \ qpid/broker/Bridge.h \ qpid/broker/PreviewConnection.h \ + qpid/broker/PreviewConnectionCodec.h \ qpid/broker/PreviewConnectionHandler.h \ qpid/broker/PreviewSessionHandler.h \ qpid/broker/PreviewSessionManager.h \ @@ -409,7 +412,6 @@ nobase_include_HEADERS = \ qpid/framing/FramingContent.h \ qpid/framing/Handler.h \ qpid/framing/HeaderProperties.h \ - qpid/framing/InitiationHandler.h \ qpid/framing/Invoker.h \ qpid/framing/InputHandler.h \ qpid/framing/MethodContent.h \ @@ -459,6 +461,7 @@ nobase_include_HEADERS = \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ qpid/sys/OutputControl.h \ + qpid/sys/ConnectionCodec.h \ qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ qpid/sys/Runnable.h \ diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp new file mode 100644 index 0000000000..08b47c5611 --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Connection.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp_0_10 { + +using sys::Mutex; + +Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id) + : frameQueueClosed(false), output(o), connection(this, broker, id), + identifier(id), initialized(false) {} + +size_t Connection::decode(const char* buffer, size_t size) { + framing::Buffer in(const_cast<char*>(buffer), size); + framing::AMQFrame frame; + while(frame.decode(in)) { + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + connection.received(frame); + } + return in.getPosition(); +} + +bool Connection::canEncode() { + if (!frameQueueClosed) connection.doOutput(); + Mutex::ScopedLock l(frameQueueLock); + return !initialized || !frameQueue.empty(); +} + +bool Connection::isClosed() const { + Mutex::ScopedLock l(frameQueueLock); + return frameQueueClosed; +} + +size_t Connection::encode(const char* buffer, size_t size) { + Mutex::ScopedLock l(frameQueueLock); + framing::Buffer out(const_cast<char*>(buffer), size); + if (!initialized) { + framing::ProtocolInitiation pi(getVersion()); + pi.encode(out); + initialized = true; + } + while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + frameQueue.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); + frameQueue.pop(); + } + if (!frameQueue.empty() && frameQueue.front().size() > size) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + return out.getPosition(); +} + +void Connection::activateOutput() { output.activateOutput(); } + +void Connection::close() { + // Close the output queue. + Mutex::ScopedLock l(frameQueueLock); + frameQueueClosed = true; +} + +void Connection::closed() { + connection.closed(); +} + +void Connection::send(framing::AMQFrame& f) { + { + Mutex::ScopedLock l(frameQueueLock); + if (!frameQueueClosed) + frameQueue.push(f); + } + activateOutput(); +} + +framing::ProtocolVersion Connection::getVersion() const { + return framing::ProtocolVersion(0,10); +} + +}} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h new file mode 100644 index 0000000000..e4672be722 --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -0,0 +1,62 @@ +#ifndef QPID_BROKER_CONNECTION_H +#define QPID_BROKER_CONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/Mutex.h" +#include "Connection.h" +#include "qpid/broker/Connection.h" +#include <queue> + +namespace qpid { +namespace broker { class Broker; } +namespace amqp_0_10 { + +// FIXME aconway 2008-03-18: Update to 0-10. +class Connection : public sys::ConnectionCodec, + public sys::ConnectionOutputHandler +{ + std::queue<framing::AMQFrame> frameQueue; + bool frameQueueClosed; + mutable sys::Mutex frameQueueLock; + sys::OutputControl& output; + broker::Connection connection; // FIXME aconway 2008-03-18: + std::string identifier; + bool initialized; + + public: + Connection(sys::OutputControl&, broker::Broker&, const std::string& id); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool isClosed() const; + bool canEncode(); + void activateOutput(); + void closed(); // connection closed by peer. + void close(); // closing from this end. + void send(framing::AMQFrame&); + framing::ProtocolVersion getVersion() const; +}; + +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_BROKER_CONNECTION_H*/ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 8b70831cf7..ddd5959343 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -286,19 +286,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } -sys::ConnectionInputHandler* Broker::connect( +void Broker::connect( const std::string& host, uint16_t port, - sys::ConnectionInputHandlerFactory* f) + sys::ConnectionCodec::Factory* f) { - return getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(host, port, f ? f : &factory); } -sys::ConnectionInputHandler* Broker::connect( - const Url& url, sys::ConnectionInputHandlerFactory* f) +void Broker::connect( + const Url& url, sys::ConnectionCodec::Factory* f) { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - return connect(addr.host, addr.port, f); + connect(addr.host, addr.port, f); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9e5191825d..481191eb55 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -119,12 +119,10 @@ class Broker : public sys::Runnable, public Plugin::Target, ManagementMethod (uint32_t methodId, management::Args& args); /** Create a connection to another broker. */ - sys::ConnectionInputHandler* - connect(const std::string& host, uint16_t port, - sys::ConnectionInputHandlerFactory* =0); + void connect(const std::string& host, uint16_t port, + sys::ConnectionCodec::Factory* =0); /** Create a connection to another broker. */ - sys::ConnectionInputHandler* - connect(const Url& url, sys::ConnectionInputHandlerFactory* =0); + void connect(const Url& url, sys::ConnectionCodec::Factory* =0); private: sys::Acceptor& getAcceptor() const; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 8be4f7756e..1e55087390 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -90,7 +90,9 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std adapter(*this), mgmtClosing(0), mgmtId(mgmtId_) -{} +{ + initMgmt(); +} void Connection::initMgmt(bool asLink) { @@ -134,12 +136,6 @@ void Connection::close( getOutput().close(); } -void Connection::initiated(const framing::ProtocolInitiation& header) { - version = ProtocolVersion(header.getMajor(), header.getMinor()); - adapter.init(header); - initMgmt(); -} - void Connection::idleOut(){} void Connection::idleIn(){} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 8719a9dfcd..a59df26c84 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -65,12 +65,10 @@ class Connection : public sys::ConnectionInputHandler, // ConnectionInputHandler methods void received(framing::AMQFrame& frame); - void initiated(const framing::ProtocolInitiation& header); void idleOut(); void idleIn(); void closed(); bool doOutput(); - framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } void closeChannel(framing::ChannelId channel); diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index a0cd4e35d7..dfab998c78 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -19,27 +19,32 @@ * */ #include "ConnectionFactory.h" -#include "Connection.h" -#include "MultiVersionConnectionInputHandler.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/amqp_0_10/Connection.h" +#include "PreviewConnectionCodec.h" namespace qpid { namespace broker { +using framing::ProtocolVersion; -ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) -{} +ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {} +ConnectionFactory::~ConnectionFactory() {} -ConnectionFactory::~ConnectionFactory() -{ - +sys::ConnectionCodec* +ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { + if (v == ProtocolVersion(99, 0)) + return new PreviewConnectionCodec(out, broker, id); + if (v == ProtocolVersion(0, 10)) + return new amqp_0_10::Connection(out, broker, id); + return 0; } -qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, - const std::string& id) -{ - return new MultiVersionConnectionInputHandler(out, broker, id); +sys::ConnectionCodec* +ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { + // FIXME aconway 2008-03-18: + return new PreviewConnectionCodec(out, broker, id); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index 53fb160279..5797495054 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -21,22 +21,24 @@ #ifndef _ConnectionFactory_ #define _ConnectionFactory_ -#include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/ConnectionCodec.h" namespace qpid { namespace broker { class Broker; -class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory -{ +class ConnectionFactory : public sys::ConnectionCodec::Factory { public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* - create(qpid::sys::ConnectionOutputHandler* out, const std::string& id); - virtual ~ConnectionFactory(); + sys::ConnectionCodec* + create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); + + sys::ConnectionCodec* + create(sys::OutputControl&, const std::string& id); + private: Broker& broker; }; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 0aee420022..53a403c955 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -38,17 +38,6 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; } -void ConnectionHandler::init(const framing::ProtocolInitiation& header) { - //need to send out a protocol header back to the client - handler->connection.getOutput().initiated(header); - - FieldTable properties; - string mechanisms(PLAIN); - string locales(en_US); - handler->serverMode = true; - handler->client.start(properties, mechanisms, locales); -} - void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) { handler->client.close(code, text, classId, methodId); @@ -75,7 +64,15 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } } -ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {} +ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) { + FieldTable properties; + string mechanisms(PLAIN); + string locales(en_US); + handler->serverMode = true; + handler->client.start(properties, mechanisms, locales); +} + + ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()), connection(c), serverMode(false) {} diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index 44e2ce05fa..8e659f0913 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -38,7 +38,6 @@ namespace broker { class Connection; -// TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, @@ -82,7 +81,6 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; public: ConnectionHandler(Connection& connection); - void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); }; diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp index 6c3d960d1f..f1bbf7d10e 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp @@ -31,19 +31,6 @@ MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler( Broker& _broker, const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {} - -void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i) -{ - if (i.getMajor() == 99 && i.getMinor() == 0) { - handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id)); - } else if (i.getMajor() == 0 && i.getMinor() == 10) { - handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id)); - } else { - throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString()); - } - handler->initiated(i); -} - void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f) { check(); @@ -67,11 +54,6 @@ bool MultiVersionConnectionInputHandler::doOutput() return handler.get() && handler->doOutput(); } -qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation() -{ - return qpid::framing::ProtocolInitiation(linkVersion); -} - void MultiVersionConnectionInputHandler::closed() { if (handler.get()) handler->closed(); diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h index 440c00c09a..e6915a00bd 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h @@ -44,12 +44,10 @@ public: MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id); virtual ~MultiVersionConnectionInputHandler() {} - void initiated(const qpid::framing::ProtocolInitiation&); void received(qpid::framing::AMQFrame&); void idleOut(); void idleIn(); bool doOutput(); - qpid::framing::ProtocolInitiation getInitiation(); void closed(); }; diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp index 05879a0329..5a541b5624 100644 --- a/cpp/src/qpid/broker/PreviewConnection.cpp +++ b/cpp/src/qpid/broker/PreviewConnection.cpp @@ -90,7 +90,9 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok adapter(*this), mgmtClosing(0), mgmtId(mgmtId_) -{} +{ + initMgmt(); +} void PreviewConnection::initMgmt(bool asLink) { @@ -134,12 +136,6 @@ void PreviewConnection::close( getOutput().close(); } -void PreviewConnection::initiated(const framing::ProtocolInitiation& header) { - version = ProtocolVersion(header.getMajor(), header.getMinor()); - adapter.init(header); - initMgmt(); -} - void PreviewConnection::idleOut(){} void PreviewConnection::idleIn(){} diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h index d6a945c26c..1cc9e7a3d4 100644 --- a/cpp/src/qpid/broker/PreviewConnection.h +++ b/cpp/src/qpid/broker/PreviewConnection.h @@ -50,8 +50,7 @@ namespace qpid { namespace broker { -class PreviewConnection : public sys::ConnectionInputHandler, - public ConnectionState +class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState { public: PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId); @@ -65,12 +64,10 @@ class PreviewConnection : public sys::ConnectionInputHandler, // ConnectionInputHandler methods void received(framing::AMQFrame& frame); - void initiated(const framing::ProtocolInitiation& header); void idleOut(); void idleIn(); void closed(); bool doOutput(); - framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } void closeChannel(framing::ChannelId channel); diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp new file mode 100644 index 0000000000..81ec7f7076 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewConnectionCodec.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +using sys::Mutex; + +PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id) + : frameQueueClosed(false), output(o), connection(this, broker, id), identifier(id) {} + +size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) { + framing::Buffer in(const_cast<char*>(buffer), size); + framing::AMQFrame frame; + while(frame.decode(in)) { + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + connection.received(frame); + } + return in.getPosition(); +} + +bool PreviewConnectionCodec::canEncode() { + if (!frameQueueClosed) connection.doOutput(); + return !frameQueue.empty(); +} + +bool PreviewConnectionCodec::isClosed() const { + Mutex::ScopedLock l(frameQueueLock); + return frameQueueClosed; +} + +size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) { + Mutex::ScopedLock l(frameQueueLock); + framing::Buffer out(const_cast<char*>(buffer), size); + while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + frameQueue.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); + frameQueue.pop(); + } + if (!frameQueue.empty() && frameQueue.front().size() > size) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + return out.getPosition(); +} + +void PreviewConnectionCodec::activateOutput() { output.activateOutput(); } + +void PreviewConnectionCodec::close() { + // Close the output queue. + Mutex::ScopedLock l(frameQueueLock); + frameQueueClosed = true; +} + +void PreviewConnectionCodec::closed() { + connection.closed(); +} + +void PreviewConnectionCodec::send(framing::AMQFrame& f) { + { + Mutex::ScopedLock l(frameQueueLock); + if (!frameQueueClosed) + frameQueue.push(f); + } + activateOutput(); +} + +framing::ProtocolVersion PreviewConnectionCodec::getVersion() const { + return framing::ProtocolVersion(99,0); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h new file mode 100644 index 0000000000..8c7074c1df --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionCodec.h @@ -0,0 +1,55 @@ +#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H +#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/Mutex.h" +#include "PreviewConnection.h" + +namespace qpid { +namespace broker { + +class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { + std::queue<framing::AMQFrame> frameQueue; + bool frameQueueClosed; + mutable sys::Mutex frameQueueLock; + sys::OutputControl& output; + PreviewConnection connection; + std::string identifier; + + public: + PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool isClosed() const; + bool canEncode(); + void activateOutput(); + void closed(); // connection closed by peer. + void close(); // closing from this end. + void send(framing::AMQFrame&); + framing::ProtocolVersion getVersion() const; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/ diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp index c0f0d9f5e0..0052b0d588 100644 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp @@ -37,14 +37,6 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; } -void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) { - FieldTable properties; - string mechanisms(PLAIN); - string locales(en_US); - handler->serverMode = true; - handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); -} - void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) { handler->client.close(code, text, classId, methodId); @@ -68,7 +60,13 @@ void PreviewConnectionHandler::handle(framing::AMQFrame& frame) } } -PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {} +PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) { + FieldTable properties; + string mechanisms(PLAIN); + string locales(en_US); + handler->serverMode = true; + handler->client.start(0, 10, properties, mechanisms, locales); +} PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()), connection(c), serverMode(false) {} diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h index 93901dd492..bd6b54e8f7 100644 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.h +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h @@ -81,7 +81,6 @@ class PreviewConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; public: PreviewConnectionHandler(PreviewConnection& connection); - void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); }; diff --git a/cpp/src/qpid/framing/ProtocolInitiation.cpp b/cpp/src/qpid/framing/ProtocolInitiation.cpp index 7164bceb12..50617de017 100644 --- a/cpp/src/qpid/framing/ProtocolInitiation.cpp +++ b/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -58,6 +58,9 @@ bool ProtocolInitiation::decode(Buffer& buffer){ } } -//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date + +std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi) { + return o << int(pi.getMajor()) << "-" << int(pi.getMinor()); +} }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ProtocolInitiation.h b/cpp/src/qpid/framing/ProtocolInitiation.h index 31c73eb124..43e32da4cf 100644 --- a/cpp/src/qpid/framing/ProtocolInitiation.h +++ b/cpp/src/qpid/framing/ProtocolInitiation.h @@ -45,8 +45,12 @@ public: inline uint8_t getMajor() const { return version.getMajor(); } inline uint8_t getMinor() const { return version.getMinor(); } inline ProtocolVersion getVersion() const { return version; } + bool operator==(ProtocolVersion v) const { return v == getVersion(); } }; +std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi); + + } } diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index 9ce148a37b..4e4747c3f4 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -28,6 +28,5 @@ #include "AMQHeartbeatBody.h" #include "InputHandler.h" #include "OutputHandler.h" -#include "InitiationHandler.h" #include "ProtocolInitiation.h" #include "ProtocolVersion.h" diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 5eb1f1a500..1e7827e60c 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -24,13 +24,11 @@ #include <stdint.h> #include "qpid/SharedObject.h" +#include "ConnectionCodec.h" namespace qpid { namespace sys { -class ConnectionInputHandlerFactory; -class ConnectionInputHandler; - class Acceptor : public qpid::SharedObject<Acceptor> { public: @@ -38,10 +36,9 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionInputHandlerFactory* factory) = 0; - virtual ConnectionInputHandler* connect( - const std::string& host, int16_t port, - ConnectionInputHandlerFactory* factory) = 0; + virtual void run(ConnectionCodec::Factory*) = 0; + virtual void connect( + const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; /** Note: this function is async-signal safe */ virtual void shutdown() = 0; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index c24205f53e..56d7c6e1f3 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -27,12 +27,8 @@ #include "Thread.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/AMQDataBlock.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -40,6 +36,7 @@ #include <queue> #include <vector> #include <memory> +#include <ostream> namespace qpid { namespace sys { @@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor { public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} - void run(ConnectionInputHandlerFactory* factory); - ConnectionInputHandler* connect( - const std::string& host, int16_t port, - ConnectionInputHandlerFactory* factory); + void run(ConnectionCodec::Factory*); + void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); void shutdown(); @@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor { std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); + void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) { - return - Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : @@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { +class AsynchIOHandler : public OutputControl { AsynchIO* aio; - ConnectionInputHandler* inputHandler; - std::queue<framing::AMQFrame> frameQueue; - Mutex frameQueueLock; - bool frameQueueClosed; - bool isInitiated; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; bool readError; std::string identifier; bool isClient; - void write(const framing::AMQDataBlock&); + void write(const framing::ProtocolInitiation&); public: AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - isInitiated(false), + aio(0), + factory(0), + codec(0), readError(false), isClient(false) {} ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; + if (codec) + codec->closed(); + delete codec; } void setClient() { isClient = true; } - - void init(AsynchIO* a, ConnectionInputHandler* h) { + + void init(AsynchIO* a, ConnectionCodec::Factory* f) { aio = a; - inputHandler = h; + factory = f; identifier = aio->getSocket().getPeerAddress(); + } // Output side - void send(framing::AMQFrame&); void close(); void activateOutput(); - void initiated(const framing::ProtocolInitiation&); - // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void closedSocket(AsynchIO& aio, const Socket& s); }; -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - +void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s.getPeerAddress()); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); @@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { +void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { Dispatcher d(poller); AsynchAcceptor acceptor(listener, @@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { } } -ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +void AsynchIOAcceptor::connect( + const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); AsynchIOHandler* async = new AsynchIOHandler; async->setClient(); - ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress()); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); } aio->start(poller); - return handler; } @@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() { } -void AsynchIOHandler::write(const framing::AMQDataBlock& data) +void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { + QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")"); AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); if (!buff) buff = new Buff; @@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data) aio->queueWrite(buff); } -// Output side -void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { - ScopedLock<Mutex> l(frameQueueLock); - // Ignore anything seen after closing - if (!frameQueueClosed) - frameQueue.push(frame); - } - - // Activate aio for writing here - aio->notifyPendingWrite(); -} - -void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; -} - void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi) -{ - write(pi); -} - // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { return; } - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - if(isInitiated){ - framing::AMQFrame frame; - try{ - while(frame.decode(in)) { - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - inputHandler->received(frame); - } + size_t decoded = 0; + if (codec) { // Already initiated + try { + decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; aio->queueWriteClose(); } }else{ + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); framing::ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - QPID_LOG(debug, "INIT [" << identifier << "]"); - inputHandler->initiated(protocolInit); - isInitiated = true; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")"); + codec = factory->create(protocolInit.getVersion(), *this, identifier); + if (!codec) { + // FIXME aconway 2008-03-18: send valid version header & close connection. + // FIXME aconway 2008-03-18: exception type + throw Exception( + QPID_MSG("Protocol version not supported: " << protocolInit)); + } } } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { + if (decoded != size_t(buff->dataCount)) { // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); + buff->dataStart += decoded; + buff->dataCount -= decoded; aio->unread(buff); } else { // Give whole buffer back to aio subsystem @@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { void AsynchIOHandler::eof(AsynchIO&) { QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); + if (codec) codec->closed(); aio->queueWriteClose(); } @@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && !isInitiated) { - //get & write protocol header from upper layers - write(inputHandler->getInitiation()); - isInitiated = true; + if (isClient && codec == 0) { + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); return; } - ScopedLock<Mutex> l(frameQueueLock); - - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } - - do { + if (codec == 0) return; + while (codec->canEncode()) { // Try and get a queued buffer if not then construct new one AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; - - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - int framesEncoded=0; - while (frameSize <= int(out.available())) { - frameQueue.pop(); - - // Encode output frame - frame.encode(out); - ++framesEncoded; - buffUsed += frameSize; - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - - if (frameQueue.empty()) { - //if we have run out of frames, allow upper layers to - //generate more - if (!frameQueueClosed) { - inputHandler->doOutput(); - } - if (frameQueue.empty()) { - //if there are still no frames, we have no more to - //do - break; - } - } - frame = frameQueue.front(); - frameSize = frame.size(); - } - QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); - - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - - buff->dataCount = buffUsed; + if (!buff) buff = new Buff; + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; aio->queueWrite(buff); - } while (!frameQueue.empty()); - - if (frameQueueClosed) { - aio->queueWriteClose(); } + if (codec->isClosed()) + aio->queueWriteClose(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h new file mode 100644 index 0000000000..205596c709 --- /dev/null +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -0,0 +1,80 @@ +#ifndef QPID_SYS_CONNECTION_CODEC_H +#define QPID_SYS_CONNECTION_CODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolVersion.h" +#include "OutputControl.h" +#include <memory> +#include <map> + +namespace qpid { + +namespace broker { class Broker; } + +namespace sys { + +/** + * Interface of coder/decoder for a connection of a specific protocol + * version. + */ +class ConnectionCodec { + public: + virtual ~ConnectionCodec() {} + + /** Decode from buffer, return number of bytes decoded. + * @return may be less than size if there was incomplete + * data at the end of the buffer. + */ + virtual size_t decode(const char* buffer, size_t size) = 0; + + + /** Encode into buffer, return number of bytes encoded */ + virtual size_t encode(const char* buffer, size_t size) = 0; + + /** Return true if we have data to encode */ + virtual bool canEncode() = 0; + + /** Network connection was closed from other end. */ + virtual void closed() = 0; + + virtual bool isClosed() const = 0; + + virtual framing::ProtocolVersion getVersion() const = 0; + + struct Factory { + virtual ~Factory() {} + + /** Return 0 if version unknown */ + virtual ConnectionCodec* create( + framing::ProtocolVersion, OutputControl&, const std::string& id + ) = 0; + + /** Return "preferred" codec for outbound connections. */ + virtual ConnectionCodec* create( + OutputControl&, const std::string& id + ) = 0; + }; +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CONNECTION_CODEC_H*/ diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h index 1936b5ec50..a2c18d6d9a 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -22,8 +22,6 @@ #define _ConnectionInputHandler_ #include "qpid/framing/InputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" #include "OutputTask.h" #include "TimeoutHandler.h" @@ -31,12 +29,10 @@ namespace qpid { namespace sys { class ConnectionInputHandler : - public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, public TimeoutHandler, public OutputTask { public: - virtual qpid::framing::ProtocolInitiation getInitiation() = 0; virtual void closed() = 0; }; diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h index 13407d9b9d..5a60ae4998 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,7 +22,6 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" #include "OutputControl.h" namespace qpid { @@ -31,7 +30,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl { public: virtual void close() = 0; diff --git a/cpp/src/tests/MockConnectionInputHandler.h b/cpp/src/tests/MockConnectionInputHandler.h index d104e7d934..89b6155355 100644 --- a/cpp/src/tests/MockConnectionInputHandler.h +++ b/cpp/src/tests/MockConnectionInputHandler.h @@ -22,7 +22,6 @@ #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/Monitor.h" -#include "qpid/framing/ProtocolInitiation.h" struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { @@ -30,23 +29,12 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { ~MockConnectionInputHandler() {} - void initiated(const qpid::framing::ProtocolInitiation& pi) { - qpid::sys::Monitor::ScopedLock l(monitor); - init = pi; - setState(GOT_INIT); - } - void received(qpid::framing::AMQFrame* framep) { qpid::sys::Monitor::ScopedLock l(monitor); frame = *framep; setState(GOT_FRAME); } - qpid::framing::ProtocolInitiation waitForProtocolInit() { - waitFor(GOT_INIT); - return init; - } - qpid::framing::AMQFrame waitForFrame() { waitFor(GOT_FRAME); return frame; @@ -65,7 +53,7 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { void idleIn() {} private: - typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State; + typedef enum { START, GOT_FRAME, CLOSED } State; void setState(State s) { state = s; @@ -81,7 +69,6 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { qpid::sys::Monitor monitor; State state; - qpid::framing::ProtocolInitiation init; qpid::framing::AMQFrame frame; }; |