diff options
Diffstat (limited to 'winsup')
-rw-r--r-- | winsup/cygserver/threaded_queue.cc | 492 | ||||
-rw-r--r-- | winsup/cygwin/ChangeLog | 45 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver.cc | 228 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver_process.cc | 616 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver_shm.cc | 14 | ||||
-rwxr-xr-x | winsup/cygwin/include/cygwin/cygserver_process.h | 128 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.cc | 492 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.h | 141 |
8 files changed, 1316 insertions, 840 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc index aab60263455..e3da7f74728 100644 --- a/winsup/cygserver/threaded_queue.cc +++ b/winsup/cygserver/threaded_queue.cc @@ -4,14 +4,15 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #include "woutsup.h" +#include <assert.h> #include <errno.h> #include <stdio.h> #include <unistd.h> @@ -19,226 +20,387 @@ #include <stdlib.h> #include "threaded_queue.h" +/*****************************************************************************/ + +/* queue_request */ + +queue_request::~queue_request () +{} + +/*****************************************************************************/ + /* threaded_queue */ -DWORD WINAPI -worker_function (LPVOID LpParam) +threaded_queue::threaded_queue (const size_t initial_workers) + : _workers_count (0), + _running (false), + _submitters_head (NULL), + _requests_count (0), + _requests_head (NULL), + _requests_sem (NULL) { - class threaded_queue *queue = (class threaded_queue *) LpParam; - class queue_request *request; - /* FIXME use a threadsafe pop instead for speed? */ - while (queue->active) + InitializeCriticalSection (&_queue_lock); + + // This semaphore's count is the number of requests on the queue. + // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS + // multiplied by max. threads per process (2028?), which is (a few) + // more requests than could ever be pending with the current design. + + _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES + 0, // Initial count + 129792, // Maximum count + NULL); // Anonymous + + if (!_requests_sem) { - EnterCriticalSection (&queue->queuelock); - while (!queue->request && queue->active) - { - LeaveCriticalSection (&queue->queuelock); - DWORD rc = WaitForSingleObject (queue->event, INFINITE); - if (rc == WAIT_FAILED) - { - system_printf ("Wait for event failed"); - queue->running--; - ExitThread (0); - } - EnterCriticalSection (&queue->queuelock); - } - if (!queue->active) - { - queue->running--; - LeaveCriticalSection (&queue->queuelock); - ExitThread (0); - } - /* not needed, but it is efficient */ - request = - (class queue_request *) InterlockedExchangePointer (&queue->request, - queue->request-> - next); - LeaveCriticalSection (&queue->queuelock); - request->process (); - delete request; + system_printf (("failed to create the request queue semaphore, " + "error = %lu"), + GetLastError ()); + abort (); } - queue->running--; - ExitThread (0); + + create_workers (initial_workers); } -void -threaded_queue::create_workers () +threaded_queue::~threaded_queue () { - InitializeCriticalSection (&queuelock); - if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + if (_running) + stop (); + + debug_printf ("deleting all pending queue requests"); + queue_request *reqptr = _requests_head; + while (reqptr) { - system_printf ("Failed to create event queue (%lu), terminating", - GetLastError ()); - exit (1); + queue_request *const ptr = reqptr; + reqptr = reqptr->_next; + delete ptr; } - active = true; - /* FIXME: Use a stack pair and create threads on the fly whenever - * we have to to service a request. - */ - for (unsigned int i = 0; i < initial_workers; i++) + DeleteCriticalSection (&_queue_lock); + if (_requests_sem) + (void) CloseHandle (_requests_sem); +} + +/* FIXME: return success or failure rather than quitting */ +void +threaded_queue::add_submission_loop (queue_submission_loop *const submitter) +{ + assert (this); + assert (submitter); + assert (submitter->_queue == this); + assert (!submitter->_next); + + submitter->_next = + TInterlockedExchangePointer (&_submitters_head, submitter); + + if (_running) + submitter->start (); +} + +bool +threaded_queue::start () +{ + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = true; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (!was_running) { - HANDLE hThread; - DWORD tid; - hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); - if (hThread == NULL) + debug_printf ("starting all queue submission loops"); + + while (loopptr) { - system_printf ("Failed to create thread (%lu), terminating", - GetLastError ()); - exit (1); + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->start (); } - CloseHandle (hThread); - running++; } + + return was_running; } -void -threaded_queue::cleanup () +bool +threaded_queue::stop () { - /* harvest the threads */ - active = false; - /* kill the request processing loops */ - queue_process_param *reqloop; - /* make sure we don't race with a incoming request creation */ - EnterCriticalSection (&queuelock); - reqloop = - (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); - while (reqloop) + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = false; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (was_running) { - queue_process_param *t = reqloop; - reqloop = reqloop->next; - delete t; + debug_printf ("stopping all queue submission loops"); + while (loopptr) + { + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->stop (); + } + + ReleaseSemaphore (_requests_sem, _workers_count, NULL); + while (_workers_count) + { + debug_printf (("waiting for worker threads to terminate: " + "%lu still running"), + _workers_count); + sleep (1); + } + debug_printf ("all worker threads have terminated"); } - LeaveCriticalSection (&queuelock); - if (!running) - return; - debug_printf ("Waiting for current queue threads to terminate"); - for (int n = running; n; n--) - PulseEvent (event); - while (running) - sleep (1); - DeleteCriticalSection (&queuelock); - CloseHandle (event); + + return was_running; } /* FIXME: return success or failure */ void -threaded_queue::add (queue_request * therequest) +threaded_queue::add (queue_request *const therequest) { - /* safe to not "Try" because workers don't hog this, they wait on the event - */ - EnterCriticalSection (&queuelock); - if (!running) + assert (this); + assert (therequest); + assert (!therequest->_next); + + if (!_workers_count) { - system_printf ("No worker threads to handle request!"); + system_printf ("warning: no worker threads to handle request!"); + // FIXME: And then what? } - if (!request) - request = therequest; + + EnterCriticalSection (&_queue_lock); + if (!_requests_head) + _requests_head = therequest; else { - /* add to the queue end. */ - queue_request *listrequest = request; - while (listrequest->next) - listrequest = listrequest->next; - listrequest->next = therequest; + /* Add to the queue end. */ + queue_request *reqptr = _requests_head; + for (; reqptr->_next; reqptr = reqptr->_next) + {} + assert (reqptr); + assert (!reqptr->_next); + reqptr->_next = therequest; } - PulseEvent (event); - LeaveCriticalSection (&queuelock); + + _requests_count += 1; + assert (_requests_count > 0); + LeaveCriticalSection (&_queue_lock); + + (void) ReleaseSemaphore (_requests_sem, 1, NULL); } -/* FIXME: return success or failure rather than quitting */ +/*static*/ DWORD WINAPI +threaded_queue::start_routine (const LPVOID lpParam) +{ + class threaded_queue *const queue = (class threaded_queue *) lpParam; + assert (queue); + + queue->worker_loop (); + + const long count = InterlockedDecrement (&queue->_workers_count); + assert (count >= 0); + + if (queue->_running) + debug_printf ("worker loop has exited; thread about to terminate"); + + return 0; +} + +/* Called from the constructor: so no need to be thread-safe until the + * worker threads start to be created; thus the interlocked increment + * of the `_workers_count' field. + */ + void -threaded_queue::process_requests (queue_process_param * params, - threaded_queue_thread_function * - request_loop) +threaded_queue::create_workers (const size_t initial_workers) { - if (params->start (request_loop, this) == false) - exit (1); - params->next = - (queue_process_param *) InterlockedExchangePointer (&process_head, - params); + for (unsigned int i = 0; i != initial_workers; i++) + { + const long count = InterlockedIncrement (&_workers_count); + assert (count > 0); + + DWORD tid; + const HANDLE hThread = + CreateThread (NULL, 0, start_routine, this, 0, &tid); + + if (!hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } + + (void) CloseHandle (hThread); + } } -/* queue_process_param */ -/* How does a constructor return an error? */ -queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), -interruptible -(ninterruptible) +void +threaded_queue::worker_loop () { - if (!interruptible) - return; - debug_printf ("creating an interruptible processing thread"); - if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + while (true) { - system_printf ("Failed to create interrupt event (%lu), terminating", - GetLastError ()); - exit (1); + const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE); + if (rc == WAIT_FAILED) + { + system_printf ("wait for request semaphore failed, error = %lu", + GetLastError ()); + return; + } + assert (rc == WAIT_OBJECT_0); + + EnterCriticalSection (&_queue_lock); + if (!_running) + { + LeaveCriticalSection (&_queue_lock); + return; + } + + assert (_requests_head); + queue_request *const reqptr = _requests_head; + _requests_head = reqptr->_next; + + _requests_count -= 1; + assert (_requests_count >= 0); + LeaveCriticalSection (&_queue_lock); + + assert (reqptr); + reqptr->process (); + delete reqptr; } } -queue_process_param::~queue_process_param () +/*****************************************************************************/ + +/* queue_submission_loop */ + +queue_submission_loop::queue_submission_loop (threaded_queue *const queue, + const bool ninterruptible) + : _running (false), + _interrupt_event (NULL), + _queue (queue), + _interruptible (ninterruptible), + _hThread (NULL), + _tid (0), + _next (NULL) { - if (running) + if (_interruptible) + { + // verbose: debug_printf ("creating an interruptible processing thread"); + + _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES + FALSE, // Auto-reset + FALSE, // Initially non-signalled + NULL); // Anonymous + + if (!_interrupt_event) + { + system_printf ("failed to create interrupt event, error = %lu", + GetLastError ()); + abort (); + } + } +} + +queue_submission_loop::~queue_submission_loop () +{ + if (_running) stop (); - if (!interruptible) - return; - CloseHandle (interrupt); + if (_interrupt_event) + (void) CloseHandle (_interrupt_event); + if (_hThread) + (void) CloseHandle (_hThread); } bool - queue_process_param::start (threaded_queue_thread_function * request_loop, - threaded_queue * thequeue) +queue_submission_loop::start () { - queue = thequeue; - hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); - if (hThread) + assert (this); + assert (!_hThread); + + const bool was_running = _running; + + if (!was_running) { - running = true; - return true; + _running = true; + + _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid); + if (!_hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } } - system_printf ("Failed to create thread (%lu), terminating", - GetLastError ()); - return false; + + return was_running; } -void -queue_process_param::stop () +bool +queue_submission_loop::stop () { - if (interruptible) + assert (this); + assert (_hThread && _hThread != INVALID_HANDLE_VALUE); + + const bool was_running = _running; + + if (_running) { - InterlockedExchange (&shutdown, true); - PulseEvent (interrupt); - /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get - * scheduled again, we print an error and exit. We _should_ loop or - * try resignalling. We don't want to hand here though... - */ - int n = 5; - while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); - if (!n) + _running = false; + + if (_interruptible) { - system_printf ("Process thread didn't shutdown cleanly after 200ms!"); - exit (1); + assert (_interrupt_event + && _interrupt_event != INVALID_HANDLE_VALUE); + + SetEvent (_interrupt_event); + + if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT) + { + system_printf (("request loop thread %lu failed to shutdown " + "when asked politely: about to get heavy"), + _tid); + + if (!TerminateThread (_hThread, 0)) + { + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); + abort (); + } + } } else - running = false; - } - else - { - debug_printf ("killing request loop thread %ld", tid); - int rc; - if (!(rc = TerminateThread (hThread, 0))) { - system_printf ("error shutting down request loop worker thread"); + // FIXME: could wait to see if the request loop notices that + // the submission loop is no longer running and shuts down + // voluntarily. + + debug_printf ("killing request loop thread %lu", _tid); + + if (!TerminateThread (_hThread, 0)) + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); } - running = false; } - CloseHandle (hThread); + + return was_running; } -/* queue_request */ -queue_request::queue_request () - : next (NULL) -{} +/*static*/ DWORD WINAPI +queue_submission_loop::start_routine (const LPVOID lpParam) +{ + class queue_submission_loop *const submission_loop = + (class queue_submission_loop *) lpParam; + assert (submission_loop); -queue_request::~queue_request () -{} + submission_loop->request_loop (); + + debug_printf ("submission loop has exited; thread about to terminate"); + + submission_loop->stop (); + + return 0; +} + +/*****************************************************************************/ diff --git a/winsup/cygwin/ChangeLog b/winsup/cygwin/ChangeLog index de19a76a3c6..03f4dece3de 100644 --- a/winsup/cygwin/ChangeLog +++ b/winsup/cygwin/ChangeLog @@ -1,3 +1,48 @@ +2002-06-28 Conrad Scott <conrad.scott@dsl.pipex.com> + + * threaded_queue.h (class queue_request): Re-write. + (threaded_queue_thread_function): Remove. + (class queue_process_param): Remove. + (class threaded_queue): Re-write. + (class queue_submission_loop): New version of the old + `queue_process_param' class. + (TInterlockedExchangePointer): New templated function. + (TInterlockedCompareExchangePointer): Ditto. + * threaded_queue.cc (worker_function): Remove. + (class threaded_queue): Re-write. + (class queue_process_param): Remove. + (class queue_submission_loop): New version of the old + `queue_process_param' class. + * include/cygwin/cygserver_process.h (process_cleanup): Re-write. + (class process_process_param): Remove. + (class cleanup_routine): Re-write. + (class process): Re-write. + (class process_cache): Re-write. + * cygserver_process.cc (process_cleanup): Re-write. + (class process_process_param): Remove. + (class cleanup_routine): Re-write. + (class process): Re-write. + (class process_cache): Re-write. + * cygserver.cc (request_count): Remove unused variable. + (class server_request): Move methods inline. + (class server_process_param): Remove. + (class server_request_queue): Remove. + (request_queue): Move into `main ()' and change type to + `threaded_queue'. + (request_loop): Remove. + (class server_submission_loop): New version of the old + `server_process_param' class. + (shutdown_server): New variable. + (client_request_shutdown::serve): Set `shutdown_server' to trigger + shutdown. + (handle_signal): Ditto. + (main): Install signal handler for SIGINT rather than SIGQUIT. + Use new interfaces for the `request_queue' and the `cache'. + Create a `server_submission_loop' and add to the `request_queue'. + Add check for the `shutdown_server' variable to the main loop. + * cygserver_shm.cc (client_request_shm::serve): Release the + process object after use. + 2002-06-27 Conrad Scott <conrad.scott@dsl.pipex.com> * cygserver_client.cc (client_request::handle_request): Correct diff --git a/winsup/cygwin/cygserver.cc b/winsup/cygwin/cygserver.cc index 32e93dc47c8..9baf9030395 100755 --- a/winsup/cygwin/cygserver.cc +++ b/winsup/cygwin/cygserver.cc @@ -12,14 +12,12 @@ details. */ #include "woutsup.h" -#include <sys/socket.h> #include <sys/types.h> #include <assert.h> #include <ctype.h> #include <errno.h> #include <getopt.h> -#include <netdb.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> @@ -35,10 +33,6 @@ details. */ #include "cygwin/cygserver_process.h" #include "cygwin/cygserver_transport.h" -GENERIC_MAPPING access_mapping; - -DWORD request_count = 0; - // Version string. static const char version[] = "$Revision$"; @@ -127,6 +121,8 @@ __set_errno (const char *func, int ln, int val) #endif /* DEBUGGING */ +GENERIC_MAPPING access_mapping; + static BOOL setup_privileges () { @@ -257,7 +253,7 @@ check_and_dup_handle (HANDLE from_process, HANDLE to_process, */ void -client_request_attach_tty::serve (transport_layer_base * const conn, +client_request_attach_tty::serve (transport_layer_base *const conn, process_cache *) { assert (conn); @@ -283,9 +279,9 @@ client_request_attach_tty::serve (transport_layer_base * const conn, msglen (0); // Until we fill in some fields. - // verbose: debug_printf ("pid %ld:(%p,%p) -> pid %ld", req.master_pid, - // from_master, to_master, - // req.pid); + // verbose: debug_printf ("pid %ld:(%p,%p) -> pid %ld", + // req.master_pid, req.from_master, req.to_master, + // req.pid); // verbose: debug_printf ("opening process %ld", req.master_pid); @@ -401,66 +397,68 @@ client_request_get_version::serve(transport_layer_base *, process_cache *) class server_request : public queue_request { - public: - server_request (transport_layer_base *newconn, - class process_cache *newcache); - virtual ~server_request(); - virtual void process (); - private: - transport_layer_base *conn; - class process_cache *cache; -}; +public: + server_request (transport_layer_base *const conn, process_cache *const cache) + : _conn (conn), _cache (cache) + {} -class server_process_param : public queue_process_param -{ - public: - transport_layer_base *transport; - server_process_param () : queue_process_param (false) {}; -}; + virtual ~server_request() + { + delete _conn; + } -class server_request_queue : public threaded_queue -{ - public: - class process_cache *cache; - void process_requests (transport_layer_base *transport); - void add_connection (transport_layer_base *conn); -}; + virtual void process () + { + client_request::handle_request (_conn, _cache); + } -class server_request_queue request_queue; +private: + transport_layer_base *const _conn; + process_cache *const _cache; +}; -static DWORD WINAPI -request_loop (LPVOID LpParam) +class server_submission_loop : public queue_submission_loop { - class server_process_param *params = (server_process_param *) LpParam; - class server_request_queue *queue = (server_request_queue *) params->queue; - class transport_layer_base * transport = params->transport; - while (queue->active) +public: + server_submission_loop (threaded_queue *const queue, + transport_layer_base *const transport, + process_cache *const cache) + : queue_submission_loop (queue, false), + _transport (transport), + _cache (cache) { - bool recoverable = false; - transport_layer_base * const new_conn = transport->accept (&recoverable); - if (!new_conn && !recoverable) - { - system_printf ("fatal error on IPC transport: closing down"); - queue->active = false; - } - /* FIXME: this is a little ugly. What we really want is to wait on two objects: - * one for the pipe/socket, and one for being told to shutdown. Otherwise - * this will stay a problem (we won't actually shutdown until the request - * _AFTER_ the shutdown request. And sending ourselves a request is ugly - */ - if (new_conn && queue->active) - queue->add_connection (new_conn); + assert (_transport); + assert (_cache); } - return 0; -} -/* TODO: check we are not being asked to service a already serviced transport */ +private: + transport_layer_base *const _transport; + process_cache *const _cache; + + virtual void request_loop (); +}; + +/* FIXME: this is a little ugly. What we really want is to wait on + * two objects: one for the pipe/socket, and one for being told to + * shutdown. Otherwise this will stay a problem (we won't actually + * shutdown until the request _AFTER_ the shutdown request. And + * sending ourselves a request is ugly + */ void -server_request_queue::process_requests (transport_layer_base *transport) +server_submission_loop::request_loop () { - class server_process_param *params = new server_process_param; - params->transport = transport; - threaded_queue::process_requests (params, request_loop); + while (_running) + { + bool recoverable = false; + transport_layer_base *const conn = _transport->accept (&recoverable); + if (!conn && !recoverable) + { + system_printf ("fatal error on IPC transport: closing down"); + return; + } + if (conn) + _queue->add (new server_request (conn, _cache)); + } } client_request_shutdown::client_request_shutdown () @@ -469,6 +467,8 @@ client_request_shutdown::client_request_shutdown () syscall_printf ("created"); } +static volatile sig_atomic_t shutdown_server = false; + void client_request_shutdown::serve (transport_layer_base *, process_cache *) { @@ -480,55 +480,18 @@ client_request_shutdown::serve (transport_layer_base *, process_cache *) /* FIXME: link upwards, and then this becomes a trivial method call to * only shutdown _this queue_ */ - /* tell the main thread to shutdown */ - request_queue.active = false; - - msglen (0); -} - -server_request::server_request (transport_layer_base *newconn, - class process_cache *newcache) - : conn(newconn), cache(newcache) -{} - -server_request::~server_request () -{ - delete conn; -} -void -server_request::process () -{ - client_request::handle_request (conn, cache); -} + shutdown_server = true; -void -server_request_queue::add_connection (transport_layer_base *conn) -{ - /* safe to not "Try" because workers don't hog this, they wait on the event - */ - /* every derived ::add must enter the section! */ - EnterCriticalSection (&queuelock); - if (!running) - { - delete conn; - LeaveCriticalSection (&queuelock); - return; - } - queue_request * listrequest = new server_request (conn, cache); - add (listrequest); - LeaveCriticalSection (&queuelock); + msglen (0); } static void handle_signal (int signal) { /* any signal makes us die :} */ - /* FIXME: link upwards, and then this becomes a trivial method call to - * only shutdown _this queue_ - */ - /* tell the main thread to shutdown */ - request_queue.active=false; + + shutdown_server = true; } /* @@ -669,53 +632,66 @@ main (const int argc, char *argv[]) return 0; } - if (signal (SIGQUIT, handle_signal) == SIG_ERR) + if (signal (SIGINT, handle_signal) == SIG_ERR) { system_printf ("could not install signal handler (%d)- aborting startup", errno); exit (1); } - transport_layer_base * const transport = create_server_transport (); - assert (transport); print_version (pgm); setbuf (stdout, NULL); printf ("daemon starting up"); - transport->listen (); + + threaded_queue request_queue (10); printf ("."); - class process_cache cache (2); - request_queue.initial_workers = 10; - request_queue.cache = &cache; - request_queue.create_workers (); + + transport_layer_base *const transport = create_server_transport (); + assert (transport); printf ("."); - request_queue.process_requests (transport); + + process_cache cache (2); printf ("."); - cache.create_workers (); + + server_submission_loop submission_loop (&request_queue, transport, &cache); + printf ("."); + + request_queue.add_submission_loop (&submission_loop); printf ("."); - cache.process_requests (); - printf (".complete\n"); - /* TODO: wait on multiple objects - the thread handle for each request loop + - * all the process handles. This should be done by querying the request_queue and - * the process cache for all their handles, and then waiting for (say) 30 seconds. - * after that we recreate the list of handles to wait on, and wait again. - * the point of all this abstraction is that we can trivially server both sockets - * and pipes simply by making a new transport, and then calling - * request_queue.process_requests (transport2); + + transport->listen (); + printf ("."); + + cache.start (); + printf ("."); + + request_queue.start (); + printf ("."); + + printf ("complete\n"); + + /* TODO: wait on multiple objects - the thread handle for each + * request loop + all the process handles. This should be done by + * querying the request_queue and the process cache for all their + * handles, and then waiting for (say) 30 seconds. after that we + * recreate the list of handles to wait on, and wait again. the + * point of all this abstraction is that we can trivially server + * both sockets and pipes simply by making a new transport, and then + * calling request_queue.process_requests (transport2); */ /* WaitForMultipleObjects abort && request_queue && process_queue && signal -- if signal event then retrigger it */ - while (1 && request_queue.active) - { - sleep (1); - } + while (!shutdown_server && request_queue.running () && cache.running ()) + sleep (1); + printf ("\nShutdown request received - new requests will be denied\n"); - request_queue.cleanup (); + request_queue.stop (); printf ("All pending requests processed\n"); delete transport; printf ("No longer accepting requests - cygwin will operate in daemonless mode\n"); - cache.cleanup (); + cache.stop (); printf ("All outstanding process-cache activities completed\n"); printf ("daemon shutdown\n"); diff --git a/winsup/cygwin/cygserver_process.cc b/winsup/cygwin/cygserver_process.cc index 4ad80ed46b1..5f009406451 100755 --- a/winsup/cygwin/cygserver_process.cc +++ b/winsup/cygwin/cygserver_process.cc @@ -4,387 +4,395 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #include "woutsup.h" -#include <sys/socket.h> #include <sys/types.h> +#include <assert.h> #include <errno.h> -#include <netdb.h> #include <pthread.h> -#include <stdio.h> #include <stdlib.h> -#include <unistd.h> + +#include "cygerrno.h" #include "cygwin/cygserver_process.h" -/* the cache structures and classes are designed for one cache per server process. - * To make multiple process caches, a redesign will be needed - */ +/*****************************************************************************/ + +#define elements(ARRAY) (sizeof (ARRAY) / sizeof (*ARRAY)) -/* process cache */ -process_cache::process_cache (unsigned int num_initial_workers): -head (NULL) +/*****************************************************************************/ + +process_cleanup::~process_cleanup () { - /* there can only be one */ - InitializeCriticalSection (&cache_write_access); - if ((cache_add_trigger = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) - { - system_printf ("Failed to create cache add trigger (%lu), terminating", - GetLastError ()); - exit (1); - } - initial_workers = num_initial_workers; + delete _process; } -process_cache::~process_cache () +void +process_cleanup::process () { + _process->cleanup (); } -class process * -process_cache::process (DWORD winpid) +/*****************************************************************************/ + +/* cleanup_routine */ +cleanup_routine::~cleanup_routine () { - class process *entry = head; - /* TODO: make this more granular, so a search doesn't involve the write lock */ - EnterCriticalSection (&cache_write_access); - if (!entry) +} + +/*****************************************************************************/ + +process::process (const DWORD winpid) + : _winpid (winpid), + _next (NULL), + _cleaning_up (false), + _routines_head (NULL), + _exit_status (STILL_ACTIVE), + _hProcess (NULL) +{ + _hProcess = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid); + if (!_hProcess) { - entry = new class process (winpid); - entry->next = - (class process *) InterlockedExchangePointer (&head, entry); - PulseEvent (cache_add_trigger); + system_printf ("unable to obtain handle for new cache process %lu", + winpid); + _hProcess = INVALID_HANDLE_VALUE; + _exit_status = 0; } else - { - while (entry->winpid != winpid && entry->next) - entry = entry->next; - if (entry->winpid != winpid) - { - class process *new_entry = new class process (winpid); - new_entry->next = - (class process *) InterlockedExchangePointer (&entry->next, - new_entry); - entry = new_entry; - PulseEvent (cache_add_trigger); - } - } - LeaveCriticalSection (&cache_write_access); - return entry; + debug_printf ("got handle %p for new cache process %lu", + _hProcess, _winpid); + InitializeCriticalSection (&_access); } -static DWORD WINAPI -request_loop (LPVOID LpParam) +process::~process () { - class process_process_param *params = (process_process_param *) LpParam; - return params->request_loop (); + DeleteCriticalSection (&_access); + (void) CloseHandle (_hProcess); } -void -process_cache::process_requests () +/* No need to be thread-safe as this is only ever called by + * process_cache::remove_process(). If it has to be made thread-safe + * later on, it should not use the `access' critical section as that + * is held by the client request handlers for an arbitrary length of + * time, i.e. while they do whatever processing is required for a + * client request. + */ +DWORD +process::exit_code () { - class process_process_param *params = new process_process_param; - threaded_queue::process_requests (params, request_loop); + if (_hProcess && _hProcess != INVALID_HANDLE_VALUE + && _exit_status == STILL_ACTIVE + && !GetExitCodeProcess (_hProcess, &_exit_status)) + { + system_printf ("failed to retrieve exit code (%lu)", GetLastError ()); + _hProcess = INVALID_HANDLE_VALUE; + } + return _exit_status; } -void -process_cache::add_task (class process * theprocess) +bool +process::add (cleanup_routine *const new_cleanup) { - /* safe to not "Try" because workers don't hog this, they wait on the event + if (_cleaning_up) + return false; + EnterCriticalSection (&_access); + /* Check that we didn't block with ::cleanup (). This rigmarole is + * to get around win9x's glaring missing TryEnterCriticalSection + * call which would be a whole lot easier. */ - /* every derived ::add must enter the section! */ - EnterCriticalSection (&queuelock); - queue_request *listrequest = new process_cleanup (theprocess); - add (listrequest); - LeaveCriticalSection (&queuelock); -} - -/* NOT fully MT SAFE: must be called by only one thread in a program */ -void -process_cache::remove_process (class process *theprocess) -{ - class process *entry = head; - /* unlink */ - EnterCriticalSection (&cache_write_access); - if (entry == theprocess) - { - entry = (class process *) InterlockedExchangePointer (&head, theprocess->next); - if (entry != theprocess) - { - system_printf ("Bug encountered, process cache corrupted"); - exit (1); - } - } - else + if (_cleaning_up) { - while (entry->next && entry->next != theprocess) - entry = entry->next; - class process *temp = (class process *) InterlockedExchangePointer (&entry->next, theprocess->next); - if (temp != theprocess) - { - system_printf ("Bug encountered, process cache corrupted"); - exit (1); - } + LeaveCriticalSection (&_access); + return false; } - LeaveCriticalSection (&cache_write_access); - /* Process any cleanup tasks */ - add_task (theprocess); + new_cleanup->_next = _routines_head; + _routines_head = new_cleanup; + LeaveCriticalSection (&_access); + return true; } -/* copy <= max_copy HANDLEs to dest[], starting at an offset into _our list_ of - * begin_at. (Ie begin_at = 5, the first copied handle is still written to dest[0] - * NOTE: Thread safe, but not thread guaranteed - a newly added process may be missed. - * Who cares - It'll get caught the next time. +/* This is single threaded. It's called after the process is removed + * from the cache, but inserts may be attemped by worker threads that + * have a pointer to it. */ -int -process_cache::handle_snapshot (HANDLE * hdest, class process ** edest, - ssize_t max_copy, int begin_at) +void +process::cleanup () { - /* TODO:? grab a delete-lock, to prevent deletes during this process ? */ - class process *entry = head; - int count = begin_at; - /* skip begin_at entries */ - while (entry && count) - { - if (entry->exit_code () == STILL_ACTIVE) - count--; - entry = entry->next; - } - /* hit the end of the list within begin_at entries */ - if (count) - return 0; - HANDLE *hto = hdest; - class process **eto = edest; - while (entry && count < max_copy) + EnterCriticalSection (&_access); + assert (!_cleaning_up); + InterlockedExchange (&_cleaning_up, true); + cleanup_routine *entry = _routines_head; + _routines_head = NULL; + LeaveCriticalSection (&_access); + + while (entry) { - /* hack */ - if (entry->exit_code () == STILL_ACTIVE) - { - *hto = entry->handle (); - *eto = entry; - count++; - hto++; - eto++; - } - entry = entry->next; + cleanup_routine *const ptr = entry; + entry = entry->_next; + ptr->cleanup (_winpid); + delete ptr; } - return count; } -/* process's */ -/* global process crit section */ -static CRITICAL_SECTION process_access; -static pthread_once_t process_init; +/*****************************************************************************/ void -do_process_init (void) +process_cache::submission_loop::request_loop () { - InitializeCriticalSection (&process_access); - /* we don't have a cache shutdown capability today */ + assert (this); + assert (_cache); + assert (_interrupt_event); + + while (_running) + _cache->wait_for_processes (_interrupt_event); } -process::process (DWORD winpid): -winpid (winpid), next (NULL), cleaning_up (0), head (NULL), _exit_status (STILL_ACTIVE) +/*****************************************************************************/ + +process_cache::process_cache (const unsigned int initial_workers) + : _queue (initial_workers), + _submitter (this, &_queue), // true == interruptible + _processes_count (0), + _processes_head (NULL), + _cache_add_trigger (NULL) { - pthread_once (&process_init, do_process_init); - EnterCriticalSection (&process_access); - thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid); - if (!thehandle) + /* there can only be one */ + InitializeCriticalSection (&_cache_write_access); + + _cache_add_trigger = CreateEvent (NULL, // SECURITY_ATTRIBUTES + FALSE, // Auto-reset + FALSE, // Initially non-signalled + NULL); // Anonymous + + if (!_cache_add_trigger) { - system_printf ("unable to obtain handle for new cache process %lu", winpid); - thehandle = INVALID_HANDLE_VALUE; + system_printf ("failed to create cache add trigger (%lu), terminating", + GetLastError ()); + abort (); } - debug_printf ("Got handle %p for new cache process %lu", thehandle, winpid); - InitializeCriticalSection (&access); - LeaveCriticalSection (&process_access); -} -process::~process () -{ - DeleteCriticalSection (&access); + _queue.add_submission_loop (&_submitter); } -HANDLE -process::handle () +process_cache::~process_cache () { -// DWORD exitstate = exit_code (); -// if (exitstate == STILL_ACTIVE) - return thehandle; - - /* FIXME: call the cleanup list ? */ - -// CloseHandle (thehandle); -// debug_printf ("Process id %ld has terminated, attempting to open a new handle", -// winpid); -// thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid); -// debug_printf ("Got handle %p when refreshing cache process %ld", thehandle, winpid); -// /* FIXME: what if OpenProcess fails ? */ -// if (thehandle) -// { -// _exit_status = STILL_ACTIVE; -// exit_code (); -// } -// else -// thehandle = INVALID_HANDLE_VALUE; -// return thehandle; + (void) CloseHandle (_cache_add_trigger); + DeleteCriticalSection (&_cache_write_access); } -DWORD process::exit_code () +/* This returns the process object to the caller already locked, that + * is, with the object's `access' critical region entered. Thus the + * caller must unlock the object when it's finished with it (via + * process::release ()). It must then not try to access the object + * afterwards, except by going through this routine again, as it may + * have been deleted once it has been unlocked. + */ +class process * +process_cache::process (const DWORD winpid) { - if (_exit_status != STILL_ACTIVE) - return _exit_status; - bool - err = GetExitCodeProcess (thehandle, &_exit_status); - if (!err) + /* TODO: make this more granular, so a search doesn't involve the + * write lock. + */ + EnterCriticalSection (&_cache_write_access); + class process *previous = NULL; + class process *entry = find (winpid, &previous); + + if (!entry) { - system_printf ("Failed to retrieve exit code (%ld)", GetLastError ()); - thehandle = INVALID_HANDLE_VALUE; - return _exit_status; + if (_processes_count + SPECIALS_COUNT >= MAXIMUM_WAIT_OBJECTS) + { + LeaveCriticalSection (&_cache_write_access); + system_printf (("process limit (%d processes) reached; " + "new connection refused"), + MAXIMUM_WAIT_OBJECTS - SPECIALS_COUNT); + set_errno (EAGAIN); + return NULL; + } + + entry = new class process (winpid); + if (entry->_exit_status != STILL_ACTIVE) + { + LeaveCriticalSection (&_cache_write_access); + delete entry; + set_errno (ESRCH); + return NULL; + } + + if (previous) + { + entry->_next = previous->_next; + previous->_next = entry; + } + else + { + entry->_next = _processes_head; + _processes_head = entry; + } + + _processes_count += 1; + SetEvent (_cache_add_trigger); } - else if (_exit_status == STILL_ACTIVE) - return _exit_status; - /* add new cleanup task etc etc ? */ - return _exit_status; + + EnterCriticalSection (&entry->_access); // To be released by the caller. + LeaveCriticalSection (&_cache_write_access); + assert (entry); + assert (entry->_winpid == winpid); + return entry; } -/* this is single threaded. It's called after the process is removed from the cache, - * but inserts may be attemped by worker threads that have a pointer to it */ void -process::cleanup () +process_cache::wait_for_processes (const HANDLE interrupt_event) { - /* Serialize this */ - EnterCriticalSection (&access); - InterlockedIncrement (&(long)cleaning_up); - class cleanup_routine *entry = head; - while (entry) + // Update `_wait_array' with handles of all current processes. + const size_t count = sync_wait_array (interrupt_event); + + debug_printf ("waiting on %u objects (out of %u processes)", + count, _processes_count); + + const DWORD rc = WaitForMultipleObjects (count, _wait_array, + FALSE, INFINITE); + + if (rc == WAIT_FAILED) + { + system_printf ("could not wait on the process handles, error = %lu", + GetLastError ()); + abort (); + } + + const size_t start = rc - WAIT_OBJECT_0; + + if (rc < WAIT_OBJECT_0 || start > count) { - class cleanup_routine *temp; - entry->cleanup (winpid); - temp = entry->next; - delete entry; - entry = temp; + system_printf (("unexpected return code %rc " + "from WaitForMultipleObjects: " + "expected [%u .. %u)"), + rc, WAIT_OBJECT_0, WAIT_OBJECT_0 + count); + abort (); } - LeaveCriticalSection (&access); + + // Tell all the processes, from the signalled point up, the bad news. + for (size_t index = start; index != count; index++) + if (_process_array[index]) + check_and_remove_process (index); } -bool -process::add_cleanup_routine (class cleanup_routine *new_cleanup) +/* + * process_cache::sync_wait_array () + * + * Fill-in the wait array with the handles that the cache needs to wait on. + * These handles are: + * - the process_process_param's interrupt event + * - the process_cache's cache_add_trigger event + * - the handle for each live process in the cache. + * + * Return value: the number of live handles in the array. + */ + +size_t +process_cache::sync_wait_array (const HANDLE interrupt_event) { - if (cleaning_up) - return false; - EnterCriticalSection (&access); - /* check that we didn't block with ::cleanup () - * This rigmarole is to get around win9x's glaring missing TryEnterCriticalSection call - * which would be a whole lot easier - */ - if (cleaning_up) + assert (this); + assert (_cache_add_trigger && _cache_add_trigger != INVALID_HANDLE_VALUE); + assert (interrupt_event && interrupt_event != INVALID_HANDLE_VALUE); + + EnterCriticalSection (&_cache_write_access); + + assert (_processes_count + SPECIALS_COUNT < elements (_wait_array)); + + size_t index = 0; + + for (class process *ptr = _processes_head; ptr; ptr = ptr->_next) { - LeaveCriticalSection (&access); - return false; + assert (ptr->_hProcess && ptr->_hProcess != INVALID_HANDLE_VALUE); + assert (ptr->_exit_status == STILL_ACTIVE); + + _wait_array[index] = ptr->handle (); + _process_array[index++] = ptr; + + assert (index <= elements (_wait_array)); } - new_cleanup->next = head; - head = new_cleanup; - LeaveCriticalSection (&access); - return true; + + /* Sorry for shouting, but THESE MUST BE ADDED AT THE END! */ + /* Well, not strictly `must', but it's more efficient if they are :-) */ + + _wait_array[index] = interrupt_event; + _process_array[index++] = NULL; + + _wait_array[index] = _cache_add_trigger; + _process_array[index++] = NULL; + + /* Phew, back to normal volume now. */ + + assert (index <= elements (_wait_array)); + + LeaveCriticalSection (&_cache_write_access); + + return index; } -/* process_cleanup */ void -process_cleanup::process () +process_cache::check_and_remove_process (const size_t index) { - theprocess->cleanup (); - delete theprocess; + assert (this); + assert (index < elements (_wait_array) - SPECIALS_COUNT); + + class process *const process = _process_array[index]; + + assert (process); + assert (process->handle () == _wait_array[index]); + + if (process->exit_code () == STILL_ACTIVE) + return; + + debug_printf ("process %lu has left the building ($? = %lu)", + process->_winpid, process->_exit_status); + + /* Unlink the process object from the process list. */ + + EnterCriticalSection (&_cache_write_access); + + class process *previous = NULL; + + const class process *const tmp = find (process->_winpid, &previous); + + assert (tmp == process); + assert (previous ? previous->_next == process : _processes_head == process); + + if (previous) + previous->_next = process->_next; + else + _processes_head = process->_next; + + _processes_count -= 1; + SetEvent (_cache_add_trigger); + LeaveCriticalSection (&_cache_write_access); + + /* Schedule any cleanup tasks for this process. */ + _queue.add (new process_cleanup (process)); } -/* process_process_param */ -DWORD -process_process_param::request_loop () +class process * +process_cache::find (const DWORD winpid, class process **previous) { - process_cache *cache = (process_cache *) queue; - /* always malloc one, so there is no special case in the loop */ - ssize_t HandlesSize = 2; - HANDLE *Handles = (HANDLE *) malloc (sizeof (HANDLE) * HandlesSize); - process **Entries = (process **) malloc (sizeof (LPVOID) * HandlesSize); - /* TODO: put [1] at the end as it will also get done if a process dies? */ - Handles[0] = interrupt; - Handles[1] = cache->cache_add_trigger; - while (cache->active && !shutdown) - { - int copied; - copied = -1; - int offset; - offset = 1; - int count; - count = 2; - while ((copied == HandlesSize - 2 - offset) || copied < 0) - { - /* we need more storage to cope with all the HANDLES */ - if (copied == HandlesSize - 2 - offset) - { - HANDLE *temp = (HANDLE *) realloc (Handles, - sizeof (HANDLE) * - HandlesSize + 10); - if (!temp) - { - system_printf - ("cannot allocate more storage for the handle array!"); - exit (1); - } - Handles = temp; - process **ptemp = (process **) realloc (Entries, - sizeof (LPVOID) * - HandlesSize + 10); - if (!ptemp) - { - system_printf - ("cannot allocate more storage for the handle array!"); - exit (1); - } - Entries = ptemp; - HandlesSize += 10; - } - offset += copied; - copied = - cache->handle_snapshot (&Handles[2], &Entries[2], - HandlesSize - 2 - offset, offset); - count += copied; - } - // verbose: debug_printf ("waiting on %u objects", count); - DWORD rc = WaitForMultipleObjects (count, Handles, FALSE, INFINITE); - if (rc == WAIT_FAILED) - { - system_printf ("Could not wait on the process handles (%ld)!", - GetLastError ()); - exit (1); - } - int objindex = rc - WAIT_OBJECT_0; - if (objindex > 1 && objindex < count) - { - debug_printf ("Process %ld has left the building", - Entries[objindex]->winpid); - /* fire off the termination routines */ - cache->remove_process (Entries[objindex]); - } - else if (objindex >= 0 && objindex < 2) - { - /* 0 is shutdown - do nothing */ - /* 1 is a cache add event - just rebuild the object list */ - } - else - { - system_printf - ("unexpected return code from WaitForMultiple objects in process_process_param::request_loop"); - } - } - running = false; - return 0; + if (previous) + *previous = NULL; + + for (class process *ptr = _processes_head; ptr; ptr = ptr->_next) + if (ptr->_winpid == winpid) + return ptr; + else if (ptr->_winpid > winpid) // The list is sorted by winpid. + return NULL; + else if (previous) + *previous = ptr; + + return NULL; } -/* cleanup_routine */ -cleanup_routine::~cleanup_routine () -{} +/*****************************************************************************/ diff --git a/winsup/cygwin/cygserver_shm.cc b/winsup/cygwin/cygserver_shm.cc index 64cfc11b630..0d30616b055 100755 --- a/winsup/cygwin/cygserver_shm.cc +++ b/winsup/cygwin/cygserver_shm.cc @@ -4,11 +4,11 @@ Originally written by Robert Collins <robert.collins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #include "woutsup.h" @@ -154,7 +154,11 @@ client_request_shm::serve (transport_layer_base * const conn, HANDLE token_handle = NULL; DWORD rc; - from_process_handle = cache->process (parameters.in.winpid)->handle (); + class process *const process = cache->process (parameters.in.winpid); + assert (process); + from_process_handle = process->handle (); + process->release (); + /* possible TODO: reduce the access on the handle before we use it */ /* Note that unless we do this, we don't need to call CloseHandle - it's kept open * by the process cache until the process terminates. diff --git a/winsup/cygwin/include/cygwin/cygserver_process.h b/winsup/cygwin/include/cygwin/cygserver_process.h index d1139272a3d..53da198af6c 100755 --- a/winsup/cygwin/include/cygwin/cygserver_process.h +++ b/winsup/cygwin/include/cygwin/cygserver_process.h @@ -13,73 +13,133 @@ details. */ #ifndef _CYGSERVER_PROCESS_ #define _CYGSERVER_PROCESS_ +#include <assert.h> + #include "threaded_queue.h" -class process_cleanup:public queue_request +class process_cleanup : public queue_request { public: + process_cleanup (class process *const theprocess) + : _process (theprocess) + { + assert (_process); + } + + virtual ~process_cleanup (); + virtual void process (); - process_cleanup (class process *nprocess) : theprocess (nprocess) {}; + private: - class process * theprocess; + class process *const _process; }; -class process_process_param:public queue_process_param -{ - class process_cache *cache; -public: - DWORD request_loop (); - process_process_param ():queue_process_param (true) {}; -}; +class process; class cleanup_routine { + friend class process; + public: - cleanup_routine () : next (NULL) {}; + cleanup_routine () : _next (NULL) {} virtual ~cleanup_routine (); - class cleanup_routine * next; + /* MUST BE SYNCHRONOUS */ virtual void cleanup (DWORD winpid) = 0; + +private: + cleanup_routine *_next; }; +class process_cache; + class process { + friend process_cache; + friend process_cleanup; + public: - HANDLE handle (); - DWORD winpid; process (DWORD winpid); ~process (); - DWORD exit_code (); - class process * next; - long refcount; - bool add_cleanup_routine (class cleanup_routine *); - void cleanup (); + + HANDLE handle () const { return _hProcess; } + + void hold () { EnterCriticalSection (&_access); } + void release () { LeaveCriticalSection (&_access); } + private: + const DWORD _winpid; + class process *_next; + long _cleaning_up; + cleanup_routine *_routines_head; + DWORD _exit_status; // Set in the constructor and in exit_code (). + HANDLE _hProcess; /* used to prevent races-on-delete */ - CRITICAL_SECTION access; - volatile long cleaning_up; - class cleanup_routine *head; - HANDLE thehandle; - DWORD _exit_status; + CRITICAL_SECTION _access; + + DWORD exit_code (); + bool add (cleanup_routine *); + void cleanup (); }; -class process_cache:public threaded_queue +class process_cache { + // Number of special (i.e., non-process) handles in _wait_array. + // See wait_for_processes () and sync_wait_array () for details. + enum { + SPECIALS_COUNT = 2 + }; + + class submission_loop : public queue_submission_loop + { + public: + submission_loop (process_cache *const cache, threaded_queue *const queue) + : queue_submission_loop (queue, true), + _cache (cache) + { + assert (_cache); + } + + private: + process_cache *const _cache; + + virtual void request_loop (); + }; + + friend submission_loop; + public: process_cache (unsigned int initial_workers); ~process_cache (); + class process *process (DWORD winpid); - /* remove a process from the cache */ - int handle_snapshot (HANDLE *, class process **, ssize_t, int); - void remove_process (class process *); - /* threaded_queue methods */ - void process_requests (); - HANDLE cache_add_trigger; + + bool running () const { return _queue.running (); } + + bool start () { return _queue.start (); } + bool stop () { return _queue.stop (); } private: - void add_task (class process *); - class process *head; - CRITICAL_SECTION cache_write_access; + threaded_queue _queue; + submission_loop _submitter; + + size_t _processes_count; + class process *_processes_head; // A list sorted by winpid. + + // Access to the _wait_array and related fields is not thread-safe, + // since they are used solely by wait_for_processes () and its callees. + + HANDLE _wait_array[MAXIMUM_WAIT_OBJECTS]; + class process *_process_array[MAXIMUM_WAIT_OBJECTS]; + + HANDLE _cache_add_trigger; // Actually both add and remove. + CRITICAL_SECTION _cache_write_access; // Actually both read and write access. + + void wait_for_processes (HANDLE interrupt); + size_t sync_wait_array (HANDLE interrupt); + void check_and_remove_process (const size_t index); + + class process *find (DWORD winpid, class process **previous = NULL); }; #endif /* _CYGSERVER_PROCESS_ */ diff --git a/winsup/cygwin/threaded_queue.cc b/winsup/cygwin/threaded_queue.cc index aab60263455..e3da7f74728 100755 --- a/winsup/cygwin/threaded_queue.cc +++ b/winsup/cygwin/threaded_queue.cc @@ -4,14 +4,15 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #include "woutsup.h" +#include <assert.h> #include <errno.h> #include <stdio.h> #include <unistd.h> @@ -19,226 +20,387 @@ #include <stdlib.h> #include "threaded_queue.h" +/*****************************************************************************/ + +/* queue_request */ + +queue_request::~queue_request () +{} + +/*****************************************************************************/ + /* threaded_queue */ -DWORD WINAPI -worker_function (LPVOID LpParam) +threaded_queue::threaded_queue (const size_t initial_workers) + : _workers_count (0), + _running (false), + _submitters_head (NULL), + _requests_count (0), + _requests_head (NULL), + _requests_sem (NULL) { - class threaded_queue *queue = (class threaded_queue *) LpParam; - class queue_request *request; - /* FIXME use a threadsafe pop instead for speed? */ - while (queue->active) + InitializeCriticalSection (&_queue_lock); + + // This semaphore's count is the number of requests on the queue. + // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS + // multiplied by max. threads per process (2028?), which is (a few) + // more requests than could ever be pending with the current design. + + _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES + 0, // Initial count + 129792, // Maximum count + NULL); // Anonymous + + if (!_requests_sem) { - EnterCriticalSection (&queue->queuelock); - while (!queue->request && queue->active) - { - LeaveCriticalSection (&queue->queuelock); - DWORD rc = WaitForSingleObject (queue->event, INFINITE); - if (rc == WAIT_FAILED) - { - system_printf ("Wait for event failed"); - queue->running--; - ExitThread (0); - } - EnterCriticalSection (&queue->queuelock); - } - if (!queue->active) - { - queue->running--; - LeaveCriticalSection (&queue->queuelock); - ExitThread (0); - } - /* not needed, but it is efficient */ - request = - (class queue_request *) InterlockedExchangePointer (&queue->request, - queue->request-> - next); - LeaveCriticalSection (&queue->queuelock); - request->process (); - delete request; + system_printf (("failed to create the request queue semaphore, " + "error = %lu"), + GetLastError ()); + abort (); } - queue->running--; - ExitThread (0); + + create_workers (initial_workers); } -void -threaded_queue::create_workers () +threaded_queue::~threaded_queue () { - InitializeCriticalSection (&queuelock); - if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + if (_running) + stop (); + + debug_printf ("deleting all pending queue requests"); + queue_request *reqptr = _requests_head; + while (reqptr) { - system_printf ("Failed to create event queue (%lu), terminating", - GetLastError ()); - exit (1); + queue_request *const ptr = reqptr; + reqptr = reqptr->_next; + delete ptr; } - active = true; - /* FIXME: Use a stack pair and create threads on the fly whenever - * we have to to service a request. - */ - for (unsigned int i = 0; i < initial_workers; i++) + DeleteCriticalSection (&_queue_lock); + if (_requests_sem) + (void) CloseHandle (_requests_sem); +} + +/* FIXME: return success or failure rather than quitting */ +void +threaded_queue::add_submission_loop (queue_submission_loop *const submitter) +{ + assert (this); + assert (submitter); + assert (submitter->_queue == this); + assert (!submitter->_next); + + submitter->_next = + TInterlockedExchangePointer (&_submitters_head, submitter); + + if (_running) + submitter->start (); +} + +bool +threaded_queue::start () +{ + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = true; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (!was_running) { - HANDLE hThread; - DWORD tid; - hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); - if (hThread == NULL) + debug_printf ("starting all queue submission loops"); + + while (loopptr) { - system_printf ("Failed to create thread (%lu), terminating", - GetLastError ()); - exit (1); + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->start (); } - CloseHandle (hThread); - running++; } + + return was_running; } -void -threaded_queue::cleanup () +bool +threaded_queue::stop () { - /* harvest the threads */ - active = false; - /* kill the request processing loops */ - queue_process_param *reqloop; - /* make sure we don't race with a incoming request creation */ - EnterCriticalSection (&queuelock); - reqloop = - (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); - while (reqloop) + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = false; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (was_running) { - queue_process_param *t = reqloop; - reqloop = reqloop->next; - delete t; + debug_printf ("stopping all queue submission loops"); + while (loopptr) + { + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->stop (); + } + + ReleaseSemaphore (_requests_sem, _workers_count, NULL); + while (_workers_count) + { + debug_printf (("waiting for worker threads to terminate: " + "%lu still running"), + _workers_count); + sleep (1); + } + debug_printf ("all worker threads have terminated"); } - LeaveCriticalSection (&queuelock); - if (!running) - return; - debug_printf ("Waiting for current queue threads to terminate"); - for (int n = running; n; n--) - PulseEvent (event); - while (running) - sleep (1); - DeleteCriticalSection (&queuelock); - CloseHandle (event); + + return was_running; } /* FIXME: return success or failure */ void -threaded_queue::add (queue_request * therequest) +threaded_queue::add (queue_request *const therequest) { - /* safe to not "Try" because workers don't hog this, they wait on the event - */ - EnterCriticalSection (&queuelock); - if (!running) + assert (this); + assert (therequest); + assert (!therequest->_next); + + if (!_workers_count) { - system_printf ("No worker threads to handle request!"); + system_printf ("warning: no worker threads to handle request!"); + // FIXME: And then what? } - if (!request) - request = therequest; + + EnterCriticalSection (&_queue_lock); + if (!_requests_head) + _requests_head = therequest; else { - /* add to the queue end. */ - queue_request *listrequest = request; - while (listrequest->next) - listrequest = listrequest->next; - listrequest->next = therequest; + /* Add to the queue end. */ + queue_request *reqptr = _requests_head; + for (; reqptr->_next; reqptr = reqptr->_next) + {} + assert (reqptr); + assert (!reqptr->_next); + reqptr->_next = therequest; } - PulseEvent (event); - LeaveCriticalSection (&queuelock); + + _requests_count += 1; + assert (_requests_count > 0); + LeaveCriticalSection (&_queue_lock); + + (void) ReleaseSemaphore (_requests_sem, 1, NULL); } -/* FIXME: return success or failure rather than quitting */ +/*static*/ DWORD WINAPI +threaded_queue::start_routine (const LPVOID lpParam) +{ + class threaded_queue *const queue = (class threaded_queue *) lpParam; + assert (queue); + + queue->worker_loop (); + + const long count = InterlockedDecrement (&queue->_workers_count); + assert (count >= 0); + + if (queue->_running) + debug_printf ("worker loop has exited; thread about to terminate"); + + return 0; +} + +/* Called from the constructor: so no need to be thread-safe until the + * worker threads start to be created; thus the interlocked increment + * of the `_workers_count' field. + */ + void -threaded_queue::process_requests (queue_process_param * params, - threaded_queue_thread_function * - request_loop) +threaded_queue::create_workers (const size_t initial_workers) { - if (params->start (request_loop, this) == false) - exit (1); - params->next = - (queue_process_param *) InterlockedExchangePointer (&process_head, - params); + for (unsigned int i = 0; i != initial_workers; i++) + { + const long count = InterlockedIncrement (&_workers_count); + assert (count > 0); + + DWORD tid; + const HANDLE hThread = + CreateThread (NULL, 0, start_routine, this, 0, &tid); + + if (!hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } + + (void) CloseHandle (hThread); + } } -/* queue_process_param */ -/* How does a constructor return an error? */ -queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), -interruptible -(ninterruptible) +void +threaded_queue::worker_loop () { - if (!interruptible) - return; - debug_printf ("creating an interruptible processing thread"); - if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + while (true) { - system_printf ("Failed to create interrupt event (%lu), terminating", - GetLastError ()); - exit (1); + const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE); + if (rc == WAIT_FAILED) + { + system_printf ("wait for request semaphore failed, error = %lu", + GetLastError ()); + return; + } + assert (rc == WAIT_OBJECT_0); + + EnterCriticalSection (&_queue_lock); + if (!_running) + { + LeaveCriticalSection (&_queue_lock); + return; + } + + assert (_requests_head); + queue_request *const reqptr = _requests_head; + _requests_head = reqptr->_next; + + _requests_count -= 1; + assert (_requests_count >= 0); + LeaveCriticalSection (&_queue_lock); + + assert (reqptr); + reqptr->process (); + delete reqptr; } } -queue_process_param::~queue_process_param () +/*****************************************************************************/ + +/* queue_submission_loop */ + +queue_submission_loop::queue_submission_loop (threaded_queue *const queue, + const bool ninterruptible) + : _running (false), + _interrupt_event (NULL), + _queue (queue), + _interruptible (ninterruptible), + _hThread (NULL), + _tid (0), + _next (NULL) { - if (running) + if (_interruptible) + { + // verbose: debug_printf ("creating an interruptible processing thread"); + + _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES + FALSE, // Auto-reset + FALSE, // Initially non-signalled + NULL); // Anonymous + + if (!_interrupt_event) + { + system_printf ("failed to create interrupt event, error = %lu", + GetLastError ()); + abort (); + } + } +} + +queue_submission_loop::~queue_submission_loop () +{ + if (_running) stop (); - if (!interruptible) - return; - CloseHandle (interrupt); + if (_interrupt_event) + (void) CloseHandle (_interrupt_event); + if (_hThread) + (void) CloseHandle (_hThread); } bool - queue_process_param::start (threaded_queue_thread_function * request_loop, - threaded_queue * thequeue) +queue_submission_loop::start () { - queue = thequeue; - hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); - if (hThread) + assert (this); + assert (!_hThread); + + const bool was_running = _running; + + if (!was_running) { - running = true; - return true; + _running = true; + + _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid); + if (!_hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } } - system_printf ("Failed to create thread (%lu), terminating", - GetLastError ()); - return false; + + return was_running; } -void -queue_process_param::stop () +bool +queue_submission_loop::stop () { - if (interruptible) + assert (this); + assert (_hThread && _hThread != INVALID_HANDLE_VALUE); + + const bool was_running = _running; + + if (_running) { - InterlockedExchange (&shutdown, true); - PulseEvent (interrupt); - /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get - * scheduled again, we print an error and exit. We _should_ loop or - * try resignalling. We don't want to hand here though... - */ - int n = 5; - while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); - if (!n) + _running = false; + + if (_interruptible) { - system_printf ("Process thread didn't shutdown cleanly after 200ms!"); - exit (1); + assert (_interrupt_event + && _interrupt_event != INVALID_HANDLE_VALUE); + + SetEvent (_interrupt_event); + + if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT) + { + system_printf (("request loop thread %lu failed to shutdown " + "when asked politely: about to get heavy"), + _tid); + + if (!TerminateThread (_hThread, 0)) + { + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); + abort (); + } + } } else - running = false; - } - else - { - debug_printf ("killing request loop thread %ld", tid); - int rc; - if (!(rc = TerminateThread (hThread, 0))) { - system_printf ("error shutting down request loop worker thread"); + // FIXME: could wait to see if the request loop notices that + // the submission loop is no longer running and shuts down + // voluntarily. + + debug_printf ("killing request loop thread %lu", _tid); + + if (!TerminateThread (_hThread, 0)) + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); } - running = false; } - CloseHandle (hThread); + + return was_running; } -/* queue_request */ -queue_request::queue_request () - : next (NULL) -{} +/*static*/ DWORD WINAPI +queue_submission_loop::start_routine (const LPVOID lpParam) +{ + class queue_submission_loop *const submission_loop = + (class queue_submission_loop *) lpParam; + assert (submission_loop); -queue_request::~queue_request () -{} + submission_loop->request_loop (); + + debug_printf ("submission loop has exited; thread about to terminate"); + + submission_loop->stop (); + + return 0; +} + +/*****************************************************************************/ diff --git a/winsup/cygwin/threaded_queue.h b/winsup/cygwin/threaded_queue.h index d3c5c4cca93..a44a4b7273e 100755 --- a/winsup/cygwin/threaded_queue.h +++ b/winsup/cygwin/threaded_queue.h @@ -4,65 +4,124 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #ifndef _THREADED_QUEUE_ #define _THREADED_QUEUE_ +/*****************************************************************************/ + /* a specific request */ class queue_request { - public: - class queue_request *next; - virtual void process () = 0; - queue_request(); - virtual ~queue_request(); +public: + queue_request *_next; + + queue_request() : _next (NULL) {} + virtual ~queue_request(); + + virtual void process () = 0; }; +/*****************************************************************************/ -typedef DWORD WINAPI threaded_queue_thread_function (LPVOID); -/* parameters for a request finding and submitting loop */ +/* a queue to allocate requests from n submission loops to x worker threads */ -class queue_process_param +class queue_submission_loop; + +class threaded_queue { - public: - bool start (threaded_queue_thread_function *, class threaded_queue *); - void stop (); - bool running; - long int shutdown; - class queue_process_param * next; - class threaded_queue *queue; - queue_process_param (bool ninterruptible); - ~queue_process_param (); - bool interruptible; - HANDLE interrupt; - HANDLE hThread; - DWORD tid; +public: + threaded_queue (size_t initial_workers = 1); + ~threaded_queue (); + + void add_submission_loop (queue_submission_loop *); + + bool running () const { return _running; } + + bool start (); + bool stop (); + + void add (queue_request *); + +private: + long _workers_count; + bool _running; + + queue_submission_loop *_submitters_head; + + long _requests_count; // Informational only. + queue_request *_requests_head; + + CRITICAL_SECTION _queue_lock; + HANDLE _requests_sem; // == _requests_count + + static DWORD WINAPI start_routine (LPVOID /* this */); + + void create_workers (size_t initial_workers); + void worker_loop (); }; -/* a queue to allocate requests from n submission loops to x worker threads */ +/*****************************************************************************/ -class threaded_queue +/* parameters for a request finding and submitting loop */ + +class queue_submission_loop { - public: - CRITICAL_SECTION queuelock; - HANDLE event; - bool active; - queue_request * request; - unsigned int initial_workers; - unsigned int running; - void create_workers (); - void cleanup (); - void add (queue_request *); - void process_requests (queue_process_param *, threaded_queue_thread_function *); - threaded_queue () : active (false), request (NULL), initial_workers (1), running (0), process_head (NULL) {}; - private: - queue_request *process_head; + friend threaded_queue; + +public: + queue_submission_loop (threaded_queue *, bool ninterruptible); + virtual ~queue_submission_loop (); + + bool start (); + bool stop (); + + threaded_queue *queue () { return _queue; }; + +protected: + bool _running; + HANDLE _interrupt_event; + threaded_queue *const _queue; + +private: + bool _interruptible; + HANDLE _hThread; + DWORD _tid; + queue_submission_loop *_next; + + static DWORD WINAPI start_routine (LPVOID /* this */); + virtual void request_loop () = 0; }; +#ifdef __cplusplus + +/*---------------------------------------------------------------------------* + * Some type-safe versions of the various interlocked functions. + *---------------------------------------------------------------------------*/ + +template <typename T> T * +TInterlockedExchangePointer (T **lvalue, T *rvalue) +{ + return reinterpret_cast<T *> + (InterlockedExchangePointer (reinterpret_cast<void **> (lvalue), + reinterpret_cast<void *> (rvalue))); +} + +template <typename T> T * +TInterlockedCompareExchangePointer (T **lvalue, T *rvalue1, T *rvalue2) +{ + return reinterpret_cast<T *> + (InterlockedCompareExchangePointer (reinterpret_cast<void **> (lvalue), + reinterpret_cast<void *> (rvalue1), + reinterpret_cast<void *> (rvalue2))); +} + +#endif /* __cplusplus */ + #endif /* _THREADED_QUEUE_ */ |