From 43225860b6649b57f15e24721d2731abcf1bedc3 Mon Sep 17 00:00:00 2001 From: Brad King Date: Tue, 13 Jul 2004 16:50:55 -0400 Subject: ENH: Re-arranged handling of the two threads per pipe to improve readability of code. --- Source/kwsys/ProcessWin32.c | 186 +++++++++++++++++++++++++------------------- 1 file changed, 106 insertions(+), 80 deletions(-) (limited to 'Source/kwsys/ProcessWin32.c') diff --git a/Source/kwsys/ProcessWin32.c b/Source/kwsys/ProcessWin32.c index 5f772824a0..538f4b4678 100644 --- a/Source/kwsys/ProcessWin32.c +++ b/Source/kwsys/ProcessWin32.c @@ -78,10 +78,12 @@ typedef struct kwsysProcessCreateInformation_s /*--------------------------------------------------------------------------*/ typedef struct kwsysProcessPipeData_s kwsysProcessPipeData; -static DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd); -static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td); +static DWORD WINAPI kwsysProcessPipeThreadRead(LPVOID ptd); +static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, + kwsysProcessPipeData* td); static DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd); -static void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td); +static void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, + kwsysProcessPipeData* td); static int kwsysProcessInitialize(kwsysProcess* cp); static int kwsysProcessCreate(kwsysProcess* cp, int index, kwsysProcessCreateInformation* si, @@ -108,30 +110,34 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp); extern kwsysEXPORT int kwsysEncodedWriteArrayProcessFwd9x(const char* fname); /*--------------------------------------------------------------------------*/ -/* A structure containing data for each pipe's thread. */ -struct kwsysProcessPipeData_s +/* A structure containing synchronization data for each thread. */ +typedef struct kwsysProcessPipeSync_s kwsysProcessPipeSync; +struct kwsysProcessPipeSync_s { - /* ------------- Data managed per instance of kwsysProcess ------------- */ - - /* Handle for the thread for this pipe. */ + /* Handle to the thread. */ HANDLE Thread; - HANDLE ThreadWake; - /* Semaphore indicating a process and pipe are available. */ + /* Semaphore indicating to the thread that a process has started. */ HANDLE Ready; - /* Semaphore indicating when this thread's buffer is empty. */ - HANDLE Empty; + /* Semaphore indicating to the thread that it should begin work. */ + HANDLE Go; - /* Semaphore indicating a pipe thread has reset for another process. */ + /* Semaphore indicating thread has reset for another process. */ HANDLE Reset; +}; - /* Semaphore indicating the wake thread should unblock. */ - HANDLE Wake; +/*--------------------------------------------------------------------------*/ +/* A structure containing data for each pipe's threads. */ +struct kwsysProcessPipeData_s +{ + /* ------------- Data managed per instance of kwsysProcess ------------- */ + + /* Synchronization data for reading thread. */ + kwsysProcessPipeSync Reader; - /* Semaphore indicating the wake thread has reset for another process. */ - HANDLE ReadyWake; - HANDLE ResetWake; + /* Synchronization data for waking thread. */ + kwsysProcessPipeSync Waker; /* Index of this pipe. */ int Index; @@ -428,53 +434,54 @@ kwsysProcess* kwsysProcess_New() /* Give the thread a pointer back to the kwsysProcess instance. */ cp->Pipe[i].Process = cp; - /* The pipe is not yet ready to read. Initialize semaphore to 0. */ - if(!(cp->Pipe[i].Ready = CreateSemaphore(0, 0, 1, 0))) + /* No process is yet running. Initialize semaphore to 0. */ + if(!(cp->Pipe[i].Reader.Ready = CreateSemaphore(0, 0, 1, 0))) { kwsysProcess_Delete(cp); return 0; } /* The pipe is not yet reset. Initialize semaphore to 0. */ - if(!(cp->Pipe[i].Reset = CreateSemaphore(0, 0, 1, 0))) + if(!(cp->Pipe[i].Reader.Reset = CreateSemaphore(0, 0, 1, 0))) { kwsysProcess_Delete(cp); return 0; } /* The thread's buffer is initially empty. Initialize semaphore to 1. */ - if(!(cp->Pipe[i].Empty = CreateSemaphore(0, 1, 1, 0))) + if(!(cp->Pipe[i].Reader.Go = CreateSemaphore(0, 1, 1, 0))) { kwsysProcess_Delete(cp); return 0; } - /* Create the thread. It will block immediately. The thread will - not make deeply nested calls, so we need only a small - stack. */ - if(!(cp->Pipe[i].Thread = CreateThread(0, 1024, kwsysProcessPipeThread, - &cp->Pipe[i], 0, &dummy))) + /* Create the reading thread. It will block immediately. The + thread will not make deeply nested calls, so we need only a + small stack. */ + if(!(cp->Pipe[i].Reader.Thread = CreateThread(0, 1024, + kwsysProcessPipeThreadRead, + &cp->Pipe[i], 0, &dummy))) { kwsysProcess_Delete(cp); return 0; } - /* The wake thread should block. Initialize semaphore to 0. */ - if(!(cp->Pipe[i].ReadyWake = CreateSemaphore(0, 0, 1, 0))) + /* No process is yet running. Initialize semaphore to 0. */ + if(!(cp->Pipe[i].Waker.Ready = CreateSemaphore(0, 0, 1, 0))) { kwsysProcess_Delete(cp); return 0; } - /* The wake thread need not reset yet. Initialize semaphore to 0. */ - if(!(cp->Pipe[i].ResetWake = CreateSemaphore(0, 0, 1, 0))) + /* The pipe is not yet reset. Initialize semaphore to 0. */ + if(!(cp->Pipe[i].Waker.Reset = CreateSemaphore(0, 0, 1, 0))) { kwsysProcess_Delete(cp); return 0; } - /* The pipe will not need to wake yet. Initialize semaphore to 0. */ - if(!(cp->Pipe[i].Wake = CreateSemaphore(0, 0, 1, 0))) + /* The waker should not wake immediately. Initialize semaphore to 0. */ + if(!(cp->Pipe[i].Waker.Go = CreateSemaphore(0, 0, 1, 0))) { kwsysProcess_Delete(cp); return 0; @@ -483,9 +490,9 @@ kwsysProcess* kwsysProcess_New() /* Create the waking thread. It will block immediately. The thread will not make deeply nested calls, so we need only a small stack. */ - if(!(cp->Pipe[i].ThreadWake = CreateThread(0, 1024, - kwsysProcessPipeThreadWake, - &cp->Pipe[i], 0, &dummy))) + if(!(cp->Pipe[i].Waker.Thread = CreateThread(0, 1024, + kwsysProcessPipeThreadWake, + &cp->Pipe[i], 0, &dummy))) { kwsysProcess_Delete(cp); return 0; @@ -525,39 +532,41 @@ void kwsysProcess_Delete(kwsysProcess* cp) /* Terminate each of the threads. */ for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) { - if(cp->Pipe[i].Thread) + /* Terminate this reading thread. */ + if(cp->Pipe[i].Reader.Thread) { /* Signal the thread we are ready for it. It will terminate immediately since Deleting is set. */ - ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0); + ReleaseSemaphore(cp->Pipe[i].Reader.Ready, 1, 0); /* Wait for the thread to exit. */ - WaitForSingleObject(cp->Pipe[i].Thread, INFINITE); + WaitForSingleObject(cp->Pipe[i].Reader.Thread, INFINITE); /* Close the handle to the thread. */ - kwsysProcessCleanupHandle(&cp->Pipe[i].Thread); + kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Thread); } - if(cp->Pipe[i].ThreadWake) + /* Terminate this waking thread. */ + if(cp->Pipe[i].Waker.Thread) { /* Signal the thread we are ready for it. It will terminate immediately since Deleting is set. */ - ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0); + ReleaseSemaphore(cp->Pipe[i].Waker.Ready, 1, 0); /* Wait for the thread to exit. */ - WaitForSingleObject(cp->Pipe[i].ThreadWake, INFINITE); + WaitForSingleObject(cp->Pipe[i].Waker.Thread, INFINITE); /* Close the handle to the thread. */ - kwsysProcessCleanupHandle(&cp->Pipe[i].ThreadWake); + kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Thread); } /* Cleanup the pipe's semaphores. */ - kwsysProcessCleanupHandle(&cp->Pipe[i].Reset); - kwsysProcessCleanupHandle(&cp->Pipe[i].Ready); - kwsysProcessCleanupHandle(&cp->Pipe[i].Empty); - kwsysProcessCleanupHandle(&cp->Pipe[i].ReadyWake); - kwsysProcessCleanupHandle(&cp->Pipe[i].ResetWake); - kwsysProcessCleanupHandle(&cp->Pipe[i].Wake); + kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Ready); + kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Go); + kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Reset); + kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Ready); + kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Go); + kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Reset); } /* Close the shared semaphores. */ @@ -1194,8 +1203,8 @@ void kwsysProcess_Execute(kwsysProcess* cp) /* Tell the pipe threads that a process has started. */ for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) { - ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0); - ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0); + ReleaseSemaphore(cp->Pipe[i].Reader.Ready, 1, 0); + ReleaseSemaphore(cp->Pipe[i].Waker.Ready, 1, 0); } /* We don't care about the children's main threads. */ @@ -1231,8 +1240,8 @@ void kwsysProcess_Disown(kwsysProcess* cp) /* Wait for all pipe threads to reset. */ for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) { - WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); - WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE); + WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE); + WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE); } /* We will not wait for exit, so cleanup now. */ @@ -1278,7 +1287,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length, done with the data. */ if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) { - ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); + ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0); cp->CurrentIndex = KWSYSPE_PIPE_COUNT; } @@ -1317,8 +1326,10 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length, /* Data are available or a pipe closed. */ if(cp->Pipe[cp->CurrentIndex].Closed) { - /* The pipe closed. */ - ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Wake, 1, 0); + /* The pipe closed at the write end. Close the read end and + inform the wakeup thread it is done with this process. */ + kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read); + ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Waker.Go, 1, 0); --cp->PipesLeft; } else if(data && length) @@ -1413,15 +1424,15 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout) without releaseing the pipe's thread. Release it now. */ if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) { - ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); + ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0); cp->CurrentIndex = KWSYSPE_PIPE_COUNT; } /* Wait for all pipe threads to reset. */ for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) { - WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); - WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE); + WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE); + WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE); } /* ---- It is now safe again to call kwsysProcessCleanup. ----- */ @@ -1503,23 +1514,19 @@ void kwsysProcess_Kill(kwsysProcess* cp) Function executed for each pipe's thread. Argument is a pointer to the kwsysProcessPipeData instance for this thread. */ -DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd) +DWORD WINAPI kwsysProcessPipeThreadRead(LPVOID ptd) { kwsysProcessPipeData* td = (kwsysProcessPipeData*)ptd; kwsysProcess* cp = td->Process; /* Wait for a process to be ready. */ - while((WaitForSingleObject(td->Ready, INFINITE), !cp->Deleting)) + while((WaitForSingleObject(td->Reader.Ready, INFINITE), !cp->Deleting)) { /* Read output from the process for this thread's pipe. */ kwsysProcessPipeThreadReadPipe(cp, td); - /* We were signalled to exit with our buffer empty. Reset the - mutex for a new process. */ - ReleaseSemaphore(td->Empty, 1, 0); - /* Signal the main thread we have reset for a new process. */ - ReleaseSemaphore(td->Reset, 1, 0); + ReleaseSemaphore(td->Reader.Reset, 1, 0); } return 0; } @@ -1533,7 +1540,7 @@ DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd) void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td) { /* Wait for space in the thread's buffer. */ - while((WaitForSingleObject(td->Empty, INFINITE), !td->Closed)) + while((WaitForSingleObject(td->Reader.Go, INFINITE), !td->Closed)) { /* Read data from the pipe. This may block until data are available. */ if(!ReadFile(td->Read, td->DataBuffer, KWSYSPE_PIPE_BUFFER_SIZE, @@ -1555,6 +1562,10 @@ void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td) cp->SharedIndex = td->Index; ReleaseSemaphore(cp->Full, 1, 0); } + + /* We were signalled to exit with our buffer empty. Reset the + mutex for a new process. */ + ReleaseSemaphore(td->Reader.Go, 1, 0); } /*--------------------------------------------------------------------------*/ @@ -1569,29 +1580,34 @@ DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd) kwsysProcess* cp = td->Process; /* Wait for a process to be ready. */ - while((WaitForSingleObject(td->ReadyWake, INFINITE), !cp->Deleting)) + while((WaitForSingleObject(td->Waker.Ready, INFINITE), !cp->Deleting)) { /* Wait for a possible wakeup. */ kwsysProcessPipeThreadWakePipe(cp, td); /* Signal the main thread we have reset for a new process. */ - ReleaseSemaphore(td->ResetWake, 1, 0); + ReleaseSemaphore(td->Waker.Reset, 1, 0); } return 0; } +/*--------------------------------------------------------------------------*/ + +/* + Function called in each pipe's thread to handle reading thread + wakeup for one execution of a subprocess. +*/ void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td) { /* Wait for a possible wake command. */ - if((WaitForSingleObject(td->Wake, INFINITE), !td->Closed)) + WaitForSingleObject(td->Waker.Go, INFINITE); + + /* If the pipe is not closed, we need to wake up the reading thread. */ + if(!td->Closed) { - /* The pipe is not closed. We need to wake up the reading thread. */ DWORD dummy; WriteFile(td->Write, "", 1, &dummy, 0); } - - /* We have processed the wake command. */ - ReleaseSemaphore(td->Wake, 1, 0); } /*--------------------------------------------------------------------------*/ @@ -2666,11 +2682,11 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp) /* If data were just reported data, release the pipe's thread. */ if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) { - ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); + ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0); cp->CurrentIndex = KWSYSPE_PIPE_COUNT; } - /* Signal all the pipe wakeup threads. */ + /* Wakeup all reading threads that are not on closed pipes. */ for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) { /* The wakeup threads will write one byte to the pipe write ends. @@ -2684,18 +2700,28 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp) thread to call WriteFile. If it blocks, that is okay because it will unblock when we close the read end and break the pipe below. */ - ReleaseSemaphore(cp->Pipe[i].Wake, 1, 0); + if(cp->Pipe[i].Read) + { + ReleaseSemaphore(cp->Pipe[i].Waker.Go, 1, 0); + } } /* Tell pipe threads to reset until we run another process. */ while(cp->PipesLeft > 0) { + /* The waking threads will cause all reading threads to report. + Wait for the next one and save its index. */ WaitForSingleObject(cp->Full, INFINITE); cp->CurrentIndex = cp->SharedIndex; ReleaseSemaphore(cp->SharedIndexMutex, 1, 0); - kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read); + + /* We are done reading this pipe. Close its read handle. */ cp->Pipe[cp->CurrentIndex].Closed = 1; - ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); + kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read); --cp->PipesLeft; + + /* Tell the reading thread we are done with the data. It will + reset immediately because the pipe is closed. */ + ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0); } } -- cgit v1.2.1