/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "EventChannelThreads.h" #include #include using namespace std; #include namespace qpid { namespace sys { EventChannelThreads::shared_ptr EventChannelThreads::create( EventChannel::shared_ptr ec) { return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); } EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : channel(ec), nWaiting(0), state(RUNNING) { // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. addThread(); } EventChannelThreads::~EventChannelThreads() { shutdown(); join(); } void EventChannelThreads::shutdown() { ScopedLock lock(*this); if (state != RUNNING) // Already shutting down. return; for (size_t i = 0; i < workers.size(); ++i) { channel->postEvent(terminate); } state = TERMINATE_SENT; notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { ScopedLock lock(*this); while (state == RUNNING) // Wait for shutdown to start. wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) wait(); return; } // I'm the joining thread assert(state == TERMINATE_SENT); state = JOINING; } // Drop the lock. for (size_t i = 0; i < workers.size(); ++i) { assert(state == JOINING); // Only this thread can change JOINING. workers[i].join(); } state = SHUTDOWN; notifyAll(); // Notify other join() threaeds. } void EventChannelThreads::addThread() { ScopedLock l(*this); workers.push_back(Thread(*this)); } void EventChannelThreads::run() { // Start life waiting. Decrement on exit. AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { Event* e = channel->getEvent(); assert(e != 0); if (e == &terminate) { return; } AtomicCount::ScopedDecrement dec(nWaiting); // I'm no longer waiting, make sure someone is. if (dec == 0) addThread(); e->dispatch(); } } catch (const std::exception& e) { // TODO aconway 2006-11-15: need better logging across the board. std::cerr << "EventChannelThreads::run() caught: " << e.what() << std::endl; } catch (...) { std::cerr << "EventChannelThreads::run() caught unknown exception." << std::endl; } } }}