summaryrefslogtreecommitdiff
path: root/winsup
diff options
context:
space:
mode:
Diffstat (limited to 'winsup')
-rw-r--r--winsup/cygserver/threaded_queue.cc492
-rw-r--r--winsup/cygwin/ChangeLog45
-rwxr-xr-xwinsup/cygwin/cygserver.cc228
-rwxr-xr-xwinsup/cygwin/cygserver_process.cc616
-rwxr-xr-xwinsup/cygwin/cygserver_shm.cc14
-rwxr-xr-xwinsup/cygwin/include/cygwin/cygserver_process.h128
-rwxr-xr-xwinsup/cygwin/threaded_queue.cc492
-rwxr-xr-xwinsup/cygwin/threaded_queue.h141
8 files changed, 1316 insertions, 840 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc
index aab60263455..e3da7f74728 100644
--- a/winsup/cygserver/threaded_queue.cc
+++ b/winsup/cygserver/threaded_queue.cc
@@ -4,14 +4,15 @@
Written by Robert Collins <rbtcollins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
#include "woutsup.h"
+#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
@@ -19,226 +20,387 @@
#include <stdlib.h>
#include "threaded_queue.h"
+/*****************************************************************************/
+
+/* queue_request */
+
+queue_request::~queue_request ()
+{}
+
+/*****************************************************************************/
+
/* threaded_queue */
-DWORD WINAPI
-worker_function (LPVOID LpParam)
+threaded_queue::threaded_queue (const size_t initial_workers)
+ : _workers_count (0),
+ _running (false),
+ _submitters_head (NULL),
+ _requests_count (0),
+ _requests_head (NULL),
+ _requests_sem (NULL)
{
- class threaded_queue *queue = (class threaded_queue *) LpParam;
- class queue_request *request;
- /* FIXME use a threadsafe pop instead for speed? */
- while (queue->active)
+ InitializeCriticalSection (&_queue_lock);
+
+ // This semaphore's count is the number of requests on the queue.
+ // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
+ // multiplied by max. threads per process (2028?), which is (a few)
+ // more requests than could ever be pending with the current design.
+
+ _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
+ 0, // Initial count
+ 129792, // Maximum count
+ NULL); // Anonymous
+
+ if (!_requests_sem)
{
- EnterCriticalSection (&queue->queuelock);
- while (!queue->request && queue->active)
- {
- LeaveCriticalSection (&queue->queuelock);
- DWORD rc = WaitForSingleObject (queue->event, INFINITE);
- if (rc == WAIT_FAILED)
- {
- system_printf ("Wait for event failed");
- queue->running--;
- ExitThread (0);
- }
- EnterCriticalSection (&queue->queuelock);
- }
- if (!queue->active)
- {
- queue->running--;
- LeaveCriticalSection (&queue->queuelock);
- ExitThread (0);
- }
- /* not needed, but it is efficient */
- request =
- (class queue_request *) InterlockedExchangePointer (&queue->request,
- queue->request->
- next);
- LeaveCriticalSection (&queue->queuelock);
- request->process ();
- delete request;
+ system_printf (("failed to create the request queue semaphore, "
+ "error = %lu"),
+ GetLastError ());
+ abort ();
}
- queue->running--;
- ExitThread (0);
+
+ create_workers (initial_workers);
}
-void
-threaded_queue::create_workers ()
+threaded_queue::~threaded_queue ()
{
- InitializeCriticalSection (&queuelock);
- if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ if (_running)
+ stop ();
+
+ debug_printf ("deleting all pending queue requests");
+ queue_request *reqptr = _requests_head;
+ while (reqptr)
{
- system_printf ("Failed to create event queue (%lu), terminating",
- GetLastError ());
- exit (1);
+ queue_request *const ptr = reqptr;
+ reqptr = reqptr->_next;
+ delete ptr;
}
- active = true;
- /* FIXME: Use a stack pair and create threads on the fly whenever
- * we have to to service a request.
- */
- for (unsigned int i = 0; i < initial_workers; i++)
+ DeleteCriticalSection (&_queue_lock);
+ if (_requests_sem)
+ (void) CloseHandle (_requests_sem);
+}
+
+/* FIXME: return success or failure rather than quitting */
+void
+threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
+{
+ assert (this);
+ assert (submitter);
+ assert (submitter->_queue == this);
+ assert (!submitter->_next);
+
+ submitter->_next =
+ TInterlockedExchangePointer (&_submitters_head, submitter);
+
+ if (_running)
+ submitter->start ();
+}
+
+bool
+threaded_queue::start ()
+{
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = true;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (!was_running)
{
- HANDLE hThread;
- DWORD tid;
- hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
- if (hThread == NULL)
+ debug_printf ("starting all queue submission loops");
+
+ while (loopptr)
{
- system_printf ("Failed to create thread (%lu), terminating",
- GetLastError ());
- exit (1);
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->start ();
}
- CloseHandle (hThread);
- running++;
}
+
+ return was_running;
}
-void
-threaded_queue::cleanup ()
+bool
+threaded_queue::stop ()
{
- /* harvest the threads */
- active = false;
- /* kill the request processing loops */
- queue_process_param *reqloop;
- /* make sure we don't race with a incoming request creation */
- EnterCriticalSection (&queuelock);
- reqloop =
- (queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
- while (reqloop)
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = false;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (was_running)
{
- queue_process_param *t = reqloop;
- reqloop = reqloop->next;
- delete t;
+ debug_printf ("stopping all queue submission loops");
+ while (loopptr)
+ {
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->stop ();
+ }
+
+ ReleaseSemaphore (_requests_sem, _workers_count, NULL);
+ while (_workers_count)
+ {
+ debug_printf (("waiting for worker threads to terminate: "
+ "%lu still running"),
+ _workers_count);
+ sleep (1);
+ }
+ debug_printf ("all worker threads have terminated");
}
- LeaveCriticalSection (&queuelock);
- if (!running)
- return;
- debug_printf ("Waiting for current queue threads to terminate");
- for (int n = running; n; n--)
- PulseEvent (event);
- while (running)
- sleep (1);
- DeleteCriticalSection (&queuelock);
- CloseHandle (event);
+
+ return was_running;
}
/* FIXME: return success or failure */
void
-threaded_queue::add (queue_request * therequest)
+threaded_queue::add (queue_request *const therequest)
{
- /* safe to not "Try" because workers don't hog this, they wait on the event
- */
- EnterCriticalSection (&queuelock);
- if (!running)
+ assert (this);
+ assert (therequest);
+ assert (!therequest->_next);
+
+ if (!_workers_count)
{
- system_printf ("No worker threads to handle request!");
+ system_printf ("warning: no worker threads to handle request!");
+ // FIXME: And then what?
}
- if (!request)
- request = therequest;
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_requests_head)
+ _requests_head = therequest;
else
{
- /* add to the queue end. */
- queue_request *listrequest = request;
- while (listrequest->next)
- listrequest = listrequest->next;
- listrequest->next = therequest;
+ /* Add to the queue end. */
+ queue_request *reqptr = _requests_head;
+ for (; reqptr->_next; reqptr = reqptr->_next)
+ {}
+ assert (reqptr);
+ assert (!reqptr->_next);
+ reqptr->_next = therequest;
}
- PulseEvent (event);
- LeaveCriticalSection (&queuelock);
+
+ _requests_count += 1;
+ assert (_requests_count > 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ (void) ReleaseSemaphore (_requests_sem, 1, NULL);
}
-/* FIXME: return success or failure rather than quitting */
+/*static*/ DWORD WINAPI
+threaded_queue::start_routine (const LPVOID lpParam)
+{
+ class threaded_queue *const queue = (class threaded_queue *) lpParam;
+ assert (queue);
+
+ queue->worker_loop ();
+
+ const long count = InterlockedDecrement (&queue->_workers_count);
+ assert (count >= 0);
+
+ if (queue->_running)
+ debug_printf ("worker loop has exited; thread about to terminate");
+
+ return 0;
+}
+
+/* Called from the constructor: so no need to be thread-safe until the
+ * worker threads start to be created; thus the interlocked increment
+ * of the `_workers_count' field.
+ */
+
void
-threaded_queue::process_requests (queue_process_param * params,
- threaded_queue_thread_function *
- request_loop)
+threaded_queue::create_workers (const size_t initial_workers)
{
- if (params->start (request_loop, this) == false)
- exit (1);
- params->next =
- (queue_process_param *) InterlockedExchangePointer (&process_head,
- params);
+ for (unsigned int i = 0; i != initial_workers; i++)
+ {
+ const long count = InterlockedIncrement (&_workers_count);
+ assert (count > 0);
+
+ DWORD tid;
+ const HANDLE hThread =
+ CreateThread (NULL, 0, start_routine, this, 0, &tid);
+
+ if (!hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+
+ (void) CloseHandle (hThread);
+ }
}
-/* queue_process_param */
-/* How does a constructor return an error? */
-queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
-interruptible
-(ninterruptible)
+void
+threaded_queue::worker_loop ()
{
- if (!interruptible)
- return;
- debug_printf ("creating an interruptible processing thread");
- if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ while (true)
{
- system_printf ("Failed to create interrupt event (%lu), terminating",
- GetLastError ());
- exit (1);
+ const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
+ if (rc == WAIT_FAILED)
+ {
+ system_printf ("wait for request semaphore failed, error = %lu",
+ GetLastError ());
+ return;
+ }
+ assert (rc == WAIT_OBJECT_0);
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_running)
+ {
+ LeaveCriticalSection (&_queue_lock);
+ return;
+ }
+
+ assert (_requests_head);
+ queue_request *const reqptr = _requests_head;
+ _requests_head = reqptr->_next;
+
+ _requests_count -= 1;
+ assert (_requests_count >= 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ assert (reqptr);
+ reqptr->process ();
+ delete reqptr;
}
}
-queue_process_param::~queue_process_param ()
+/*****************************************************************************/
+
+/* queue_submission_loop */
+
+queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
+ const bool ninterruptible)
+ : _running (false),
+ _interrupt_event (NULL),
+ _queue (queue),
+ _interruptible (ninterruptible),
+ _hThread (NULL),
+ _tid (0),
+ _next (NULL)
{
- if (running)
+ if (_interruptible)
+ {
+ // verbose: debug_printf ("creating an interruptible processing thread");
+
+ _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
+ FALSE, // Auto-reset
+ FALSE, // Initially non-signalled
+ NULL); // Anonymous
+
+ if (!_interrupt_event)
+ {
+ system_printf ("failed to create interrupt event, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+ }
+}
+
+queue_submission_loop::~queue_submission_loop ()
+{
+ if (_running)
stop ();
- if (!interruptible)
- return;
- CloseHandle (interrupt);
+ if (_interrupt_event)
+ (void) CloseHandle (_interrupt_event);
+ if (_hThread)
+ (void) CloseHandle (_hThread);
}
bool
- queue_process_param::start (threaded_queue_thread_function * request_loop,
- threaded_queue * thequeue)
+queue_submission_loop::start ()
{
- queue = thequeue;
- hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
- if (hThread)
+ assert (this);
+ assert (!_hThread);
+
+ const bool was_running = _running;
+
+ if (!was_running)
{
- running = true;
- return true;
+ _running = true;
+
+ _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
+ if (!_hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
}
- system_printf ("Failed to create thread (%lu), terminating",
- GetLastError ());
- return false;
+
+ return was_running;
}
-void
-queue_process_param::stop ()
+bool
+queue_submission_loop::stop ()
{
- if (interruptible)
+ assert (this);
+ assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
+
+ const bool was_running = _running;
+
+ if (_running)
{
- InterlockedExchange (&shutdown, true);
- PulseEvent (interrupt);
- /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
- * scheduled again, we print an error and exit. We _should_ loop or
- * try resignalling. We don't want to hand here though...
- */
- int n = 5;
- while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
- if (!n)
+ _running = false;
+
+ if (_interruptible)
{
- system_printf ("Process thread didn't shutdown cleanly after 200ms!");
- exit (1);
+ assert (_interrupt_event
+ && _interrupt_event != INVALID_HANDLE_VALUE);
+
+ SetEvent (_interrupt_event);
+
+ if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
+ {
+ system_printf (("request loop thread %lu failed to shutdown "
+ "when asked politely: about to get heavy"),
+ _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ {
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
+ abort ();
+ }
+ }
}
else
- running = false;
- }
- else
- {
- debug_printf ("killing request loop thread %ld", tid);
- int rc;
- if (!(rc = TerminateThread (hThread, 0)))
{
- system_printf ("error shutting down request loop worker thread");
+ // FIXME: could wait to see if the request loop notices that
+ // the submission loop is no longer running and shuts down
+ // voluntarily.
+
+ debug_printf ("killing request loop thread %lu", _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
}
- running = false;
}
- CloseHandle (hThread);
+
+ return was_running;
}
-/* queue_request */
-queue_request::queue_request ()
- : next (NULL)
-{}
+/*static*/ DWORD WINAPI
+queue_submission_loop::start_routine (const LPVOID lpParam)
+{
+ class queue_submission_loop *const submission_loop =
+ (class queue_submission_loop *) lpParam;
+ assert (submission_loop);
-queue_request::~queue_request ()
-{}
+ submission_loop->request_loop ();
+
+ debug_printf ("submission loop has exited; thread about to terminate");
+
+ submission_loop->stop ();
+
+ return 0;
+}
+
+/*****************************************************************************/
diff --git a/winsup/cygwin/ChangeLog b/winsup/cygwin/ChangeLog
index de19a76a3c6..03f4dece3de 100644
--- a/winsup/cygwin/ChangeLog
+++ b/winsup/cygwin/ChangeLog
@@ -1,3 +1,48 @@
+2002-06-28 Conrad Scott <conrad.scott@dsl.pipex.com>
+
+ * threaded_queue.h (class queue_request): Re-write.
+ (threaded_queue_thread_function): Remove.
+ (class queue_process_param): Remove.
+ (class threaded_queue): Re-write.
+ (class queue_submission_loop): New version of the old
+ `queue_process_param' class.
+ (TInterlockedExchangePointer): New templated function.
+ (TInterlockedCompareExchangePointer): Ditto.
+ * threaded_queue.cc (worker_function): Remove.
+ (class threaded_queue): Re-write.
+ (class queue_process_param): Remove.
+ (class queue_submission_loop): New version of the old
+ `queue_process_param' class.
+ * include/cygwin/cygserver_process.h (process_cleanup): Re-write.
+ (class process_process_param): Remove.
+ (class cleanup_routine): Re-write.
+ (class process): Re-write.
+ (class process_cache): Re-write.
+ * cygserver_process.cc (process_cleanup): Re-write.
+ (class process_process_param): Remove.
+ (class cleanup_routine): Re-write.
+ (class process): Re-write.
+ (class process_cache): Re-write.
+ * cygserver.cc (request_count): Remove unused variable.
+ (class server_request): Move methods inline.
+ (class server_process_param): Remove.
+ (class server_request_queue): Remove.
+ (request_queue): Move into `main ()' and change type to
+ `threaded_queue'.
+ (request_loop): Remove.
+ (class server_submission_loop): New version of the old
+ `server_process_param' class.
+ (shutdown_server): New variable.
+ (client_request_shutdown::serve): Set `shutdown_server' to trigger
+ shutdown.
+ (handle_signal): Ditto.
+ (main): Install signal handler for SIGINT rather than SIGQUIT.
+ Use new interfaces for the `request_queue' and the `cache'.
+ Create a `server_submission_loop' and add to the `request_queue'.
+ Add check for the `shutdown_server' variable to the main loop.
+ * cygserver_shm.cc (client_request_shm::serve): Release the
+ process object after use.
+
2002-06-27 Conrad Scott <conrad.scott@dsl.pipex.com>
* cygserver_client.cc (client_request::handle_request): Correct
diff --git a/winsup/cygwin/cygserver.cc b/winsup/cygwin/cygserver.cc
index 32e93dc47c8..9baf9030395 100755
--- a/winsup/cygwin/cygserver.cc
+++ b/winsup/cygwin/cygserver.cc
@@ -12,14 +12,12 @@ details. */
#include "woutsup.h"
-#include <sys/socket.h>
#include <sys/types.h>
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <getopt.h>
-#include <netdb.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
@@ -35,10 +33,6 @@ details. */
#include "cygwin/cygserver_process.h"
#include "cygwin/cygserver_transport.h"
-GENERIC_MAPPING access_mapping;
-
-DWORD request_count = 0;
-
// Version string.
static const char version[] = "$Revision$";
@@ -127,6 +121,8 @@ __set_errno (const char *func, int ln, int val)
#endif /* DEBUGGING */
+GENERIC_MAPPING access_mapping;
+
static BOOL
setup_privileges ()
{
@@ -257,7 +253,7 @@ check_and_dup_handle (HANDLE from_process, HANDLE to_process,
*/
void
-client_request_attach_tty::serve (transport_layer_base * const conn,
+client_request_attach_tty::serve (transport_layer_base *const conn,
process_cache *)
{
assert (conn);
@@ -283,9 +279,9 @@ client_request_attach_tty::serve (transport_layer_base * const conn,
msglen (0); // Until we fill in some fields.
- // verbose: debug_printf ("pid %ld:(%p,%p) -> pid %ld", req.master_pid,
- // from_master, to_master,
- // req.pid);
+ // verbose: debug_printf ("pid %ld:(%p,%p) -> pid %ld",
+ // req.master_pid, req.from_master, req.to_master,
+ // req.pid);
// verbose: debug_printf ("opening process %ld", req.master_pid);
@@ -401,66 +397,68 @@ client_request_get_version::serve(transport_layer_base *, process_cache *)
class server_request : public queue_request
{
- public:
- server_request (transport_layer_base *newconn,
- class process_cache *newcache);
- virtual ~server_request();
- virtual void process ();
- private:
- transport_layer_base *conn;
- class process_cache *cache;
-};
+public:
+ server_request (transport_layer_base *const conn, process_cache *const cache)
+ : _conn (conn), _cache (cache)
+ {}
-class server_process_param : public queue_process_param
-{
- public:
- transport_layer_base *transport;
- server_process_param () : queue_process_param (false) {};
-};
+ virtual ~server_request()
+ {
+ delete _conn;
+ }
-class server_request_queue : public threaded_queue
-{
- public:
- class process_cache *cache;
- void process_requests (transport_layer_base *transport);
- void add_connection (transport_layer_base *conn);
-};
+ virtual void process ()
+ {
+ client_request::handle_request (_conn, _cache);
+ }
-class server_request_queue request_queue;
+private:
+ transport_layer_base *const _conn;
+ process_cache *const _cache;
+};
-static DWORD WINAPI
-request_loop (LPVOID LpParam)
+class server_submission_loop : public queue_submission_loop
{
- class server_process_param *params = (server_process_param *) LpParam;
- class server_request_queue *queue = (server_request_queue *) params->queue;
- class transport_layer_base * transport = params->transport;
- while (queue->active)
+public:
+ server_submission_loop (threaded_queue *const queue,
+ transport_layer_base *const transport,
+ process_cache *const cache)
+ : queue_submission_loop (queue, false),
+ _transport (transport),
+ _cache (cache)
{
- bool recoverable = false;
- transport_layer_base * const new_conn = transport->accept (&recoverable);
- if (!new_conn && !recoverable)
- {
- system_printf ("fatal error on IPC transport: closing down");
- queue->active = false;
- }
- /* FIXME: this is a little ugly. What we really want is to wait on two objects:
- * one for the pipe/socket, and one for being told to shutdown. Otherwise
- * this will stay a problem (we won't actually shutdown until the request
- * _AFTER_ the shutdown request. And sending ourselves a request is ugly
- */
- if (new_conn && queue->active)
- queue->add_connection (new_conn);
+ assert (_transport);
+ assert (_cache);
}
- return 0;
-}
-/* TODO: check we are not being asked to service a already serviced transport */
+private:
+ transport_layer_base *const _transport;
+ process_cache *const _cache;
+
+ virtual void request_loop ();
+};
+
+/* FIXME: this is a little ugly. What we really want is to wait on
+ * two objects: one for the pipe/socket, and one for being told to
+ * shutdown. Otherwise this will stay a problem (we won't actually
+ * shutdown until the request _AFTER_ the shutdown request. And
+ * sending ourselves a request is ugly
+ */
void
-server_request_queue::process_requests (transport_layer_base *transport)
+server_submission_loop::request_loop ()
{
- class server_process_param *params = new server_process_param;
- params->transport = transport;
- threaded_queue::process_requests (params, request_loop);
+ while (_running)
+ {
+ bool recoverable = false;
+ transport_layer_base *const conn = _transport->accept (&recoverable);
+ if (!conn && !recoverable)
+ {
+ system_printf ("fatal error on IPC transport: closing down");
+ return;
+ }
+ if (conn)
+ _queue->add (new server_request (conn, _cache));
+ }
}
client_request_shutdown::client_request_shutdown ()
@@ -469,6 +467,8 @@ client_request_shutdown::client_request_shutdown ()
syscall_printf ("created");
}
+static volatile sig_atomic_t shutdown_server = false;
+
void
client_request_shutdown::serve (transport_layer_base *, process_cache *)
{
@@ -480,55 +480,18 @@ client_request_shutdown::serve (transport_layer_base *, process_cache *)
/* FIXME: link upwards, and then this becomes a trivial method call to
* only shutdown _this queue_
*/
- /* tell the main thread to shutdown */
- request_queue.active = false;
-
- msglen (0);
-}
-
-server_request::server_request (transport_layer_base *newconn,
- class process_cache *newcache)
- : conn(newconn), cache(newcache)
-{}
-
-server_request::~server_request ()
-{
- delete conn;
-}
-void
-server_request::process ()
-{
- client_request::handle_request (conn, cache);
-}
+ shutdown_server = true;
-void
-server_request_queue::add_connection (transport_layer_base *conn)
-{
- /* safe to not "Try" because workers don't hog this, they wait on the event
- */
- /* every derived ::add must enter the section! */
- EnterCriticalSection (&queuelock);
- if (!running)
- {
- delete conn;
- LeaveCriticalSection (&queuelock);
- return;
- }
- queue_request * listrequest = new server_request (conn, cache);
- add (listrequest);
- LeaveCriticalSection (&queuelock);
+ msglen (0);
}
static void
handle_signal (int signal)
{
/* any signal makes us die :} */
- /* FIXME: link upwards, and then this becomes a trivial method call to
- * only shutdown _this queue_
- */
- /* tell the main thread to shutdown */
- request_queue.active=false;
+
+ shutdown_server = true;
}
/*
@@ -669,53 +632,66 @@ main (const int argc, char *argv[])
return 0;
}
- if (signal (SIGQUIT, handle_signal) == SIG_ERR)
+ if (signal (SIGINT, handle_signal) == SIG_ERR)
{
system_printf ("could not install signal handler (%d)- aborting startup",
errno);
exit (1);
}
- transport_layer_base * const transport = create_server_transport ();
- assert (transport);
print_version (pgm);
setbuf (stdout, NULL);
printf ("daemon starting up");
- transport->listen ();
+
+ threaded_queue request_queue (10);
printf (".");
- class process_cache cache (2);
- request_queue.initial_workers = 10;
- request_queue.cache = &cache;
- request_queue.create_workers ();
+
+ transport_layer_base *const transport = create_server_transport ();
+ assert (transport);
printf (".");
- request_queue.process_requests (transport);
+
+ process_cache cache (2);
printf (".");
- cache.create_workers ();
+
+ server_submission_loop submission_loop (&request_queue, transport, &cache);
+ printf (".");
+
+ request_queue.add_submission_loop (&submission_loop);
printf (".");
- cache.process_requests ();
- printf (".complete\n");
- /* TODO: wait on multiple objects - the thread handle for each request loop +
- * all the process handles. This should be done by querying the request_queue and
- * the process cache for all their handles, and then waiting for (say) 30 seconds.
- * after that we recreate the list of handles to wait on, and wait again.
- * the point of all this abstraction is that we can trivially server both sockets
- * and pipes simply by making a new transport, and then calling
- * request_queue.process_requests (transport2);
+
+ transport->listen ();
+ printf (".");
+
+ cache.start ();
+ printf (".");
+
+ request_queue.start ();
+ printf (".");
+
+ printf ("complete\n");
+
+ /* TODO: wait on multiple objects - the thread handle for each
+ * request loop + all the process handles. This should be done by
+ * querying the request_queue and the process cache for all their
+ * handles, and then waiting for (say) 30 seconds. after that we
+ * recreate the list of handles to wait on, and wait again. the
+ * point of all this abstraction is that we can trivially server
+ * both sockets and pipes simply by making a new transport, and then
+ * calling request_queue.process_requests (transport2);
*/
/* WaitForMultipleObjects abort && request_queue && process_queue && signal
-- if signal event then retrigger it
*/
- while (1 && request_queue.active)
- {
- sleep (1);
- }
+ while (!shutdown_server && request_queue.running () && cache.running ())
+ sleep (1);
+
printf ("\nShutdown request received - new requests will be denied\n");
- request_queue.cleanup ();
+ request_queue.stop ();
printf ("All pending requests processed\n");
delete transport;
printf ("No longer accepting requests - cygwin will operate in daemonless mode\n");
- cache.cleanup ();
+ cache.stop ();
printf ("All outstanding process-cache activities completed\n");
printf ("daemon shutdown\n");
diff --git a/winsup/cygwin/cygserver_process.cc b/winsup/cygwin/cygserver_process.cc
index 4ad80ed46b1..5f009406451 100755
--- a/winsup/cygwin/cygserver_process.cc
+++ b/winsup/cygwin/cygserver_process.cc
@@ -4,387 +4,395 @@
Written by Robert Collins <rbtcollins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
#include "woutsup.h"
-#include <sys/socket.h>
#include <sys/types.h>
+#include <assert.h>
#include <errno.h>
-#include <netdb.h>
#include <pthread.h>
-#include <stdio.h>
#include <stdlib.h>
-#include <unistd.h>
+
+#include "cygerrno.h"
#include "cygwin/cygserver_process.h"
-/* the cache structures and classes are designed for one cache per server process.
- * To make multiple process caches, a redesign will be needed
- */
+/*****************************************************************************/
+
+#define elements(ARRAY) (sizeof (ARRAY) / sizeof (*ARRAY))
-/* process cache */
-process_cache::process_cache (unsigned int num_initial_workers):
-head (NULL)
+/*****************************************************************************/
+
+process_cleanup::~process_cleanup ()
{
- /* there can only be one */
- InitializeCriticalSection (&cache_write_access);
- if ((cache_add_trigger = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
- {
- system_printf ("Failed to create cache add trigger (%lu), terminating",
- GetLastError ());
- exit (1);
- }
- initial_workers = num_initial_workers;
+ delete _process;
}
-process_cache::~process_cache ()
+void
+process_cleanup::process ()
{
+ _process->cleanup ();
}
-class process *
-process_cache::process (DWORD winpid)
+/*****************************************************************************/
+
+/* cleanup_routine */
+cleanup_routine::~cleanup_routine ()
{
- class process *entry = head;
- /* TODO: make this more granular, so a search doesn't involve the write lock */
- EnterCriticalSection (&cache_write_access);
- if (!entry)
+}
+
+/*****************************************************************************/
+
+process::process (const DWORD winpid)
+ : _winpid (winpid),
+ _next (NULL),
+ _cleaning_up (false),
+ _routines_head (NULL),
+ _exit_status (STILL_ACTIVE),
+ _hProcess (NULL)
+{
+ _hProcess = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid);
+ if (!_hProcess)
{
- entry = new class process (winpid);
- entry->next =
- (class process *) InterlockedExchangePointer (&head, entry);
- PulseEvent (cache_add_trigger);
+ system_printf ("unable to obtain handle for new cache process %lu",
+ winpid);
+ _hProcess = INVALID_HANDLE_VALUE;
+ _exit_status = 0;
}
else
- {
- while (entry->winpid != winpid && entry->next)
- entry = entry->next;
- if (entry->winpid != winpid)
- {
- class process *new_entry = new class process (winpid);
- new_entry->next =
- (class process *) InterlockedExchangePointer (&entry->next,
- new_entry);
- entry = new_entry;
- PulseEvent (cache_add_trigger);
- }
- }
- LeaveCriticalSection (&cache_write_access);
- return entry;
+ debug_printf ("got handle %p for new cache process %lu",
+ _hProcess, _winpid);
+ InitializeCriticalSection (&_access);
}
-static DWORD WINAPI
-request_loop (LPVOID LpParam)
+process::~process ()
{
- class process_process_param *params = (process_process_param *) LpParam;
- return params->request_loop ();
+ DeleteCriticalSection (&_access);
+ (void) CloseHandle (_hProcess);
}
-void
-process_cache::process_requests ()
+/* No need to be thread-safe as this is only ever called by
+ * process_cache::remove_process(). If it has to be made thread-safe
+ * later on, it should not use the `access' critical section as that
+ * is held by the client request handlers for an arbitrary length of
+ * time, i.e. while they do whatever processing is required for a
+ * client request.
+ */
+DWORD
+process::exit_code ()
{
- class process_process_param *params = new process_process_param;
- threaded_queue::process_requests (params, request_loop);
+ if (_hProcess && _hProcess != INVALID_HANDLE_VALUE
+ && _exit_status == STILL_ACTIVE
+ && !GetExitCodeProcess (_hProcess, &_exit_status))
+ {
+ system_printf ("failed to retrieve exit code (%lu)", GetLastError ());
+ _hProcess = INVALID_HANDLE_VALUE;
+ }
+ return _exit_status;
}
-void
-process_cache::add_task (class process * theprocess)
+bool
+process::add (cleanup_routine *const new_cleanup)
{
- /* safe to not "Try" because workers don't hog this, they wait on the event
+ if (_cleaning_up)
+ return false;
+ EnterCriticalSection (&_access);
+ /* Check that we didn't block with ::cleanup (). This rigmarole is
+ * to get around win9x's glaring missing TryEnterCriticalSection
+ * call which would be a whole lot easier.
*/
- /* every derived ::add must enter the section! */
- EnterCriticalSection (&queuelock);
- queue_request *listrequest = new process_cleanup (theprocess);
- add (listrequest);
- LeaveCriticalSection (&queuelock);
-}
-
-/* NOT fully MT SAFE: must be called by only one thread in a program */
-void
-process_cache::remove_process (class process *theprocess)
-{
- class process *entry = head;
- /* unlink */
- EnterCriticalSection (&cache_write_access);
- if (entry == theprocess)
- {
- entry = (class process *) InterlockedExchangePointer (&head, theprocess->next);
- if (entry != theprocess)
- {
- system_printf ("Bug encountered, process cache corrupted");
- exit (1);
- }
- }
- else
+ if (_cleaning_up)
{
- while (entry->next && entry->next != theprocess)
- entry = entry->next;
- class process *temp = (class process *) InterlockedExchangePointer (&entry->next, theprocess->next);
- if (temp != theprocess)
- {
- system_printf ("Bug encountered, process cache corrupted");
- exit (1);
- }
+ LeaveCriticalSection (&_access);
+ return false;
}
- LeaveCriticalSection (&cache_write_access);
- /* Process any cleanup tasks */
- add_task (theprocess);
+ new_cleanup->_next = _routines_head;
+ _routines_head = new_cleanup;
+ LeaveCriticalSection (&_access);
+ return true;
}
-/* copy <= max_copy HANDLEs to dest[], starting at an offset into _our list_ of
- * begin_at. (Ie begin_at = 5, the first copied handle is still written to dest[0]
- * NOTE: Thread safe, but not thread guaranteed - a newly added process may be missed.
- * Who cares - It'll get caught the next time.
+/* This is single threaded. It's called after the process is removed
+ * from the cache, but inserts may be attemped by worker threads that
+ * have a pointer to it.
*/
-int
-process_cache::handle_snapshot (HANDLE * hdest, class process ** edest,
- ssize_t max_copy, int begin_at)
+void
+process::cleanup ()
{
- /* TODO:? grab a delete-lock, to prevent deletes during this process ? */
- class process *entry = head;
- int count = begin_at;
- /* skip begin_at entries */
- while (entry && count)
- {
- if (entry->exit_code () == STILL_ACTIVE)
- count--;
- entry = entry->next;
- }
- /* hit the end of the list within begin_at entries */
- if (count)
- return 0;
- HANDLE *hto = hdest;
- class process **eto = edest;
- while (entry && count < max_copy)
+ EnterCriticalSection (&_access);
+ assert (!_cleaning_up);
+ InterlockedExchange (&_cleaning_up, true);
+ cleanup_routine *entry = _routines_head;
+ _routines_head = NULL;
+ LeaveCriticalSection (&_access);
+
+ while (entry)
{
- /* hack */
- if (entry->exit_code () == STILL_ACTIVE)
- {
- *hto = entry->handle ();
- *eto = entry;
- count++;
- hto++;
- eto++;
- }
- entry = entry->next;
+ cleanup_routine *const ptr = entry;
+ entry = entry->_next;
+ ptr->cleanup (_winpid);
+ delete ptr;
}
- return count;
}
-/* process's */
-/* global process crit section */
-static CRITICAL_SECTION process_access;
-static pthread_once_t process_init;
+/*****************************************************************************/
void
-do_process_init (void)
+process_cache::submission_loop::request_loop ()
{
- InitializeCriticalSection (&process_access);
- /* we don't have a cache shutdown capability today */
+ assert (this);
+ assert (_cache);
+ assert (_interrupt_event);
+
+ while (_running)
+ _cache->wait_for_processes (_interrupt_event);
}
-process::process (DWORD winpid):
-winpid (winpid), next (NULL), cleaning_up (0), head (NULL), _exit_status (STILL_ACTIVE)
+/*****************************************************************************/
+
+process_cache::process_cache (const unsigned int initial_workers)
+ : _queue (initial_workers),
+ _submitter (this, &_queue), // true == interruptible
+ _processes_count (0),
+ _processes_head (NULL),
+ _cache_add_trigger (NULL)
{
- pthread_once (&process_init, do_process_init);
- EnterCriticalSection (&process_access);
- thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid);
- if (!thehandle)
+ /* there can only be one */
+ InitializeCriticalSection (&_cache_write_access);
+
+ _cache_add_trigger = CreateEvent (NULL, // SECURITY_ATTRIBUTES
+ FALSE, // Auto-reset
+ FALSE, // Initially non-signalled
+ NULL); // Anonymous
+
+ if (!_cache_add_trigger)
{
- system_printf ("unable to obtain handle for new cache process %lu", winpid);
- thehandle = INVALID_HANDLE_VALUE;
+ system_printf ("failed to create cache add trigger (%lu), terminating",
+ GetLastError ());
+ abort ();
}
- debug_printf ("Got handle %p for new cache process %lu", thehandle, winpid);
- InitializeCriticalSection (&access);
- LeaveCriticalSection (&process_access);
-}
-process::~process ()
-{
- DeleteCriticalSection (&access);
+ _queue.add_submission_loop (&_submitter);
}
-HANDLE
-process::handle ()
+process_cache::~process_cache ()
{
-// DWORD exitstate = exit_code ();
-// if (exitstate == STILL_ACTIVE)
- return thehandle;
-
- /* FIXME: call the cleanup list ? */
-
-// CloseHandle (thehandle);
-// debug_printf ("Process id %ld has terminated, attempting to open a new handle",
-// winpid);
-// thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid);
-// debug_printf ("Got handle %p when refreshing cache process %ld", thehandle, winpid);
-// /* FIXME: what if OpenProcess fails ? */
-// if (thehandle)
-// {
-// _exit_status = STILL_ACTIVE;
-// exit_code ();
-// }
-// else
-// thehandle = INVALID_HANDLE_VALUE;
-// return thehandle;
+ (void) CloseHandle (_cache_add_trigger);
+ DeleteCriticalSection (&_cache_write_access);
}
-DWORD process::exit_code ()
+/* This returns the process object to the caller already locked, that
+ * is, with the object's `access' critical region entered. Thus the
+ * caller must unlock the object when it's finished with it (via
+ * process::release ()). It must then not try to access the object
+ * afterwards, except by going through this routine again, as it may
+ * have been deleted once it has been unlocked.
+ */
+class process *
+process_cache::process (const DWORD winpid)
{
- if (_exit_status != STILL_ACTIVE)
- return _exit_status;
- bool
- err = GetExitCodeProcess (thehandle, &_exit_status);
- if (!err)
+ /* TODO: make this more granular, so a search doesn't involve the
+ * write lock.
+ */
+ EnterCriticalSection (&_cache_write_access);
+ class process *previous = NULL;
+ class process *entry = find (winpid, &previous);
+
+ if (!entry)
{
- system_printf ("Failed to retrieve exit code (%ld)", GetLastError ());
- thehandle = INVALID_HANDLE_VALUE;
- return _exit_status;
+ if (_processes_count + SPECIALS_COUNT >= MAXIMUM_WAIT_OBJECTS)
+ {
+ LeaveCriticalSection (&_cache_write_access);
+ system_printf (("process limit (%d processes) reached; "
+ "new connection refused"),
+ MAXIMUM_WAIT_OBJECTS - SPECIALS_COUNT);
+ set_errno (EAGAIN);
+ return NULL;
+ }
+
+ entry = new class process (winpid);
+ if (entry->_exit_status != STILL_ACTIVE)
+ {
+ LeaveCriticalSection (&_cache_write_access);
+ delete entry;
+ set_errno (ESRCH);
+ return NULL;
+ }
+
+ if (previous)
+ {
+ entry->_next = previous->_next;
+ previous->_next = entry;
+ }
+ else
+ {
+ entry->_next = _processes_head;
+ _processes_head = entry;
+ }
+
+ _processes_count += 1;
+ SetEvent (_cache_add_trigger);
}
- else if (_exit_status == STILL_ACTIVE)
- return _exit_status;
- /* add new cleanup task etc etc ? */
- return _exit_status;
+
+ EnterCriticalSection (&entry->_access); // To be released by the caller.
+ LeaveCriticalSection (&_cache_write_access);
+ assert (entry);
+ assert (entry->_winpid == winpid);
+ return entry;
}
-/* this is single threaded. It's called after the process is removed from the cache,
- * but inserts may be attemped by worker threads that have a pointer to it */
void
-process::cleanup ()
+process_cache::wait_for_processes (const HANDLE interrupt_event)
{
- /* Serialize this */
- EnterCriticalSection (&access);
- InterlockedIncrement (&(long)cleaning_up);
- class cleanup_routine *entry = head;
- while (entry)
+ // Update `_wait_array' with handles of all current processes.
+ const size_t count = sync_wait_array (interrupt_event);
+
+ debug_printf ("waiting on %u objects (out of %u processes)",
+ count, _processes_count);
+
+ const DWORD rc = WaitForMultipleObjects (count, _wait_array,
+ FALSE, INFINITE);
+
+ if (rc == WAIT_FAILED)
+ {
+ system_printf ("could not wait on the process handles, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+
+ const size_t start = rc - WAIT_OBJECT_0;
+
+ if (rc < WAIT_OBJECT_0 || start > count)
{
- class cleanup_routine *temp;
- entry->cleanup (winpid);
- temp = entry->next;
- delete entry;
- entry = temp;
+ system_printf (("unexpected return code %rc "
+ "from WaitForMultipleObjects: "
+ "expected [%u .. %u)"),
+ rc, WAIT_OBJECT_0, WAIT_OBJECT_0 + count);
+ abort ();
}
- LeaveCriticalSection (&access);
+
+ // Tell all the processes, from the signalled point up, the bad news.
+ for (size_t index = start; index != count; index++)
+ if (_process_array[index])
+ check_and_remove_process (index);
}
-bool
-process::add_cleanup_routine (class cleanup_routine *new_cleanup)
+/*
+ * process_cache::sync_wait_array ()
+ *
+ * Fill-in the wait array with the handles that the cache needs to wait on.
+ * These handles are:
+ * - the process_process_param's interrupt event
+ * - the process_cache's cache_add_trigger event
+ * - the handle for each live process in the cache.
+ *
+ * Return value: the number of live handles in the array.
+ */
+
+size_t
+process_cache::sync_wait_array (const HANDLE interrupt_event)
{
- if (cleaning_up)
- return false;
- EnterCriticalSection (&access);
- /* check that we didn't block with ::cleanup ()
- * This rigmarole is to get around win9x's glaring missing TryEnterCriticalSection call
- * which would be a whole lot easier
- */
- if (cleaning_up)
+ assert (this);
+ assert (_cache_add_trigger && _cache_add_trigger != INVALID_HANDLE_VALUE);
+ assert (interrupt_event && interrupt_event != INVALID_HANDLE_VALUE);
+
+ EnterCriticalSection (&_cache_write_access);
+
+ assert (_processes_count + SPECIALS_COUNT < elements (_wait_array));
+
+ size_t index = 0;
+
+ for (class process *ptr = _processes_head; ptr; ptr = ptr->_next)
{
- LeaveCriticalSection (&access);
- return false;
+ assert (ptr->_hProcess && ptr->_hProcess != INVALID_HANDLE_VALUE);
+ assert (ptr->_exit_status == STILL_ACTIVE);
+
+ _wait_array[index] = ptr->handle ();
+ _process_array[index++] = ptr;
+
+ assert (index <= elements (_wait_array));
}
- new_cleanup->next = head;
- head = new_cleanup;
- LeaveCriticalSection (&access);
- return true;
+
+ /* Sorry for shouting, but THESE MUST BE ADDED AT THE END! */
+ /* Well, not strictly `must', but it's more efficient if they are :-) */
+
+ _wait_array[index] = interrupt_event;
+ _process_array[index++] = NULL;
+
+ _wait_array[index] = _cache_add_trigger;
+ _process_array[index++] = NULL;
+
+ /* Phew, back to normal volume now. */
+
+ assert (index <= elements (_wait_array));
+
+ LeaveCriticalSection (&_cache_write_access);
+
+ return index;
}
-/* process_cleanup */
void
-process_cleanup::process ()
+process_cache::check_and_remove_process (const size_t index)
{
- theprocess->cleanup ();
- delete theprocess;
+ assert (this);
+ assert (index < elements (_wait_array) - SPECIALS_COUNT);
+
+ class process *const process = _process_array[index];
+
+ assert (process);
+ assert (process->handle () == _wait_array[index]);
+
+ if (process->exit_code () == STILL_ACTIVE)
+ return;
+
+ debug_printf ("process %lu has left the building ($? = %lu)",
+ process->_winpid, process->_exit_status);
+
+ /* Unlink the process object from the process list. */
+
+ EnterCriticalSection (&_cache_write_access);
+
+ class process *previous = NULL;
+
+ const class process *const tmp = find (process->_winpid, &previous);
+
+ assert (tmp == process);
+ assert (previous ? previous->_next == process : _processes_head == process);
+
+ if (previous)
+ previous->_next = process->_next;
+ else
+ _processes_head = process->_next;
+
+ _processes_count -= 1;
+ SetEvent (_cache_add_trigger);
+ LeaveCriticalSection (&_cache_write_access);
+
+ /* Schedule any cleanup tasks for this process. */
+ _queue.add (new process_cleanup (process));
}
-/* process_process_param */
-DWORD
-process_process_param::request_loop ()
+class process *
+process_cache::find (const DWORD winpid, class process **previous)
{
- process_cache *cache = (process_cache *) queue;
- /* always malloc one, so there is no special case in the loop */
- ssize_t HandlesSize = 2;
- HANDLE *Handles = (HANDLE *) malloc (sizeof (HANDLE) * HandlesSize);
- process **Entries = (process **) malloc (sizeof (LPVOID) * HandlesSize);
- /* TODO: put [1] at the end as it will also get done if a process dies? */
- Handles[0] = interrupt;
- Handles[1] = cache->cache_add_trigger;
- while (cache->active && !shutdown)
- {
- int copied;
- copied = -1;
- int offset;
- offset = 1;
- int count;
- count = 2;
- while ((copied == HandlesSize - 2 - offset) || copied < 0)
- {
- /* we need more storage to cope with all the HANDLES */
- if (copied == HandlesSize - 2 - offset)
- {
- HANDLE *temp = (HANDLE *) realloc (Handles,
- sizeof (HANDLE) *
- HandlesSize + 10);
- if (!temp)
- {
- system_printf
- ("cannot allocate more storage for the handle array!");
- exit (1);
- }
- Handles = temp;
- process **ptemp = (process **) realloc (Entries,
- sizeof (LPVOID) *
- HandlesSize + 10);
- if (!ptemp)
- {
- system_printf
- ("cannot allocate more storage for the handle array!");
- exit (1);
- }
- Entries = ptemp;
- HandlesSize += 10;
- }
- offset += copied;
- copied =
- cache->handle_snapshot (&Handles[2], &Entries[2],
- HandlesSize - 2 - offset, offset);
- count += copied;
- }
- // verbose: debug_printf ("waiting on %u objects", count);
- DWORD rc = WaitForMultipleObjects (count, Handles, FALSE, INFINITE);
- if (rc == WAIT_FAILED)
- {
- system_printf ("Could not wait on the process handles (%ld)!",
- GetLastError ());
- exit (1);
- }
- int objindex = rc - WAIT_OBJECT_0;
- if (objindex > 1 && objindex < count)
- {
- debug_printf ("Process %ld has left the building",
- Entries[objindex]->winpid);
- /* fire off the termination routines */
- cache->remove_process (Entries[objindex]);
- }
- else if (objindex >= 0 && objindex < 2)
- {
- /* 0 is shutdown - do nothing */
- /* 1 is a cache add event - just rebuild the object list */
- }
- else
- {
- system_printf
- ("unexpected return code from WaitForMultiple objects in process_process_param::request_loop");
- }
- }
- running = false;
- return 0;
+ if (previous)
+ *previous = NULL;
+
+ for (class process *ptr = _processes_head; ptr; ptr = ptr->_next)
+ if (ptr->_winpid == winpid)
+ return ptr;
+ else if (ptr->_winpid > winpid) // The list is sorted by winpid.
+ return NULL;
+ else if (previous)
+ *previous = ptr;
+
+ return NULL;
}
-/* cleanup_routine */
-cleanup_routine::~cleanup_routine ()
-{}
+/*****************************************************************************/
diff --git a/winsup/cygwin/cygserver_shm.cc b/winsup/cygwin/cygserver_shm.cc
index 64cfc11b630..0d30616b055 100755
--- a/winsup/cygwin/cygserver_shm.cc
+++ b/winsup/cygwin/cygserver_shm.cc
@@ -4,11 +4,11 @@
Originally written by Robert Collins <robert.collins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
#include "woutsup.h"
@@ -154,7 +154,11 @@ client_request_shm::serve (transport_layer_base * const conn,
HANDLE token_handle = NULL;
DWORD rc;
- from_process_handle = cache->process (parameters.in.winpid)->handle ();
+ class process *const process = cache->process (parameters.in.winpid);
+ assert (process);
+ from_process_handle = process->handle ();
+ process->release ();
+
/* possible TODO: reduce the access on the handle before we use it */
/* Note that unless we do this, we don't need to call CloseHandle - it's kept open
* by the process cache until the process terminates.
diff --git a/winsup/cygwin/include/cygwin/cygserver_process.h b/winsup/cygwin/include/cygwin/cygserver_process.h
index d1139272a3d..53da198af6c 100755
--- a/winsup/cygwin/include/cygwin/cygserver_process.h
+++ b/winsup/cygwin/include/cygwin/cygserver_process.h
@@ -13,73 +13,133 @@ details. */
#ifndef _CYGSERVER_PROCESS_
#define _CYGSERVER_PROCESS_
+#include <assert.h>
+
#include "threaded_queue.h"
-class process_cleanup:public queue_request
+class process_cleanup : public queue_request
{
public:
+ process_cleanup (class process *const theprocess)
+ : _process (theprocess)
+ {
+ assert (_process);
+ }
+
+ virtual ~process_cleanup ();
+
virtual void process ();
- process_cleanup (class process *nprocess) : theprocess (nprocess) {};
+
private:
- class process * theprocess;
+ class process *const _process;
};
-class process_process_param:public queue_process_param
-{
- class process_cache *cache;
-public:
- DWORD request_loop ();
- process_process_param ():queue_process_param (true) {};
-};
+class process;
class cleanup_routine
{
+ friend class process;
+
public:
- cleanup_routine () : next (NULL) {};
+ cleanup_routine () : _next (NULL) {}
virtual ~cleanup_routine ();
- class cleanup_routine * next;
+
/* MUST BE SYNCHRONOUS */
virtual void cleanup (DWORD winpid) = 0;
+
+private:
+ cleanup_routine *_next;
};
+class process_cache;
+
class process
{
+ friend process_cache;
+ friend process_cleanup;
+
public:
- HANDLE handle ();
- DWORD winpid;
process (DWORD winpid);
~process ();
- DWORD exit_code ();
- class process * next;
- long refcount;
- bool add_cleanup_routine (class cleanup_routine *);
- void cleanup ();
+
+ HANDLE handle () const { return _hProcess; }
+
+ void hold () { EnterCriticalSection (&_access); }
+ void release () { LeaveCriticalSection (&_access); }
+
private:
+ const DWORD _winpid;
+ class process *_next;
+ long _cleaning_up;
+ cleanup_routine *_routines_head;
+ DWORD _exit_status; // Set in the constructor and in exit_code ().
+ HANDLE _hProcess;
/* used to prevent races-on-delete */
- CRITICAL_SECTION access;
- volatile long cleaning_up;
- class cleanup_routine *head;
- HANDLE thehandle;
- DWORD _exit_status;
+ CRITICAL_SECTION _access;
+
+ DWORD exit_code ();
+ bool add (cleanup_routine *);
+ void cleanup ();
};
-class process_cache:public threaded_queue
+class process_cache
{
+ // Number of special (i.e., non-process) handles in _wait_array.
+ // See wait_for_processes () and sync_wait_array () for details.
+ enum {
+ SPECIALS_COUNT = 2
+ };
+
+ class submission_loop : public queue_submission_loop
+ {
+ public:
+ submission_loop (process_cache *const cache, threaded_queue *const queue)
+ : queue_submission_loop (queue, true),
+ _cache (cache)
+ {
+ assert (_cache);
+ }
+
+ private:
+ process_cache *const _cache;
+
+ virtual void request_loop ();
+ };
+
+ friend submission_loop;
+
public:
process_cache (unsigned int initial_workers);
~process_cache ();
+
class process *process (DWORD winpid);
- /* remove a process from the cache */
- int handle_snapshot (HANDLE *, class process **, ssize_t, int);
- void remove_process (class process *);
- /* threaded_queue methods */
- void process_requests ();
- HANDLE cache_add_trigger;
+
+ bool running () const { return _queue.running (); }
+
+ bool start () { return _queue.start (); }
+ bool stop () { return _queue.stop (); }
private:
- void add_task (class process *);
- class process *head;
- CRITICAL_SECTION cache_write_access;
+ threaded_queue _queue;
+ submission_loop _submitter;
+
+ size_t _processes_count;
+ class process *_processes_head; // A list sorted by winpid.
+
+ // Access to the _wait_array and related fields is not thread-safe,
+ // since they are used solely by wait_for_processes () and its callees.
+
+ HANDLE _wait_array[MAXIMUM_WAIT_OBJECTS];
+ class process *_process_array[MAXIMUM_WAIT_OBJECTS];
+
+ HANDLE _cache_add_trigger; // Actually both add and remove.
+ CRITICAL_SECTION _cache_write_access; // Actually both read and write access.
+
+ void wait_for_processes (HANDLE interrupt);
+ size_t sync_wait_array (HANDLE interrupt);
+ void check_and_remove_process (const size_t index);
+
+ class process *find (DWORD winpid, class process **previous = NULL);
};
#endif /* _CYGSERVER_PROCESS_ */
diff --git a/winsup/cygwin/threaded_queue.cc b/winsup/cygwin/threaded_queue.cc
index aab60263455..e3da7f74728 100755
--- a/winsup/cygwin/threaded_queue.cc
+++ b/winsup/cygwin/threaded_queue.cc
@@ -4,14 +4,15 @@
Written by Robert Collins <rbtcollins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
#include "woutsup.h"
+#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
@@ -19,226 +20,387 @@
#include <stdlib.h>
#include "threaded_queue.h"
+/*****************************************************************************/
+
+/* queue_request */
+
+queue_request::~queue_request ()
+{}
+
+/*****************************************************************************/
+
/* threaded_queue */
-DWORD WINAPI
-worker_function (LPVOID LpParam)
+threaded_queue::threaded_queue (const size_t initial_workers)
+ : _workers_count (0),
+ _running (false),
+ _submitters_head (NULL),
+ _requests_count (0),
+ _requests_head (NULL),
+ _requests_sem (NULL)
{
- class threaded_queue *queue = (class threaded_queue *) LpParam;
- class queue_request *request;
- /* FIXME use a threadsafe pop instead for speed? */
- while (queue->active)
+ InitializeCriticalSection (&_queue_lock);
+
+ // This semaphore's count is the number of requests on the queue.
+ // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
+ // multiplied by max. threads per process (2028?), which is (a few)
+ // more requests than could ever be pending with the current design.
+
+ _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
+ 0, // Initial count
+ 129792, // Maximum count
+ NULL); // Anonymous
+
+ if (!_requests_sem)
{
- EnterCriticalSection (&queue->queuelock);
- while (!queue->request && queue->active)
- {
- LeaveCriticalSection (&queue->queuelock);
- DWORD rc = WaitForSingleObject (queue->event, INFINITE);
- if (rc == WAIT_FAILED)
- {
- system_printf ("Wait for event failed");
- queue->running--;
- ExitThread (0);
- }
- EnterCriticalSection (&queue->queuelock);
- }
- if (!queue->active)
- {
- queue->running--;
- LeaveCriticalSection (&queue->queuelock);
- ExitThread (0);
- }
- /* not needed, but it is efficient */
- request =
- (class queue_request *) InterlockedExchangePointer (&queue->request,
- queue->request->
- next);
- LeaveCriticalSection (&queue->queuelock);
- request->process ();
- delete request;
+ system_printf (("failed to create the request queue semaphore, "
+ "error = %lu"),
+ GetLastError ());
+ abort ();
}
- queue->running--;
- ExitThread (0);
+
+ create_workers (initial_workers);
}
-void
-threaded_queue::create_workers ()
+threaded_queue::~threaded_queue ()
{
- InitializeCriticalSection (&queuelock);
- if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ if (_running)
+ stop ();
+
+ debug_printf ("deleting all pending queue requests");
+ queue_request *reqptr = _requests_head;
+ while (reqptr)
{
- system_printf ("Failed to create event queue (%lu), terminating",
- GetLastError ());
- exit (1);
+ queue_request *const ptr = reqptr;
+ reqptr = reqptr->_next;
+ delete ptr;
}
- active = true;
- /* FIXME: Use a stack pair and create threads on the fly whenever
- * we have to to service a request.
- */
- for (unsigned int i = 0; i < initial_workers; i++)
+ DeleteCriticalSection (&_queue_lock);
+ if (_requests_sem)
+ (void) CloseHandle (_requests_sem);
+}
+
+/* FIXME: return success or failure rather than quitting */
+void
+threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
+{
+ assert (this);
+ assert (submitter);
+ assert (submitter->_queue == this);
+ assert (!submitter->_next);
+
+ submitter->_next =
+ TInterlockedExchangePointer (&_submitters_head, submitter);
+
+ if (_running)
+ submitter->start ();
+}
+
+bool
+threaded_queue::start ()
+{
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = true;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (!was_running)
{
- HANDLE hThread;
- DWORD tid;
- hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
- if (hThread == NULL)
+ debug_printf ("starting all queue submission loops");
+
+ while (loopptr)
{
- system_printf ("Failed to create thread (%lu), terminating",
- GetLastError ());
- exit (1);
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->start ();
}
- CloseHandle (hThread);
- running++;
}
+
+ return was_running;
}
-void
-threaded_queue::cleanup ()
+bool
+threaded_queue::stop ()
{
- /* harvest the threads */
- active = false;
- /* kill the request processing loops */
- queue_process_param *reqloop;
- /* make sure we don't race with a incoming request creation */
- EnterCriticalSection (&queuelock);
- reqloop =
- (queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
- while (reqloop)
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = false;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (was_running)
{
- queue_process_param *t = reqloop;
- reqloop = reqloop->next;
- delete t;
+ debug_printf ("stopping all queue submission loops");
+ while (loopptr)
+ {
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->stop ();
+ }
+
+ ReleaseSemaphore (_requests_sem, _workers_count, NULL);
+ while (_workers_count)
+ {
+ debug_printf (("waiting for worker threads to terminate: "
+ "%lu still running"),
+ _workers_count);
+ sleep (1);
+ }
+ debug_printf ("all worker threads have terminated");
}
- LeaveCriticalSection (&queuelock);
- if (!running)
- return;
- debug_printf ("Waiting for current queue threads to terminate");
- for (int n = running; n; n--)
- PulseEvent (event);
- while (running)
- sleep (1);
- DeleteCriticalSection (&queuelock);
- CloseHandle (event);
+
+ return was_running;
}
/* FIXME: return success or failure */
void
-threaded_queue::add (queue_request * therequest)
+threaded_queue::add (queue_request *const therequest)
{
- /* safe to not "Try" because workers don't hog this, they wait on the event
- */
- EnterCriticalSection (&queuelock);
- if (!running)
+ assert (this);
+ assert (therequest);
+ assert (!therequest->_next);
+
+ if (!_workers_count)
{
- system_printf ("No worker threads to handle request!");
+ system_printf ("warning: no worker threads to handle request!");
+ // FIXME: And then what?
}
- if (!request)
- request = therequest;
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_requests_head)
+ _requests_head = therequest;
else
{
- /* add to the queue end. */
- queue_request *listrequest = request;
- while (listrequest->next)
- listrequest = listrequest->next;
- listrequest->next = therequest;
+ /* Add to the queue end. */
+ queue_request *reqptr = _requests_head;
+ for (; reqptr->_next; reqptr = reqptr->_next)
+ {}
+ assert (reqptr);
+ assert (!reqptr->_next);
+ reqptr->_next = therequest;
}
- PulseEvent (event);
- LeaveCriticalSection (&queuelock);
+
+ _requests_count += 1;
+ assert (_requests_count > 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ (void) ReleaseSemaphore (_requests_sem, 1, NULL);
}
-/* FIXME: return success or failure rather than quitting */
+/*static*/ DWORD WINAPI
+threaded_queue::start_routine (const LPVOID lpParam)
+{
+ class threaded_queue *const queue = (class threaded_queue *) lpParam;
+ assert (queue);
+
+ queue->worker_loop ();
+
+ const long count = InterlockedDecrement (&queue->_workers_count);
+ assert (count >= 0);
+
+ if (queue->_running)
+ debug_printf ("worker loop has exited; thread about to terminate");
+
+ return 0;
+}
+
+/* Called from the constructor: so no need to be thread-safe until the
+ * worker threads start to be created; thus the interlocked increment
+ * of the `_workers_count' field.
+ */
+
void
-threaded_queue::process_requests (queue_process_param * params,
- threaded_queue_thread_function *
- request_loop)
+threaded_queue::create_workers (const size_t initial_workers)
{
- if (params->start (request_loop, this) == false)
- exit (1);
- params->next =
- (queue_process_param *) InterlockedExchangePointer (&process_head,
- params);
+ for (unsigned int i = 0; i != initial_workers; i++)
+ {
+ const long count = InterlockedIncrement (&_workers_count);
+ assert (count > 0);
+
+ DWORD tid;
+ const HANDLE hThread =
+ CreateThread (NULL, 0, start_routine, this, 0, &tid);
+
+ if (!hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+
+ (void) CloseHandle (hThread);
+ }
}
-/* queue_process_param */
-/* How does a constructor return an error? */
-queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
-interruptible
-(ninterruptible)
+void
+threaded_queue::worker_loop ()
{
- if (!interruptible)
- return;
- debug_printf ("creating an interruptible processing thread");
- if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ while (true)
{
- system_printf ("Failed to create interrupt event (%lu), terminating",
- GetLastError ());
- exit (1);
+ const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
+ if (rc == WAIT_FAILED)
+ {
+ system_printf ("wait for request semaphore failed, error = %lu",
+ GetLastError ());
+ return;
+ }
+ assert (rc == WAIT_OBJECT_0);
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_running)
+ {
+ LeaveCriticalSection (&_queue_lock);
+ return;
+ }
+
+ assert (_requests_head);
+ queue_request *const reqptr = _requests_head;
+ _requests_head = reqptr->_next;
+
+ _requests_count -= 1;
+ assert (_requests_count >= 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ assert (reqptr);
+ reqptr->process ();
+ delete reqptr;
}
}
-queue_process_param::~queue_process_param ()
+/*****************************************************************************/
+
+/* queue_submission_loop */
+
+queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
+ const bool ninterruptible)
+ : _running (false),
+ _interrupt_event (NULL),
+ _queue (queue),
+ _interruptible (ninterruptible),
+ _hThread (NULL),
+ _tid (0),
+ _next (NULL)
{
- if (running)
+ if (_interruptible)
+ {
+ // verbose: debug_printf ("creating an interruptible processing thread");
+
+ _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
+ FALSE, // Auto-reset
+ FALSE, // Initially non-signalled
+ NULL); // Anonymous
+
+ if (!_interrupt_event)
+ {
+ system_printf ("failed to create interrupt event, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+ }
+}
+
+queue_submission_loop::~queue_submission_loop ()
+{
+ if (_running)
stop ();
- if (!interruptible)
- return;
- CloseHandle (interrupt);
+ if (_interrupt_event)
+ (void) CloseHandle (_interrupt_event);
+ if (_hThread)
+ (void) CloseHandle (_hThread);
}
bool
- queue_process_param::start (threaded_queue_thread_function * request_loop,
- threaded_queue * thequeue)
+queue_submission_loop::start ()
{
- queue = thequeue;
- hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
- if (hThread)
+ assert (this);
+ assert (!_hThread);
+
+ const bool was_running = _running;
+
+ if (!was_running)
{
- running = true;
- return true;
+ _running = true;
+
+ _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
+ if (!_hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
}
- system_printf ("Failed to create thread (%lu), terminating",
- GetLastError ());
- return false;
+
+ return was_running;
}
-void
-queue_process_param::stop ()
+bool
+queue_submission_loop::stop ()
{
- if (interruptible)
+ assert (this);
+ assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
+
+ const bool was_running = _running;
+
+ if (_running)
{
- InterlockedExchange (&shutdown, true);
- PulseEvent (interrupt);
- /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
- * scheduled again, we print an error and exit. We _should_ loop or
- * try resignalling. We don't want to hand here though...
- */
- int n = 5;
- while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
- if (!n)
+ _running = false;
+
+ if (_interruptible)
{
- system_printf ("Process thread didn't shutdown cleanly after 200ms!");
- exit (1);
+ assert (_interrupt_event
+ && _interrupt_event != INVALID_HANDLE_VALUE);
+
+ SetEvent (_interrupt_event);
+
+ if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
+ {
+ system_printf (("request loop thread %lu failed to shutdown "
+ "when asked politely: about to get heavy"),
+ _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ {
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
+ abort ();
+ }
+ }
}
else
- running = false;
- }
- else
- {
- debug_printf ("killing request loop thread %ld", tid);
- int rc;
- if (!(rc = TerminateThread (hThread, 0)))
{
- system_printf ("error shutting down request loop worker thread");
+ // FIXME: could wait to see if the request loop notices that
+ // the submission loop is no longer running and shuts down
+ // voluntarily.
+
+ debug_printf ("killing request loop thread %lu", _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
}
- running = false;
}
- CloseHandle (hThread);
+
+ return was_running;
}
-/* queue_request */
-queue_request::queue_request ()
- : next (NULL)
-{}
+/*static*/ DWORD WINAPI
+queue_submission_loop::start_routine (const LPVOID lpParam)
+{
+ class queue_submission_loop *const submission_loop =
+ (class queue_submission_loop *) lpParam;
+ assert (submission_loop);
-queue_request::~queue_request ()
-{}
+ submission_loop->request_loop ();
+
+ debug_printf ("submission loop has exited; thread about to terminate");
+
+ submission_loop->stop ();
+
+ return 0;
+}
+
+/*****************************************************************************/
diff --git a/winsup/cygwin/threaded_queue.h b/winsup/cygwin/threaded_queue.h
index d3c5c4cca93..a44a4b7273e 100755
--- a/winsup/cygwin/threaded_queue.h
+++ b/winsup/cygwin/threaded_queue.h
@@ -4,65 +4,124 @@
Written by Robert Collins <rbtcollins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
#ifndef _THREADED_QUEUE_
#define _THREADED_QUEUE_
+/*****************************************************************************/
+
/* a specific request */
class queue_request
{
- public:
- class queue_request *next;
- virtual void process () = 0;
- queue_request();
- virtual ~queue_request();
+public:
+ queue_request *_next;
+
+ queue_request() : _next (NULL) {}
+ virtual ~queue_request();
+
+ virtual void process () = 0;
};
+/*****************************************************************************/
-typedef DWORD WINAPI threaded_queue_thread_function (LPVOID);
-/* parameters for a request finding and submitting loop */
+/* a queue to allocate requests from n submission loops to x worker threads */
-class queue_process_param
+class queue_submission_loop;
+
+class threaded_queue
{
- public:
- bool start (threaded_queue_thread_function *, class threaded_queue *);
- void stop ();
- bool running;
- long int shutdown;
- class queue_process_param * next;
- class threaded_queue *queue;
- queue_process_param (bool ninterruptible);
- ~queue_process_param ();
- bool interruptible;
- HANDLE interrupt;
- HANDLE hThread;
- DWORD tid;
+public:
+ threaded_queue (size_t initial_workers = 1);
+ ~threaded_queue ();
+
+ void add_submission_loop (queue_submission_loop *);
+
+ bool running () const { return _running; }
+
+ bool start ();
+ bool stop ();
+
+ void add (queue_request *);
+
+private:
+ long _workers_count;
+ bool _running;
+
+ queue_submission_loop *_submitters_head;
+
+ long _requests_count; // Informational only.
+ queue_request *_requests_head;
+
+ CRITICAL_SECTION _queue_lock;
+ HANDLE _requests_sem; // == _requests_count
+
+ static DWORD WINAPI start_routine (LPVOID /* this */);
+
+ void create_workers (size_t initial_workers);
+ void worker_loop ();
};
-/* a queue to allocate requests from n submission loops to x worker threads */
+/*****************************************************************************/
-class threaded_queue
+/* parameters for a request finding and submitting loop */
+
+class queue_submission_loop
{
- public:
- CRITICAL_SECTION queuelock;
- HANDLE event;
- bool active;
- queue_request * request;
- unsigned int initial_workers;
- unsigned int running;
- void create_workers ();
- void cleanup ();
- void add (queue_request *);
- void process_requests (queue_process_param *, threaded_queue_thread_function *);
- threaded_queue () : active (false), request (NULL), initial_workers (1), running (0), process_head (NULL) {};
- private:
- queue_request *process_head;
+ friend threaded_queue;
+
+public:
+ queue_submission_loop (threaded_queue *, bool ninterruptible);
+ virtual ~queue_submission_loop ();
+
+ bool start ();
+ bool stop ();
+
+ threaded_queue *queue () { return _queue; };
+
+protected:
+ bool _running;
+ HANDLE _interrupt_event;
+ threaded_queue *const _queue;
+
+private:
+ bool _interruptible;
+ HANDLE _hThread;
+ DWORD _tid;
+ queue_submission_loop *_next;
+
+ static DWORD WINAPI start_routine (LPVOID /* this */);
+ virtual void request_loop () = 0;
};
+#ifdef __cplusplus
+
+/*---------------------------------------------------------------------------*
+ * Some type-safe versions of the various interlocked functions.
+ *---------------------------------------------------------------------------*/
+
+template <typename T> T *
+TInterlockedExchangePointer (T **lvalue, T *rvalue)
+{
+ return reinterpret_cast<T *>
+ (InterlockedExchangePointer (reinterpret_cast<void **> (lvalue),
+ reinterpret_cast<void *> (rvalue)));
+}
+
+template <typename T> T *
+TInterlockedCompareExchangePointer (T **lvalue, T *rvalue1, T *rvalue2)
+{
+ return reinterpret_cast<T *>
+ (InterlockedCompareExchangePointer (reinterpret_cast<void **> (lvalue),
+ reinterpret_cast<void *> (rvalue1),
+ reinterpret_cast<void *> (rvalue2)));
+}
+
+#endif /* __cplusplus */
+
#endif /* _THREADED_QUEUE_ */