summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-03-18 21:31:08 +0000
committerAlan Conway <aconway@apache.org>2008-03-18 21:31:08 +0000
commit36e23bcefbf0a6893370cb041bd05a662f0b2758 (patch)
tree601d29d88e873ac4d58da3cdb2753f02b64998bc /cpp
parenteac0911169b24e708637572fe6b5a8283b3f49e0 (diff)
downloadqpid-python-36e23bcefbf0a6893370cb041bd05a662f0b2758.tar.gz
Make AsyncIOAcceptor multi-protocol:
- ConnectionCodec interface replaces ConnectionInputHandle, moves encoding/decoding out of AsyncIOAcceptor. - ConnectionCodec::Factory replaces ConnectionInputHandlerFactory - Acceptor creates version-specific ConnectionCodec based on protocol header. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@638590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp97
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h62
-rw-r--r--cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp10
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp29
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h14
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp21
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h2
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp18
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h2
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp10
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.h5
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.cpp90
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.h55
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.cpp16
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.h1
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.cpp5
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.h4
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/sys/Acceptor.h11
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp200
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h80
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandler.h4
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandler.h3
-rw-r--r--cpp/src/tests/MockConnectionInputHandler.h15
28 files changed, 517 insertions, 267 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 0513414f3f..afe98b4b0f 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -131,7 +131,6 @@ libqpidcommon_la_SOURCES = \
qpid/framing/FieldValue.cpp \
qpid/framing/FramingContent.cpp \
qpid/framing/FrameSet.cpp \
- qpid/framing/InitiationHandler.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
qpid/framing/SessionState.cpp \
@@ -168,6 +167,8 @@ libqpidcommon_la_SOURCES = \
libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams
libqpidbroker_la_SOURCES = \
$(mgen_broker_cpp) \
+ qpid/amqp_0_10/Connection.h \
+ qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/SessionAdapter.cpp \
@@ -177,6 +178,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
qpid/broker/PreviewConnection.cpp \
+ qpid/broker/PreviewConnectionCodec.cpp \
qpid/broker/PreviewConnectionHandler.cpp \
qpid/broker/PreviewSessionHandler.cpp \
qpid/broker/PreviewSessionManager.cpp \
@@ -286,6 +288,7 @@ nobase_include_HEADERS = \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
qpid/broker/PreviewConnection.h \
+ qpid/broker/PreviewConnectionCodec.h \
qpid/broker/PreviewConnectionHandler.h \
qpid/broker/PreviewSessionHandler.h \
qpid/broker/PreviewSessionManager.h \
@@ -409,7 +412,6 @@ nobase_include_HEADERS = \
qpid/framing/FramingContent.h \
qpid/framing/Handler.h \
qpid/framing/HeaderProperties.h \
- qpid/framing/InitiationHandler.h \
qpid/framing/Invoker.h \
qpid/framing/InputHandler.h \
qpid/framing/MethodContent.h \
@@ -459,6 +461,7 @@ nobase_include_HEADERS = \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
qpid/sys/OutputControl.h \
+ qpid/sys/ConnectionCodec.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
qpid/sys/Runnable.h \
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
new file mode 100644
index 0000000000..08b47c5611
--- /dev/null
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace amqp_0_10 {
+
+using sys::Mutex;
+
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
+ : frameQueueClosed(false), output(o), connection(this, broker, id),
+ identifier(id), initialized(false) {}
+
+size_t Connection::decode(const char* buffer, size_t size) {
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ framing::AMQFrame frame;
+ while(frame.decode(in)) {
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ connection.received(frame);
+ }
+ return in.getPosition();
+}
+
+bool Connection::canEncode() {
+ if (!frameQueueClosed) connection.doOutput();
+ Mutex::ScopedLock l(frameQueueLock);
+ return !initialized || !frameQueue.empty();
+}
+
+bool Connection::isClosed() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return frameQueueClosed;
+}
+
+size_t Connection::encode(const char* buffer, size_t size) {
+ Mutex::ScopedLock l(frameQueueLock);
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ if (!initialized) {
+ framing::ProtocolInitiation pi(getVersion());
+ pi.encode(out);
+ initialized = true;
+ }
+ while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ frameQueue.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
+ frameQueue.pop();
+ }
+ if (!frameQueue.empty() && frameQueue.front().size() > size)
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
+ return out.getPosition();
+}
+
+void Connection::activateOutput() { output.activateOutput(); }
+
+void Connection::close() {
+ // Close the output queue.
+ Mutex::ScopedLock l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+void Connection::closed() {
+ connection.closed();
+}
+
+void Connection::send(framing::AMQFrame& f) {
+ {
+ Mutex::ScopedLock l(frameQueueLock);
+ if (!frameQueueClosed)
+ frameQueue.push(f);
+ }
+ activateOutput();
+}
+
+framing::ProtocolVersion Connection::getVersion() const {
+ return framing::ProtocolVersion(0,10);
+}
+
+}} // namespace qpid::amqp_0_10
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
new file mode 100644
index 0000000000..e4672be722
--- /dev/null
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -0,0 +1,62 @@
+#ifndef QPID_BROKER_CONNECTION_H
+#define QPID_BROKER_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "Connection.h"
+#include "qpid/broker/Connection.h"
+#include <queue>
+
+namespace qpid {
+namespace broker { class Broker; }
+namespace amqp_0_10 {
+
+// FIXME aconway 2008-03-18: Update to 0-10.
+class Connection : public sys::ConnectionCodec,
+ public sys::ConnectionOutputHandler
+{
+ std::queue<framing::AMQFrame> frameQueue;
+ bool frameQueueClosed;
+ mutable sys::Mutex frameQueueLock;
+ sys::OutputControl& output;
+ broker::Connection connection; // FIXME aconway 2008-03-18:
+ std::string identifier;
+ bool initialized;
+
+ public:
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool isClosed() const;
+ bool canEncode();
+ void activateOutput();
+ void closed(); // connection closed by peer.
+ void close(); // closing from this end.
+ void send(framing::AMQFrame&);
+ framing::ProtocolVersion getVersion() const;
+};
+
+}} // namespace qpid::amqp_0_10
+
+#endif /*!QPID_BROKER_CONNECTION_H*/
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 8b70831cf7..ddd5959343 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -286,19 +286,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
-sys::ConnectionInputHandler* Broker::connect(
+void Broker::connect(
const std::string& host, uint16_t port,
- sys::ConnectionInputHandlerFactory* f)
+ sys::ConnectionCodec::Factory* f)
{
- return getAcceptor().connect(host, port, f ? f : &factory);
+ getAcceptor().connect(host, port, f ? f : &factory);
}
-sys::ConnectionInputHandler* Broker::connect(
- const Url& url, sys::ConnectionInputHandlerFactory* f)
+void Broker::connect(
+ const Url& url, sys::ConnectionCodec::Factory* f)
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- return connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, f);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9e5191825d..481191eb55 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -119,12 +119,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
ManagementMethod (uint32_t methodId, management::Args& args);
/** Create a connection to another broker. */
- sys::ConnectionInputHandler*
- connect(const std::string& host, uint16_t port,
- sys::ConnectionInputHandlerFactory* =0);
+ void connect(const std::string& host, uint16_t port,
+ sys::ConnectionCodec::Factory* =0);
/** Create a connection to another broker. */
- sys::ConnectionInputHandler*
- connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+ void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
private:
sys::Acceptor& getAcceptor() const;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 8be4f7756e..1e55087390 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -90,7 +90,9 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
-{}
+{
+ initMgmt();
+}
void Connection::initMgmt(bool asLink)
{
@@ -134,12 +136,6 @@ void Connection::close(
getOutput().close();
}
-void Connection::initiated(const framing::ProtocolInitiation& header) {
- version = ProtocolVersion(header.getMajor(), header.getMinor());
- adapter.init(header);
- initMgmt();
-}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 8719a9dfcd..a59df26c84 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -65,12 +65,10 @@ class Connection : public sys::ConnectionInputHandler,
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
- void initiated(const framing::ProtocolInitiation& header);
void idleOut();
void idleIn();
void closed();
bool doOutput();
- framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index a0cd4e35d7..dfab998c78 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -19,27 +19,32 @@
*
*/
#include "ConnectionFactory.h"
-#include "Connection.h"
-#include "MultiVersionConnectionInputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "PreviewConnectionCodec.h"
namespace qpid {
namespace broker {
+using framing::ProtocolVersion;
-ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
-{}
+ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
+ConnectionFactory::~ConnectionFactory() {}
-ConnectionFactory::~ConnectionFactory()
-{
-
+sys::ConnectionCodec*
+ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == ProtocolVersion(99, 0))
+ return new PreviewConnectionCodec(out, broker, id);
+ if (v == ProtocolVersion(0, 10))
+ return new amqp_0_10::Connection(out, broker, id);
+ return 0;
}
-qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
- const std::string& id)
-{
- return new MultiVersionConnectionInputHandler(out, broker, id);
+sys::ConnectionCodec*
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ // FIXME aconway 2008-03-18:
+ return new PreviewConnectionCodec(out, broker, id);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index 53fb160279..5797495054 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -21,22 +21,24 @@
#ifndef _ConnectionFactory_
#define _ConnectionFactory_
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
namespace broker {
class Broker;
-class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
-{
+class ConnectionFactory : public sys::ConnectionCodec::Factory {
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler*
- create(qpid::sys::ConnectionOutputHandler* out, const std::string& id);
-
virtual ~ConnectionFactory();
+ sys::ConnectionCodec*
+ create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+
+ sys::ConnectionCodec*
+ create(sys::OutputControl&, const std::string& id);
+
private:
Broker& broker;
};
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 0aee420022..53a403c955 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -38,17 +38,6 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
}
-void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
- //need to send out a protocol header back to the client
- handler->connection.getOutput().initiated(header);
-
- FieldTable properties;
- string mechanisms(PLAIN);
- string locales(en_US);
- handler->serverMode = true;
- handler->client.start(properties, mechanisms, locales);
-}
-
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
handler->client.close(code, text, classId, methodId);
@@ -75,7 +64,15 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
}
}
-ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {
+ FieldTable properties;
+ string mechanisms(PLAIN);
+ string locales(en_US);
+ handler->serverMode = true;
+ handler->client.start(properties, mechanisms, locales);
+}
+
+
ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()),
connection(c), serverMode(false) {}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index 44e2ce05fa..8e659f0913 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -38,7 +38,6 @@ namespace broker {
class Connection;
-// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
@@ -82,7 +81,6 @@ class ConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
public:
ConnectionHandler(Connection& connection);
- void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
index 6c3d960d1f..f1bbf7d10e 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
@@ -31,19 +31,6 @@ MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler(
Broker& _broker,
const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {}
-
-void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i)
-{
- if (i.getMajor() == 99 && i.getMinor() == 0) {
- handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id));
- } else if (i.getMajor() == 0 && i.getMinor() == 10) {
- handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id));
- } else {
- throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString());
- }
- handler->initiated(i);
-}
-
void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
{
check();
@@ -67,11 +54,6 @@ bool MultiVersionConnectionInputHandler::doOutput()
return handler.get() && handler->doOutput();
}
-qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
-{
- return qpid::framing::ProtocolInitiation(linkVersion);
-}
-
void MultiVersionConnectionInputHandler::closed()
{
if (handler.get()) handler->closed();
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
index 440c00c09a..e6915a00bd 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
@@ -44,12 +44,10 @@ public:
MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
virtual ~MultiVersionConnectionInputHandler() {}
- void initiated(const qpid::framing::ProtocolInitiation&);
void received(qpid::framing::AMQFrame&);
void idleOut();
void idleIn();
bool doOutput();
- qpid::framing::ProtocolInitiation getInitiation();
void closed();
};
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
index 05879a0329..5a541b5624 100644
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ b/cpp/src/qpid/broker/PreviewConnection.cpp
@@ -90,7 +90,9 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
-{}
+{
+ initMgmt();
+}
void PreviewConnection::initMgmt(bool asLink)
{
@@ -134,12 +136,6 @@ void PreviewConnection::close(
getOutput().close();
}
-void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
- version = ProtocolVersion(header.getMajor(), header.getMinor());
- adapter.init(header);
- initMgmt();
-}
-
void PreviewConnection::idleOut(){}
void PreviewConnection::idleIn(){}
diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h
index d6a945c26c..1cc9e7a3d4 100644
--- a/cpp/src/qpid/broker/PreviewConnection.h
+++ b/cpp/src/qpid/broker/PreviewConnection.h
@@ -50,8 +50,7 @@
namespace qpid {
namespace broker {
-class PreviewConnection : public sys::ConnectionInputHandler,
- public ConnectionState
+class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState
{
public:
PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
@@ -65,12 +64,10 @@ class PreviewConnection : public sys::ConnectionInputHandler,
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
- void initiated(const framing::ProtocolInitiation& header);
void idleOut();
void idleIn();
void closed();
bool doOutput();
- framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
new file mode 100644
index 0000000000..81ec7f7076
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnectionCodec.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+using sys::Mutex;
+
+PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id)
+ : frameQueueClosed(false), output(o), connection(this, broker, id), identifier(id) {}
+
+size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) {
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ framing::AMQFrame frame;
+ while(frame.decode(in)) {
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ connection.received(frame);
+ }
+ return in.getPosition();
+}
+
+bool PreviewConnectionCodec::canEncode() {
+ if (!frameQueueClosed) connection.doOutput();
+ return !frameQueue.empty();
+}
+
+bool PreviewConnectionCodec::isClosed() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return frameQueueClosed;
+}
+
+size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) {
+ Mutex::ScopedLock l(frameQueueLock);
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ frameQueue.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
+ frameQueue.pop();
+ }
+ if (!frameQueue.empty() && frameQueue.front().size() > size)
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
+ return out.getPosition();
+}
+
+void PreviewConnectionCodec::activateOutput() { output.activateOutput(); }
+
+void PreviewConnectionCodec::close() {
+ // Close the output queue.
+ Mutex::ScopedLock l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+void PreviewConnectionCodec::closed() {
+ connection.closed();
+}
+
+void PreviewConnectionCodec::send(framing::AMQFrame& f) {
+ {
+ Mutex::ScopedLock l(frameQueueLock);
+ if (!frameQueueClosed)
+ frameQueue.push(f);
+ }
+ activateOutput();
+}
+
+framing::ProtocolVersion PreviewConnectionCodec::getVersion() const {
+ return framing::ProtocolVersion(99,0);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h
new file mode 100644
index 0000000000..8c7074c1df
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionCodec.h
@@ -0,0 +1,55 @@
+#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "PreviewConnection.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler {
+ std::queue<framing::AMQFrame> frameQueue;
+ bool frameQueueClosed;
+ mutable sys::Mutex frameQueueLock;
+ sys::OutputControl& output;
+ PreviewConnection connection;
+ std::string identifier;
+
+ public:
+ PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool isClosed() const;
+ bool canEncode();
+ void activateOutput();
+ void closed(); // connection closed by peer.
+ void close(); // closing from this end.
+ void send(framing::AMQFrame&);
+ framing::ProtocolVersion getVersion() const;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
index c0f0d9f5e0..0052b0d588 100644
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
@@ -37,14 +37,6 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
}
-void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) {
- FieldTable properties;
- string mechanisms(PLAIN);
- string locales(en_US);
- handler->serverMode = true;
- handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
-}
-
void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
handler->client.close(code, text, classId, methodId);
@@ -68,7 +60,13 @@ void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
}
}
-PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {}
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {
+ FieldTable properties;
+ string mechanisms(PLAIN);
+ string locales(en_US);
+ handler->serverMode = true;
+ handler->client.start(0, 10, properties, mechanisms, locales);
+}
PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()),
connection(c), serverMode(false) {}
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h
index 93901dd492..bd6b54e8f7 100644
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.h
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h
@@ -81,7 +81,6 @@ class PreviewConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
public:
PreviewConnectionHandler(PreviewConnection& connection);
- void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.cpp b/cpp/src/qpid/framing/ProtocolInitiation.cpp
index 7164bceb12..50617de017 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.cpp
+++ b/cpp/src/qpid/framing/ProtocolInitiation.cpp
@@ -58,6 +58,9 @@ bool ProtocolInitiation::decode(Buffer& buffer){
}
}
-//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date
+
+std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi) {
+ return o << int(pi.getMajor()) << "-" << int(pi.getMinor());
+}
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.h b/cpp/src/qpid/framing/ProtocolInitiation.h
index 31c73eb124..43e32da4cf 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.h
+++ b/cpp/src/qpid/framing/ProtocolInitiation.h
@@ -45,8 +45,12 @@ public:
inline uint8_t getMajor() const { return version.getMajor(); }
inline uint8_t getMinor() const { return version.getMinor(); }
inline ProtocolVersion getVersion() const { return version; }
+ bool operator==(ProtocolVersion v) const { return v == getVersion(); }
};
+std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi);
+
+
}
}
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index 9ce148a37b..4e4747c3f4 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -28,6 +28,5 @@
#include "AMQHeartbeatBody.h"
#include "InputHandler.h"
#include "OutputHandler.h"
-#include "InitiationHandler.h"
#include "ProtocolInitiation.h"
#include "ProtocolVersion.h"
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 5eb1f1a500..1e7827e60c 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -24,13 +24,11 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
+#include "ConnectionCodec.h"
namespace qpid {
namespace sys {
-class ConnectionInputHandlerFactory;
-class ConnectionInputHandler;
-
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
@@ -38,10 +36,9 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(ConnectionInputHandlerFactory* factory) = 0;
- virtual ConnectionInputHandler* connect(
- const std::string& host, int16_t port,
- ConnectionInputHandlerFactory* factory) = 0;
+ virtual void run(ConnectionCodec::Factory*) = 0;
+ virtual void connect(
+ const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0;
/** Note: this function is async-signal safe */
virtual void shutdown() = 0;
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index c24205f53e..56d7c6e1f3 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -27,12 +27,8 @@
#include "Thread.h"
#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/AMQDataBlock.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -40,6 +36,7 @@
#include <queue>
#include <vector>
#include <memory>
+#include <ostream>
namespace qpid {
namespace sys {
@@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor {
public:
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
- void run(ConnectionInputHandlerFactory* factory);
- ConnectionInputHandler* connect(
- const std::string& host, int16_t port,
- ConnectionInputHandlerFactory* factory);
+ void run(ConnectionCodec::Factory*);
+ void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
void shutdown();
@@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor {
std::string getHost() const;
private:
- void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
{
- return
- Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
}
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
@@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+class AsynchIOHandler : public OutputControl {
AsynchIO* aio;
- ConnectionInputHandler* inputHandler;
- std::queue<framing::AMQFrame> frameQueue;
- Mutex frameQueueLock;
- bool frameQueueClosed;
- bool isInitiated;
+ ConnectionCodec::Factory* factory;
+ ConnectionCodec* codec;
bool readError;
std::string identifier;
bool isClient;
- void write(const framing::AMQDataBlock&);
+ void write(const framing::ProtocolInitiation&);
public:
AsynchIOHandler() :
- inputHandler(0),
- frameQueueClosed(false),
- isInitiated(false),
+ aio(0),
+ factory(0),
+ codec(0),
readError(false),
isClient(false)
{}
~AsynchIOHandler() {
- if (inputHandler)
- inputHandler->closed();
- delete inputHandler;
+ if (codec)
+ codec->closed();
+ delete codec;
}
void setClient() { isClient = true; }
-
- void init(AsynchIO* a, ConnectionInputHandler* h) {
+
+ void init(AsynchIO* a, ConnectionCodec::Factory* f) {
aio = a;
- inputHandler = h;
+ factory = f;
identifier = aio->getSocket().getPeerAddress();
+
}
// Output side
- void send(framing::AMQFrame&);
void close();
void activateOutput();
- void initiated(const framing::ProtocolInitiation&);
-
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
void closedSocket(AsynchIO& aio, const Socket& s);
};
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
-
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s.getPeerAddress());
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
@@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
Dispatcher d(poller);
AsynchAcceptor
acceptor(listener,
@@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
}
}
-ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+void AsynchIOAcceptor::connect(
+ const std::string& host, int16_t port, ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
AsynchIOHandler* async = new AsynchIOHandler;
async->setClient();
- ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress());
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
- return handler;
}
@@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() {
}
-void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
+ QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (!buff)
buff = new Buff;
@@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data)
aio->queueWrite(buff);
}
-// Output side
-void AsynchIOHandler::send(framing::AMQFrame& frame) {
- // TODO: Need to find out if we are in the callback context,
- // in the callback thread if so we can go further than just queuing the frame
- // to be handled later
- {
- ScopedLock<Mutex> l(frameQueueLock);
- // Ignore anything seen after closing
- if (!frameQueueClosed)
- frameQueue.push(frame);
- }
-
- // Activate aio for writing here
- aio->notifyPendingWrite();
-}
-
-void AsynchIOHandler::close() {
- ScopedLock<Mutex> l(frameQueueLock);
- frameQueueClosed = true;
-}
-
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
-{
- write(pi);
-}
-
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- if(isInitiated){
- framing::AMQFrame frame;
- try{
- while(frame.decode(in)) {
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- inputHandler->received(frame);
- }
+ size_t decoded = 0;
+ if (codec) { // Already initiated
+ try {
+ decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
aio->queueWriteClose();
}
}else{
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- QPID_LOG(debug, "INIT [" << identifier << "]");
- inputHandler->initiated(protocolInit);
- isInitiated = true;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")");
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+ if (!codec) {
+ // FIXME aconway 2008-03-18: send valid version header & close connection.
+ // FIXME aconway 2008-03-18: exception type
+ throw Exception(
+ QPID_MSG("Protocol version not supported: " << protocolInit));
+ }
}
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
+ if (decoded != size_t(buff->dataCount)) {
// Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
aio->unread(buff);
} else {
// Give whole buffer back to aio subsystem
@@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
void AsynchIOHandler::eof(AsynchIO&) {
QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- inputHandler->closed();
+ if (codec) codec->closed();
aio->queueWriteClose();
}
@@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- if (isClient && !isInitiated) {
- //get & write protocol header from upper layers
- write(inputHandler->getInitiation());
- isInitiated = true;
+ if (isClient && codec == 0) {
+ codec = factory->create(*this, identifier);
+ write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
- ScopedLock<Mutex> l(frameQueueLock);
-
- if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so tell the input handler to queue any available output:
- inputHandler->doOutput();
- //if still no frames, theres nothing to do:
- if (frameQueue.empty()) return;
- }
-
- do {
+ if (codec == 0) return;
+ while (codec->canEncode()) {
// Try and get a queued buffer if not then construct new one
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
-
- framing::AMQFrame frame = frameQueue.front();
- int frameSize = frame.size();
- int framesEncoded=0;
- while (frameSize <= int(out.available())) {
- frameQueue.pop();
-
- // Encode output frame
- frame.encode(out);
- ++framesEncoded;
- buffUsed += frameSize;
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
-
- if (frameQueue.empty()) {
- //if we have run out of frames, allow upper layers to
- //generate more
- if (!frameQueueClosed) {
- inputHandler->doOutput();
- }
- if (frameQueue.empty()) {
- //if there are still no frames, we have no more to
- //do
- break;
- }
- }
- frame = frameQueue.front();
- frameSize = frame.size();
- }
- QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames ");
-
- // If frame was egregiously large complain
- if (frameSize > buff->byteCount)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
-
- buff->dataCount = buffUsed;
+ if (!buff) buff = new Buff;
+ size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+ buff->dataCount = encoded;
aio->queueWrite(buff);
- } while (!frameQueue.empty());
-
- if (frameQueueClosed) {
- aio->queueWriteClose();
}
+ if (codec->isClosed())
+ aio->queueWriteClose();
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
new file mode 100644
index 0000000000..205596c709
--- /dev/null
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -0,0 +1,80 @@
+#ifndef QPID_SYS_CONNECTION_CODEC_H
+#define QPID_SYS_CONNECTION_CODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/ProtocolVersion.h"
+#include "OutputControl.h"
+#include <memory>
+#include <map>
+
+namespace qpid {
+
+namespace broker { class Broker; }
+
+namespace sys {
+
+/**
+ * Interface of coder/decoder for a connection of a specific protocol
+ * version.
+ */
+class ConnectionCodec {
+ public:
+ virtual ~ConnectionCodec() {}
+
+ /** Decode from buffer, return number of bytes decoded.
+ * @return may be less than size if there was incomplete
+ * data at the end of the buffer.
+ */
+ virtual size_t decode(const char* buffer, size_t size) = 0;
+
+
+ /** Encode into buffer, return number of bytes encoded */
+ virtual size_t encode(const char* buffer, size_t size) = 0;
+
+ /** Return true if we have data to encode */
+ virtual bool canEncode() = 0;
+
+ /** Network connection was closed from other end. */
+ virtual void closed() = 0;
+
+ virtual bool isClosed() const = 0;
+
+ virtual framing::ProtocolVersion getVersion() const = 0;
+
+ struct Factory {
+ virtual ~Factory() {}
+
+ /** Return 0 if version unknown */
+ virtual ConnectionCodec* create(
+ framing::ProtocolVersion, OutputControl&, const std::string& id
+ ) = 0;
+
+ /** Return "preferred" codec for outbound connections. */
+ virtual ConnectionCodec* create(
+ OutputControl&, const std::string& id
+ ) = 0;
+ };
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CONNECTION_CODEC_H*/
diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h
index 1936b5ec50..a2c18d6d9a 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -22,8 +22,6 @@
#define _ConnectionInputHandler_
#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
#include "OutputTask.h"
#include "TimeoutHandler.h"
@@ -31,12 +29,10 @@ namespace qpid {
namespace sys {
class ConnectionInputHandler :
- public qpid::framing::InitiationHandler,
public qpid::framing::InputHandler,
public TimeoutHandler, public OutputTask
{
public:
- virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 13407d9b9d..5a60ae4998 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -22,7 +22,6 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
#include "OutputControl.h"
namespace qpid {
@@ -31,7 +30,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
{
public:
virtual void close() = 0;
diff --git a/cpp/src/tests/MockConnectionInputHandler.h b/cpp/src/tests/MockConnectionInputHandler.h
index d104e7d934..89b6155355 100644
--- a/cpp/src/tests/MockConnectionInputHandler.h
+++ b/cpp/src/tests/MockConnectionInputHandler.h
@@ -22,7 +22,6 @@
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/framing/ProtocolInitiation.h"
struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
@@ -30,23 +29,12 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
~MockConnectionInputHandler() {}
- void initiated(const qpid::framing::ProtocolInitiation& pi) {
- qpid::sys::Monitor::ScopedLock l(monitor);
- init = pi;
- setState(GOT_INIT);
- }
-
void received(qpid::framing::AMQFrame* framep) {
qpid::sys::Monitor::ScopedLock l(monitor);
frame = *framep;
setState(GOT_FRAME);
}
- qpid::framing::ProtocolInitiation waitForProtocolInit() {
- waitFor(GOT_INIT);
- return init;
- }
-
qpid::framing::AMQFrame waitForFrame() {
waitFor(GOT_FRAME);
return frame;
@@ -65,7 +53,7 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
void idleIn() {}
private:
- typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State;
+ typedef enum { START, GOT_FRAME, CLOSED } State;
void setState(State s) {
state = s;
@@ -81,7 +69,6 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
qpid::sys::Monitor monitor;
State state;
- qpid::framing::ProtocolInitiation init;
qpid::framing::AMQFrame frame;
};