summaryrefslogtreecommitdiff
path: root/cpp/lib/common
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-20 15:11:37 +0000
committerAlan Conway <aconway@apache.org>2006-12-20 15:11:37 +0000
commitb6a25edfe049db4d4a7c109f2464fb4e4f16e600 (patch)
tree7793c3a8149eb0e8de308fe4a87df050bf46bc66 /cpp/lib/common
parentfc6d79eb365027d1fdda43ae0081f72dd45b7896 (diff)
downloadqpid-python-b6a25edfe049db4d4a7c109f2464fb4e4f16e600.tar.gz
Adding files for EventChannel implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common')
-rw-r--r--cpp/lib/common/sys/posix/EventChannelAcceptor.cpp149
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.cpp229
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.h102
3 files changed, 480 insertions, 0 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
new file mode 100644
index 0000000000..7cd6f60902
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+
+#include <boost/assert.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
+#include <sys/Acceptor.h>
+#include <sys/Socket.h>
+#include <framing/Buffer.h>
+#include <framing/AMQFrame.h>
+#include <Exception.h>
+
+#include "EventChannelConnection.h"
+
+namespace qpid {
+namespace sys {
+
+using namespace qpid::framing;
+using namespace std;
+
+class EventChannelAcceptor : public Acceptor {
+ public:
+
+
+ EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+ );
+
+ int getPort() const;
+
+ void run(SessionHandlerFactory& factory);
+
+ void shutdown();
+
+ private:
+
+ void accept();
+
+ Mutex lock;
+ Socket listener;
+ const int port;
+ const bool isTrace;
+ bool isRunning;
+ boost::ptr_vector<EventChannelConnection> connections;
+ AcceptEvent acceptEvent;
+ SessionHandlerFactory* factory;
+ bool isShutdown;
+ EventChannelThreads::shared_ptr threads;
+};
+
+Acceptor::shared_ptr Acceptor::create(
+ int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(
+ new EventChannelAcceptor(port, backlog, threads, trace));
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+EventChannelAcceptor::EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+) : listener(Socket::createTcp()),
+ port(listener.listen(int(port_), backlog)),
+ isTrace(trace_),
+ isRunning(false),
+ acceptEvent(listener.fd(),
+ boost::bind(&EventChannelAcceptor::accept, this)),
+ factory(0),
+ isShutdown(false),
+ threads(EventChannelThreads::create(EventChannel::create(), nThreads))
+{ }
+
+int EventChannelAcceptor::getPort() const {
+ return port; // Immutable no need for lock.
+}
+
+void EventChannelAcceptor::run(SessionHandlerFactory& f) {
+ {
+ Mutex::ScopedLock l(lock);
+ if (!isRunning && !isShutdown) {
+ isRunning = true;
+ factory = &f;
+ threads->post(acceptEvent);
+ }
+ }
+ threads->join(); // Wait for shutdown.
+}
+
+void EventChannelAcceptor::shutdown() {
+ bool doShutdown = false;
+ {
+ Mutex::ScopedLock l(lock);
+ doShutdown = !isShutdown; // I'm the shutdown thread.
+ isShutdown = true;
+ }
+ if (doShutdown) {
+ ::close(acceptEvent.getDescriptor());
+ threads->shutdown();
+ for_each(connections.begin(), connections.end(),
+ boost::bind(&EventChannelConnection::close, _1));
+ }
+ threads->join();
+}
+
+void EventChannelAcceptor::accept()
+{
+ // No lock, we only post one accept event at a time.
+ if (isShutdown)
+ return;
+ if (acceptEvent.getException()) {
+ Exception::log(*acceptEvent.getException(),
+ "EventChannelAcceptor::accept");
+ shutdown();
+ return;
+ }
+ // TODO aconway 2006-11-29: Need to reap closed connections also.
+ int fd = acceptEvent.getAcceptedDesscriptor();
+ connections.push_back(
+ new EventChannelConnection(threads, *factory, fd, fd, isTrace));
+ threads->post(acceptEvent); // Keep accepting.
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
new file mode 100644
index 0000000000..196dde5af8
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+
+#include "EventChannelConnection.h"
+#include "sys/SessionHandlerFactory.h"
+#include "QpidError.h"
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace sys {
+
+const size_t EventChannelConnection::bufferSize = 65536;
+
+EventChannelConnection::EventChannelConnection(
+ EventChannelThreads::shared_ptr threads_,
+ SessionHandlerFactory& factory_,
+ int rfd,
+ int wfd,
+ bool isTrace_
+) :
+ readFd(rfd),
+ writeFd(wfd ? wfd : rfd),
+ readCallback(boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endInitRead)),
+
+ isWriting(false),
+ isClosed(false),
+ threads(threads_),
+ handler(factory_.create(this)),
+ in(bufferSize),
+ out(bufferSize),
+ isTrace(isTrace_)
+{
+ BOOST_ASSERT(readFd > 0);
+ BOOST_ASSERT(writeFd > 0);
+ closeOnException(&EventChannelConnection::startRead);
+}
+
+
+void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+ {
+ Monitor::ScopedLock lock(monitor);
+ assert(frame.get());
+ writeFrames.push_back(frame.release());
+ }
+ closeOnException(&EventChannelConnection::startWrite);
+}
+
+void EventChannelConnection::close() {
+ {
+ Monitor::ScopedLock lock(monitor);
+ if (isClosed)
+ return;
+ isClosed = true;
+ }
+ ::close(readFd);
+ ::close(writeFd);
+ {
+ Monitor::ScopedLock lock(monitor);
+ while (busyThreads > 0)
+ monitor.wait();
+ }
+ handler->closed();
+}
+
+void EventChannelConnection::closeNoThrow() {
+ Exception::tryCatchLog<void>(
+ boost::bind(&EventChannelConnection::close, this),
+ false,
+ "Exception closing channel"
+ );
+}
+
+/**
+ * Call f in a try/catch block and close the connection if
+ * an exception is thrown.
+ */
+void EventChannelConnection::closeOnException(MemberFnPtr f)
+{
+ try {
+ Exception::tryCatchLog<void>(
+ boost::bind(f, this),
+ "Closing connection due to exception"
+ );
+ return;
+ } catch (...) {
+ // Exception was already logged by tryCatchLog
+ closeNoThrow();
+ }
+}
+
+// Post the write event.
+// Always called inside closeOnException.
+// Called by endWrite and send, but only one thread writes at a time.
+//
+void EventChannelConnection::startWrite() {
+ FrameQueue::auto_type frame;
+ {
+ Monitor::ScopedLock lock(monitor);
+ // Stop if closed or a write event is already in progress.
+ if (isClosed || isWriting)
+ return;
+ if (writeFrames.empty()) {
+ isWriting = false;
+ return;
+ }
+ isWriting = true;
+ frame = writeFrames.pop_front();
+ }
+ // No need to lock here - only one thread can be writing at a time.
+ out.clear();
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << *frame << endl;
+ frame->encode(out);
+ out.flip();
+ writeEvent = WriteEvent(
+ writeFd, out.start(), out.available(),
+ boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endWrite));
+ threads->post(writeEvent);
+}
+
+// ScopedBusy ctor increments busyThreads.
+// dtor decrements and calls monitor.notifyAll if it reaches 0.
+//
+struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
+{
+ ScopedBusy(EventChannelConnection& ecc)
+ : AtomicCount::ScopedIncrement(
+ ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
+ {}
+};
+
+// Write event completed.
+// Always called by a channel thread inside closeOnException.
+//
+void EventChannelConnection::endWrite() {
+ ScopedBusy(*this);
+ {
+ Monitor::ScopedLock lock(monitor);
+ isWriting = false;
+ if (isClosed)
+ return;
+ writeEvent.throwIfException();
+ }
+ // Check if there's more in to write in the write queue.
+ startWrite();
+}
+
+
+// Post the read event.
+// Always called inside closeOnException.
+// Called from ctor and end[Init]Read, so only one call at a time
+// is possible since we only post one read event at a time.
+//
+void EventChannelConnection::startRead() {
+ // Non blocking read, as much as we can swallow.
+ readEvent = ReadEvent(
+ readFd, in.start(), in.available(), readCallback,true);
+ threads->post(readEvent);
+}
+
+// Completion of initial read, expect protocolInit.
+// Always called inside closeOnException in channel thread.
+// Only called by one thread at a time.
+void EventChannelConnection::endInitRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ readCallback = boost::bind(
+ &EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endRead);
+ }
+ in.compact();
+ // Continue reading.
+ startRead();
+ }
+}
+
+// Normal reads, expect a frame.
+// Always called inside closeOnException in channel thread.
+void EventChannelConnection::endRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ AMQFrame frame;
+ while (frame.decode(in)) {
+ // TODO aconway 2006-11-30: received should take Frame&
+ if (isTrace)
+ cout << "Received on socket " << readFd
+ << ": " << frame << endl;
+ handler->received(&frame);
+ }
+ in.compact();
+ startRead();
+ }
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h
new file mode 100644
index 0000000000..bace045993
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.h
@@ -0,0 +1,102 @@
+#ifndef _posix_EventChannelConnection_h
+#define _posix_EventChannelConnection_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/ptr_container/ptr_deque.hpp>
+
+#include "EventChannelThreads.h"
+#include "sys/Monitor.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/AtomicCount.h"
+#include "framing/AMQFrame.h"
+
+namespace qpid {
+namespace sys {
+
+class SessionHandlerFactory;
+
+/**
+ * Implements SessionContext and delegates to a SessionHandler
+ * for a connection via the EventChannel.
+ *@param readDescriptor file descriptor for reading.
+ *@param writeDescriptor file descriptor for writing,
+ * by default same as readDescriptor
+ */
+class EventChannelConnection : public SessionContext {
+ public:
+ EventChannelConnection(
+ EventChannelThreads::shared_ptr threads,
+ SessionHandlerFactory& factory,
+ int readDescriptor,
+ int writeDescriptor = 0,
+ bool isTrace = false
+ );
+
+ // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
+ virtual void send(qpid::framing::AMQFrame* frame) {
+ send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
+ }
+
+ virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
+
+ virtual void close();
+
+ private:
+ typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+ typedef void (EventChannelConnection::*MemberFnPtr)();
+ struct ScopedBusy;
+
+ void startWrite();
+ void endWrite();
+ void startRead();
+ void endInitRead();
+ void endRead();
+ void closeNoThrow();
+ void closeOnException(MemberFnPtr);
+ bool shouldContinue(bool& flag);
+
+ static const size_t bufferSize;
+
+ Monitor monitor;
+
+ int readFd, writeFd;
+ ReadEvent readEvent;
+ WriteEvent writeEvent;
+ Event::Callback readCallback;
+ bool isWriting;
+ bool isClosed;
+ AtomicCount busyThreads;
+
+ EventChannelThreads::shared_ptr threads;
+ std::auto_ptr<SessionHandler> handler;
+ qpid::framing::Buffer in, out;
+ FrameQueue writeFrames;
+ bool isTrace;
+
+ friend struct ScopedBusy;
+};
+
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!_posix_EventChannelConnection_h*/