summaryrefslogtreecommitdiff
path: root/qpid/cpp-0-9/lib/common/sys/posix
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp-0-9/lib/common/sys/posix')
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannel.cpp325
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannel.h176
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannelAcceptor.cpp149
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.cpp229
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.h102
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.cpp119
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.h92
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/PosixAcceptor.cpp48
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/Socket.cpp118
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/Thread.cpp28
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/check.cpp39
-rw-r--r--qpid/cpp-0-9/lib/common/sys/posix/check.h62
12 files changed, 0 insertions, 1487 deletions
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.cpp b/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.cpp
deleted file mode 100644
index 16c7ec9c3f..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.cpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <mqueue.h>
-#include <string.h>
-#include <iostream>
-
-#include <sys/errno.h>
-#include <sys/socket.h>
-#include <sys/epoll.h>
-
-#include <typeinfo>
-#include <iostream>
-#include <queue>
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
-
-#include <QpidError.h>
-#include <sys/Monitor.h>
-
-#include "check.h"
-#include "EventChannel.h"
-
-using namespace std;
-
-
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
-namespace qpid {
-namespace sys {
-
-
-/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
-{
- public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
-
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
-
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
-
- private:
- int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
-};
-
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
-
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
- //
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
-
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
-}
-
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
-}
-
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
-}
-
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
- return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
-}
-
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-Event* EventHandler::complete(EventHandler& eh)
-{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
-}
-
-// ================================================================
-// EventChannel
-
-EventChannel::shared_ptr EventChannel::create() {
- return shared_ptr(new EventChannel());
-}
-
-EventChannel::EventChannel() : handler(new EventHandler()) {}
-
-EventChannel::~EventChannel() {}
-
-void EventChannel::postEvent(Event& e)
-{
- e.prepare(*handler);
-}
-
-Event* EventChannel::getEvent()
-{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
-
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- // TODO aconway 2006-11-28: Proper handling/logging of errors.
- cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
- << PosixError::getMessage(errno) << endl;
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
-}
-
-Event::~Event() {}
-
-void Event::prepare(EventHandler& handler)
-{
- handler.mqPut(this);
-}
-
-bool Event::hasError() const {
- return error;
-}
-
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
-{
- return this;
-}
-
-void Event::dispatch()
-{
- try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
- }
-}
-
-void Event::setError(const ExceptionHolder& e) {
- error = e;
-}
-
-void ReadEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
-}
-
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
-}
-
-Event* ReadEvent::complete(EventHandler& handler)
-{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
- return 0;
- }
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
- }
-}
-
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
-}
-
-Event* WriteEvent::complete(EventHandler& handler)
-{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
- }
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
- return this;
-}
-
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
-}
-
-Event* AcceptEvent::complete(EventHandler& handler)
-{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
- return this;
-}
-
-}}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.h b/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.h
deleted file mode 100644
index 49c7fce740..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannel.h
+++ /dev/null
@@ -1,176 +0,0 @@
-#ifndef _sys_EventChannel_h
-#define _sys_EventChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <SharedObject.h>
-#include <ExceptionHolder.h>
-#include <boost/function.hpp>
-#include <memory>
-
-namespace qpid {
-namespace sys {
-
-class Event;
-class EventHandler;
-class EventChannel;
-
-/**
- * Base class for all Events.
- */
-class Event
-{
- public:
- /** Type for callback when event is dispatched */
- typedef boost::function0<void> Callback;
-
- /**
- * Create an event with optional callback.
- * Instances of Event are sent directly through the channel.
- * Derived classes define additional waiting behaviour.
- *@param cb A callback functor that is invoked when dispatch() is called.
- */
- Event(Callback cb = 0) : callback(cb) {}
-
- virtual ~Event();
-
- /** Call the callback provided to the constructor, if any. */
- void dispatch();
-
- /** True if there was an error processing this event */
- bool hasError() const;
-
- /** If hasError() throw the corresponding exception. */
- void throwIfError() throw(Exception);
-
- protected:
- virtual void prepare(EventHandler&);
- virtual Event* complete(EventHandler&);
- void setError(const ExceptionHolder& e);
-
- Callback callback;
- ExceptionHolder error;
-
- friend class EventChannel;
- friend class EventHandler;
-};
-
-template <class BufT>
-class IOEvent : public Event {
- public:
- void getDescriptor() const { return descriptor; }
- size_t getSize() const { return size; }
- BufT getBuffer() const { return buffer; }
-
- protected:
- IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
- Event(cb), descriptor(fd), buffer(buf), size(sz) {}
-
- int descriptor;
- BufT buffer;
- size_t size;
-};
-
-/** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
-{
- public:
- explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
- IOEvent<void*>(fd, cb, sz, buf), received(0) {}
-
- private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
- ssize_t doRead();
-
- size_t received;
-};
-
-/** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
-{
- public:
- explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
- Callback cb=0) :
- IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
-
- protected:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
-
- private:
- ssize_t doWrite();
- size_t written;
-};
-
-/** Asynchronous socket accept event */
-class AcceptEvent : public Event
-{
- public:
- /** Accept a connection on fd. */
- explicit AcceptEvent(int fd=-1, Callback cb=0) :
- Event(cb), descriptor(fd), accepted(0) {}
-
- /** Get descriptor for server socket */
- int getAcceptedDesscriptor() const { return accepted; }
-
- private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
-
- int descriptor;
- int accepted;
-};
-
-
-class QueueSet;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static shared_ptr create();
-
- ~EventChannel();
-
- /** Post an event to the channel. */
- void postEvent(Event& event);
-
- /** Post an event to the channel. Must not be 0. */
- void postEvent(Event* event) { postEvent(*event); }
-
- /**
- * Wait for the next complete event.
- *@return Pointer to event. Will never return 0.
- */
- Event* getEvent();
-
- private:
- EventChannel();
- boost::shared_ptr<EventHandler> handler;
-};
-
-
-}}
-
-
-
-#endif /*!_sys_EventChannel_h*/
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelAcceptor.cpp b/qpid/cpp-0-9/lib/common/sys/posix/EventChannelAcceptor.cpp
deleted file mode 100644
index 548fbd1881..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelAcceptor.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-
-#include <boost/assert.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/ptr_container/ptr_deque.hpp>
-#include <boost/bind.hpp>
-#include <boost/scoped_ptr.hpp>
-
-#include <sys/ConnectionOutputHandler.h>
-#include <sys/ConnectionInputHandler.h>
-#include <sys/ConnectionInputHandlerFactory.h>
-#include <sys/Acceptor.h>
-#include <sys/Socket.h>
-#include <framing/Buffer.h>
-#include <framing/AMQFrame.h>
-#include <Exception.h>
-
-#include "EventChannelConnection.h"
-
-namespace qpid {
-namespace sys {
-
-using namespace qpid::framing;
-using namespace std;
-
-class EventChannelAcceptor : public Acceptor {
- public:
-
-
- EventChannelAcceptor(
- int16_t port_, int backlog, int nThreads, bool trace_
- );
-
- int getPort() const;
-
- void run(ConnectionInputHandlerFactory& factory);
-
- void shutdown();
-
- private:
-
- void accept();
-
- Mutex lock;
- Socket listener;
- const int port;
- const bool isTrace;
- bool isRunning;
- boost::ptr_vector<EventChannelConnection> connections;
- AcceptEvent acceptEvent;
- ConnectionInputHandlerFactory* factory;
- bool isShutdown;
- EventChannelThreads::shared_ptr threads;
-};
-
-Acceptor::shared_ptr Acceptor::create(
- int16_t port, int backlog, int threads, bool trace)
-{
- return Acceptor::shared_ptr(
- new EventChannelAcceptor(port, backlog, threads, trace));
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-EventChannelAcceptor::EventChannelAcceptor(
- int16_t port_, int backlog, int nThreads, bool trace_
-) : listener(Socket::createTcp()),
- port(listener.listen(int(port_), backlog)),
- isTrace(trace_),
- isRunning(false),
- acceptEvent(listener.fd(),
- boost::bind(&EventChannelAcceptor::accept, this)),
- factory(0),
- isShutdown(false),
- threads(EventChannelThreads::create(EventChannel::create(), nThreads))
-{ }
-
-int EventChannelAcceptor::getPort() const {
- return port; // Immutable no need for lock.
-}
-
-void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) {
- {
- Mutex::ScopedLock l(lock);
- if (!isRunning && !isShutdown) {
- isRunning = true;
- factory = &f;
- threads->post(acceptEvent);
- }
- }
- threads->join(); // Wait for shutdown.
-}
-
-void EventChannelAcceptor::shutdown() {
- bool doShutdown = false;
- {
- Mutex::ScopedLock l(lock);
- doShutdown = !isShutdown; // I'm the shutdown thread.
- isShutdown = true;
- }
- if (doShutdown) {
- ::close(acceptEvent.getDescriptor());
- threads->shutdown();
- for_each(connections.begin(), connections.end(),
- boost::bind(&EventChannelConnection::close, _1));
- }
- threads->join();
-}
-
-void EventChannelAcceptor::accept()
-{
- // No lock, we only post one accept event at a time.
- if (isShutdown)
- return;
- if (acceptEvent.getException()) {
- Exception::log(*acceptEvent.getException(),
- "EventChannelAcceptor::accept");
- shutdown();
- return;
- }
- // TODO aconway 2006-11-29: Need to reap closed connections also.
- int fd = acceptEvent.getAcceptedDesscriptor();
- connections.push_back(
- new EventChannelConnection(threads, *factory, fd, fd, isTrace));
- threads->post(acceptEvent); // Keep accepting.
-}
-
-}} // namespace qpid::sys
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.cpp b/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.cpp
deleted file mode 100644
index 4449dc3035..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.cpp
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <iostream>
-
-#include <boost/bind.hpp>
-#include <boost/assert.hpp>
-
-#include "EventChannelConnection.h"
-#include "sys/ConnectionInputHandlerFactory.h"
-#include "QpidError.h"
-
-using namespace std;
-using namespace qpid;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace sys {
-
-const size_t EventChannelConnection::bufferSize = 65536;
-
-EventChannelConnection::EventChannelConnection(
- EventChannelThreads::shared_ptr threads_,
- ConnectionInputHandlerFactory& factory_,
- int rfd,
- int wfd,
- bool isTrace_
-) :
- readFd(rfd),
- writeFd(wfd ? wfd : rfd),
- readCallback(boost::bind(&EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endInitRead)),
-
- isWriting(false),
- isClosed(false),
- threads(threads_),
- handler(factory_.create(this)),
- in(bufferSize),
- out(bufferSize),
- isTrace(isTrace_)
-{
- BOOST_ASSERT(readFd > 0);
- BOOST_ASSERT(writeFd > 0);
- closeOnException(&EventChannelConnection::startRead);
-}
-
-
-void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
- {
- Monitor::ScopedLock lock(monitor);
- assert(frame.get());
- writeFrames.push_back(frame.release());
- }
- closeOnException(&EventChannelConnection::startWrite);
-}
-
-void EventChannelConnection::close() {
- {
- Monitor::ScopedLock lock(monitor);
- if (isClosed)
- return;
- isClosed = true;
- }
- ::close(readFd);
- ::close(writeFd);
- {
- Monitor::ScopedLock lock(monitor);
- while (busyThreads > 0)
- monitor.wait();
- }
- handler->closed();
-}
-
-void EventChannelConnection::closeNoThrow() {
- Exception::tryCatchLog<void>(
- boost::bind(&EventChannelConnection::close, this),
- false,
- "Exception closing channel"
- );
-}
-
-/**
- * Call f in a try/catch block and close the connection if
- * an exception is thrown.
- */
-void EventChannelConnection::closeOnException(MemberFnPtr f)
-{
- try {
- Exception::tryCatchLog<void>(
- boost::bind(f, this),
- "Closing connection due to exception"
- );
- return;
- } catch (...) {
- // Exception was already logged by tryCatchLog
- closeNoThrow();
- }
-}
-
-// Post the write event.
-// Always called inside closeOnException.
-// Called by endWrite and send, but only one thread writes at a time.
-//
-void EventChannelConnection::startWrite() {
- FrameQueue::auto_type frame;
- {
- Monitor::ScopedLock lock(monitor);
- // Stop if closed or a write event is already in progress.
- if (isClosed || isWriting)
- return;
- if (writeFrames.empty()) {
- isWriting = false;
- return;
- }
- isWriting = true;
- frame = writeFrames.pop_front();
- }
- // No need to lock here - only one thread can be writing at a time.
- out.clear();
- if (isTrace)
- cout << "Send on socket " << writeFd << ": " << *frame << endl;
- frame->encode(out);
- out.flip();
- writeEvent = WriteEvent(
- writeFd, out.start(), out.available(),
- boost::bind(&EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endWrite));
- threads->post(writeEvent);
-}
-
-// ScopedBusy ctor increments busyThreads.
-// dtor decrements and calls monitor.notifyAll if it reaches 0.
-//
-struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
-{
- ScopedBusy(EventChannelConnection& ecc)
- : AtomicCount::ScopedIncrement(
- ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
- {}
-};
-
-// Write event completed.
-// Always called by a channel thread inside closeOnException.
-//
-void EventChannelConnection::endWrite() {
- ScopedBusy(*this);
- {
- Monitor::ScopedLock lock(monitor);
- isWriting = false;
- if (isClosed)
- return;
- writeEvent.throwIfException();
- }
- // Check if there's more in to write in the write queue.
- startWrite();
-}
-
-
-// Post the read event.
-// Always called inside closeOnException.
-// Called from ctor and end[Init]Read, so only one call at a time
-// is possible since we only post one read event at a time.
-//
-void EventChannelConnection::startRead() {
- // Non blocking read, as much as we can swallow.
- readEvent = ReadEvent(
- readFd, in.start(), in.available(), readCallback,true);
- threads->post(readEvent);
-}
-
-// Completion of initial read, expect protocolInit.
-// Always called inside closeOnException in channel thread.
-// Only called by one thread at a time.
-void EventChannelConnection::endInitRead() {
- ScopedBusy(*this);
- if (!isClosed) {
- readEvent.throwIfException();
- in.move(readEvent.getBytesRead());
- in.flip();
- ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- handler->initiated(&protocolInit);
- readCallback = boost::bind(
- &EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endRead);
- }
- in.compact();
- // Continue reading.
- startRead();
- }
-}
-
-// Normal reads, expect a frame.
-// Always called inside closeOnException in channel thread.
-void EventChannelConnection::endRead() {
- ScopedBusy(*this);
- if (!isClosed) {
- readEvent.throwIfException();
- in.move(readEvent.getBytesRead());
- in.flip();
- AMQFrame frame;
- while (frame.decode(in)) {
- // TODO aconway 2006-11-30: received should take Frame&
- if (isTrace)
- cout << "Received on socket " << readFd
- << ": " << frame << endl;
- handler->received(&frame);
- }
- in.compact();
- startRead();
- }
-}
-
-}} // namespace qpid::sys
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.h b/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.h
deleted file mode 100644
index da7b6dca27..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelConnection.h
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef _posix_EventChannelConnection_h
-#define _posix_EventChannelConnection_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/ptr_container/ptr_deque.hpp>
-
-#include "EventChannelThreads.h"
-#include "sys/Monitor.h"
-#include "sys/ConnectionOutputHandler.h"
-#include "sys/ConnectionInputHandler.h"
-#include "sys/AtomicCount.h"
-#include "framing/AMQFrame.h"
-
-namespace qpid {
-namespace sys {
-
-class ConnectionInputHandlerFactory;
-
-/**
- * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
- * for a connection via the EventChannel.
- *@param readDescriptor file descriptor for reading.
- *@param writeDescriptor file descriptor for writing,
- * by default same as readDescriptor
- */
-class EventChannelConnection : public ConnectionOutputHandler {
- public:
- EventChannelConnection(
- EventChannelThreads::shared_ptr threads,
- ConnectionInputHandlerFactory& factory,
- int readDescriptor,
- int writeDescriptor = 0,
- bool isTrace = false
- );
-
- // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
- virtual void send(qpid::framing::AMQFrame* frame) {
- send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
- }
-
- virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
-
- virtual void close();
-
- private:
- typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
- typedef void (EventChannelConnection::*MemberFnPtr)();
- struct ScopedBusy;
-
- void startWrite();
- void endWrite();
- void startRead();
- void endInitRead();
- void endRead();
- void closeNoThrow();
- void closeOnException(MemberFnPtr);
- bool shouldContinue(bool& flag);
-
- static const size_t bufferSize;
-
- Monitor monitor;
-
- int readFd, writeFd;
- ReadEvent readEvent;
- WriteEvent writeEvent;
- Event::Callback readCallback;
- bool isWriting;
- bool isClosed;
- AtomicCount busyThreads;
-
- EventChannelThreads::shared_ptr threads;
- std::auto_ptr<ConnectionInputHandler> handler;
- qpid::framing::Buffer in, out;
- FrameQueue writeFrames;
- bool isTrace;
-
- friend struct ScopedBusy;
-};
-
-
-}} // namespace qpid::sys
-
-
-
-#endif /*!_posix_EventChannelConnection_h*/
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.cpp b/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.cpp
deleted file mode 100644
index 95e699e0b0..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.cpp
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "EventChannelThreads.h"
-#include <sys/Runnable.h>
-#include <iostream>
-using namespace std;
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace sys {
-
-EventChannelThreads::shared_ptr EventChannelThreads::create(
- EventChannel::shared_ptr ec)
-{
- return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
-}
-
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
- channel(ec), nWaiting(0), state(RUNNING)
-{
- // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
- addThread();
-}
-
-EventChannelThreads::~EventChannelThreads() {
- shutdown();
- join();
-}
-
-void EventChannelThreads::shutdown()
-{
- ScopedLock lock(*this);
- if (state != RUNNING) // Already shutting down.
- return;
- for (size_t i = 0; i < workers.size(); ++i) {
- channel->postEvent(terminate);
- }
- state = TERMINATE_SENT;
- notify(); // Wake up one join() thread.
-}
-
-void EventChannelThreads::join()
-{
- {
- ScopedLock lock(*this);
- while (state == RUNNING) // Wait for shutdown to start.
- wait();
- if (state == SHUTDOWN) // Shutdown is complete
- return;
- if (state == JOINING) {
- // Someone else is doing the join.
- while (state != SHUTDOWN)
- wait();
- return;
- }
- // I'm the joining thread
- assert(state == TERMINATE_SENT);
- state = JOINING;
- } // Drop the lock.
-
- for (size_t i = 0; i < workers.size(); ++i) {
- assert(state == JOINING); // Only this thread can change JOINING.
- workers[i].join();
- }
- state = SHUTDOWN;
- notifyAll(); // Notify other join() threaeds.
-}
-
-void EventChannelThreads::addThread() {
- ScopedLock l(*this);
- workers.push_back(Thread(*this));
-}
-
-void EventChannelThreads::run()
-{
- // Start life waiting. Decrement on exit.
- AtomicCount::ScopedIncrement inc(nWaiting);
- try {
- while (true) {
- Event* e = channel->getEvent();
- assert(e != 0);
- if (e == &terminate) {
- return;
- }
- AtomicCount::ScopedDecrement dec(nWaiting);
- // I'm no longer waiting, make sure someone is.
- if (dec == 0)
- addThread();
- e->dispatch();
- }
- }
- catch (const std::exception& e) {
- // TODO aconway 2006-11-15: need better logging across the board.
- std::cerr << "EventChannelThreads::run() caught: " << e.what()
- << std::endl;
- }
- catch (...) {
- std::cerr << "EventChannelThreads::run() caught unknown exception."
- << std::endl;
- }
-}
-
-}}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.h b/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.h
deleted file mode 100644
index 98403c0869..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/EventChannelThreads.h
+++ /dev/null
@@ -1,92 +0,0 @@
-#ifndef _posix_EventChannelThreads_h
-#define _sys_EventChannelThreads_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <vector>
-
-#include <Exception.h>
-#include <sys/Time.h>
-#include <sys/Monitor.h>
-#include <sys/Thread.h>
-#include <sys/AtomicCount.h>
-#include "EventChannel.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- Dynamic thread pool serving an EventChannel.
-
- Threads run a loop { e = getEvent(); e->dispatch(); }
- The size of the thread pool is automatically adjusted to optimal size.
-*/
-class EventChannelThreads :
- public qpid::SharedObject<EventChannelThreads>,
- public sys::Monitor, private sys::Runnable
-{
- public:
- /** Create the thread pool and start initial threads. */
- static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel
- );
-
- ~EventChannelThreads();
-
- /** Post event to the underlying channel */
- void postEvent(Event& event) { channel->postEvent(event); }
-
- /** Post event to the underlying channel Must not be 0. */
- void postEvent(Event* event) { channel->postEvent(event); }
-
- /**
- * Terminate all threads.
- *
- * Returns immediately, use join() to wait till all threads are
- * shut down.
- */
- void shutdown();
-
- /** Wait for all threads to terminate. */
- void join();
-
- private:
- typedef std::vector<sys::Thread> Threads;
- typedef enum {
- RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
- } State;
-
- EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
- void addThread();
-
- void run();
- bool keepRunning();
- void adjustThreads();
-
- EventChannel::shared_ptr channel;
- Threads workers;
- sys::AtomicCount nWaiting;
- State state;
- Event terminate;
-};
-
-
-}}
-
-
-#endif /*!_sys_EventChannelThreads_h*/
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/PosixAcceptor.cpp b/qpid/cpp-0-9/lib/common/sys/posix/PosixAcceptor.cpp
deleted file mode 100644
index a80a6c61f7..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/PosixAcceptor.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <sys/Acceptor.h>
-#include <Exception.h>
-
-namespace qpid {
-namespace sys {
-
-namespace {
-void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
-}
-
-class PosixAcceptor : public Acceptor {
- public:
- virtual int16_t getPort() const { fail(); return 0; }
- virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); }
- virtual void shutdown() { fail(); }
-};
-
-// Define generic Acceptor::create() to return APRAcceptor.
- Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
-{
- return Acceptor::shared_ptr(new PosixAcceptor());
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-}}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/Socket.cpp b/qpid/cpp-0-9/lib/common/sys/posix/Socket.cpp
deleted file mode 100644
index 5bd13742f6..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/Socket.cpp
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <sys/socket.h>
-#include <sys/errno.h>
-#include <netinet/in.h>
-#include <netdb.h>
-
-#include <boost/format.hpp>
-
-#include <QpidError.h>
-#include <posix/check.h>
-#include <sys/Socket.h>
-
-using namespace qpid::sys;
-
-Socket Socket::createTcp()
-{
- int s = ::socket (PF_INET, SOCK_STREAM, 0);
- if (s < 0) throw QPID_POSIX_ERROR(errno);
- return s;
-}
-
-Socket::Socket(int descriptor) : socket(descriptor) {}
-
-void Socket::setTimeout(Time interval)
-{
- struct timeval tv;
- tv.tv_sec = interval/TIME_SEC;
- tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
-}
-
-void Socket::connect(const std::string& host, int port)
-{
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- struct hostent* hp = gethostbyname ( host.c_str() );
- if (hp == 0) throw QPID_POSIX_ERROR(errno);
- memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
- if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-void
-Socket::close()
-{
- if (socket == 0) return;
- if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
- socket = 0;
-}
-
-ssize_t
-Socket::send(const void* data, size_t size)
-{
- ssize_t sent = ::send(socket, data, size, 0);
- if (sent < 0) {
- if (errno == ECONNRESET) return SOCKET_EOF;
- if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- throw QPID_POSIX_ERROR(errno);
- }
- return sent;
-}
-
-ssize_t
-Socket::recv(void* data, size_t size)
-{
- ssize_t received = ::recv(socket, data, size, 0);
- if (received < 0) {
- if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- throw QPID_POSIX_ERROR(errno);
- }
- return received;
-}
-
-int Socket::listen(int port, int backlog)
-{
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
- throw QPID_POSIX_ERROR(errno);
- if (::listen(socket, backlog) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
-}
-
-
-int Socket::fd()
-{
- return socket;
-}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/Thread.cpp b/qpid/cpp-0-9/lib/common/sys/posix/Thread.cpp
deleted file mode 100644
index f524799556..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/Thread.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <sys/Thread.h>
-
-void* qpid::sys::Thread::runRunnable(void* p)
-{
- static_cast<Runnable*>(p)->run();
- return 0;
-}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/check.cpp b/qpid/cpp-0-9/lib/common/sys/posix/check.cpp
deleted file mode 100644
index 408679caa8..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/check.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <cerrno>
-#include "check.h"
-
-namespace qpid {
-namespace sys {
-
-std::string
-PosixError::getMessage(int errNo)
-{
- char buf[512];
- return std::string(strerror_r(errNo, buf, sizeof(buf)));
-}
-
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
- : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
-{ }
-
-}}
diff --git a/qpid/cpp-0-9/lib/common/sys/posix/check.h b/qpid/cpp-0-9/lib/common/sys/posix/check.h
deleted file mode 100644
index 57b5a5757c..0000000000
--- a/qpid/cpp-0-9/lib/common/sys/posix/check.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef _posix_check_h
-#define _posix_check_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <cerrno>
-#include <string>
-#include <QpidError.h>
-
-namespace qpid {
-namespace sys {
-
-/**
- * Exception with message from errno.
- */
-class PosixError : public qpid::QpidError
-{
- public:
- static std::string getMessage(int errNo);
-
- PosixError(int errNo, const qpid::SrcLine& location) throw();
-
- ~PosixError() throw() {}
-
- int getErrNo() { return errNo; }
-
- Exception* clone() const throw() { return new PosixError(*this); }
-
- void throwSelf() const { throw *this; }
-
- private:
- int errNo;
-};
-
-}}
-
-/** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
-
-/** Throw a posix error if errNo is non-zero */
-#define QPID_POSIX_THROW_IF(ERRNO) \
- if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
-#endif /*!_posix_check_h*/