summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp235
1 files changed, 235 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
new file mode 100644
index 0000000000..2037ba38ab
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+ Duration timeout;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout_, AsynchIOHandler& h) :
+ TimerTask(timeout_, "ProtocolTimeout"),
+ handler(h),
+ id(i),
+ timeout(timeout_)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received after " << timeout
+ << ", closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
+ identifier(id),
+ aio(0),
+ factory(f),
+ codec(0),
+ readError(false),
+ isClient(isClient0),
+ nodict(nodict0),
+ headerSent(false)
+{}
+
+AsynchIOHandler::~AsynchIOHandler() {
+ if (codec)
+ codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
+ delete codec;
+}
+
+namespace {
+ SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
+ {
+ SecuritySettings settings = aio->getSecuritySettings();
+ settings.nodict = nodict;
+ return settings;
+ }
+}
+
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
+ aio = a;
+
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
+ // Give connection some buffers to use
+ aio->createBuffers();
+
+ if (isClient) {
+ codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
+ }
+}
+
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
+{
+ QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.encodedSize();
+ aio->queueWrite(buff);
+}
+
+void AsynchIOHandler::abort() {
+ // Don't disconnect if we're already disconnecting
+ if (!readError) {
+ aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
+ }
+ aio->queueWriteClose();
+}
+
+void AsynchIOHandler::connectionEstablished() {
+ if (timeoutTimerTask) {
+ timeoutTimerTask->cancel();
+ timeoutTimerTask = 0;
+ }
+}
+
+void AsynchIOHandler::activateOutput() {
+ aio->notifyPendingWrite();
+}
+
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
+ if (readError) {
+ return;
+ }
+
+ size_t decoded = 0;
+ if (codec) { // Already initiated
+ try {
+ decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+ }else{
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+
+ QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+ try {
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict));
+ if (!codec) {
+ //TODO: may still want to revise this...
+ //send valid version header & close connection.
+ write(framing::ProtocolInitiation(factory->supportedVersion()));
+ readError = true;
+ aio->queueWriteClose();
+ } else {
+ //read any further data that may already have been sent
+ decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+ }
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded != size_t(buff->dataCount)) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio->unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buff);
+ }
+}
+
+void AsynchIOHandler::eof(AsynchIO& a) {
+ disconnect(a);
+ readError = true;
+ aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+ // If we closed with data still to send log a warning
+ if (!aio->writeQueueEmpty()) {
+ QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
+ }
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO&) {
+ QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
+ if (codec) codec->closed();
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&){
+ if (isClient && !headerSent) {
+ write(framing::ProtocolInitiation(codec->getVersion()));
+ headerSent = true;
+ return;
+ }
+ if (codec == 0) return;
+ if (!codec->canEncode()) {
+ return;
+ }
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (buff) {
+ try {
+ size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+ buff->dataCount = encoded;
+ aio->queueWrite(buff);
+ if (!codec->isClosed()) {
+ return;
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
+ }
+ }
+ readError = true;
+ aio->queueWriteClose();
+}
+
+}} // namespace qpid::sys