diff options
author | Justus Winter <justus@g10code.com> | 2016-10-19 12:20:44 +0200 |
---|---|---|
committer | Werner Koch <wk@gnupg.org> | 2016-11-12 19:55:05 +0100 |
commit | 40e5ff0a0084c0d9521b401db4f38885bfdae233 (patch) | |
tree | ce754e1533719e6931d11480adc4ca28aad5d12f | |
parent | e15416d3668ea9dcc6a64cbb98140a99be8a7865 (diff) | |
download | libgpg-error-40e5ff0a0084c0d9521b401db4f38885bfdae233.tar.gz |
estream: Support 'es_poll' on Windows.
* src/Makefile.am (arch_sources): Add new file.
* src/estream.c (O_NONBLOCK): Move to 'gpgrt-int.h'.
(BUFFER_BLOCK_SIZE): Likewise.
(BUFFER_UNREAD_SIZE): Likewise.
(struct notify_list_s, notify_list_t): Likewise.
(struct _gpgrt_stream_internal, estream_internal_t): Likewise.
(X_POLLABLE): New macro.
(parse_mode): Parse keyword 'pollable', emulate O_NONBLOCK using the
same mechanism on Windows.
(_gpgrt_poll): Use the new '_gpgrt_w32_poll' on Windows.
* src/gpgrt-int.h (_gpgrt_functions_w32_pollable): New declaration.
(_gpgrt_w32_pollable_create): New prototype.
(_gpgrt_w32_poll): Likewise.
* src/w32-estream.c: New file. This code is adapted from GPGME.
* tests/t-poll.c (create_pipe): Create pollable streams.
GnuPG-bug-id: 2731
Signed-off-by: Justus Winter <justus@g10code.com>
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/estream.c | 125 | ||||
-rw-r--r-- | src/gpgrt-int.h | 74 | ||||
-rw-r--r-- | src/w32-estream.c | 1047 | ||||
-rw-r--r-- | tests/t-poll.c | 4 |
5 files changed, 1177 insertions, 76 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index c1e86a7..0c18252 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -109,7 +109,8 @@ CLEANFILES = err-sources.h err-codes.h code-to-errno.h code-from-errno.h \ # {{{ Begin Windows part # if HAVE_W32_SYSTEM -arch_sources = w32-gettext.c w32-lock.c w32-lock-obj.h w32-thread.c w32-iconv.c +arch_sources = w32-gettext.c w32-lock.c w32-lock-obj.h w32-thread.c \ + w32-iconv.c w32-estream.c RCCOMPILE = $(RC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \ -DLOCALEDIR=\"$(localedir)\" $(AM_CPPFLAGS) $(CPPFLAGS) LTRCCOMPILE = $(LIBTOOL) --mode=compile --tag=RC $(RCCOMPILE) diff --git a/src/estream.c b/src/estream.c index aca7db7..c46bc61 100644 --- a/src/estream.c +++ b/src/estream.c @@ -127,8 +127,6 @@ # ifndef S_IXOTH # define S_IXOTH S_IXUSR # endif -# undef O_NONBLOCK -# define O_NONBLOCK 0 /* FIXME: Not yet supported. */ #endif #if !defined (EWOULDBLOCK) && defined (HAVE_W32_SYSTEM) @@ -174,66 +172,6 @@ typedef void (*func_free_t) (void *mem); /* - * Buffer management layer. - */ - -#define BUFFER_BLOCK_SIZE BUFSIZ -#define BUFFER_UNREAD_SIZE 16 - - -/* - * A type to hold notification functions. - */ -struct notify_list_s -{ - struct notify_list_s *next; - void (*fnc) (estream_t, void*); /* The notification function. */ - void *fnc_value; /* The value to be passed to FNC. */ -}; -typedef struct notify_list_s *notify_list_t; - - -/* - * The private object describing a stream. - */ -struct _gpgrt_stream_internal -{ - unsigned char buffer[BUFFER_BLOCK_SIZE]; - unsigned char unread_buffer[BUFFER_UNREAD_SIZE]; - - gpgrt_lock_t lock; /* Lock. Used by *_stream_lock(). */ - - gpgrt_stream_backend_kind_t kind; - void *cookie; /* Cookie. */ - void *opaque; /* Opaque data. */ - unsigned int modeflags; /* Flags for the backend. */ - char *printable_fname; /* Malloced filename for es_fname_get. */ - gpgrt_off_t offset; - gpgrt_cookie_read_function_t func_read; - gpgrt_cookie_write_function_t func_write; - gpgrt_cookie_seek_function_t func_seek; - gpgrt_cookie_close_function_t func_close; - cookie_ioctl_function_t func_ioctl; - int strategy; - es_syshd_t syshd; /* A copy of the system handle. */ - struct - { - unsigned int err: 1; - unsigned int eof: 1; - unsigned int hup: 1; - } indicators; - unsigned int deallocate_buffer: 1; - unsigned int is_stdstream:1; /* This is a standard stream. */ - unsigned int stdstream_fd:2; /* 0, 1 or 2 for a standard stream. */ - unsigned int printable_fname_inuse: 1; /* es_fname_get has been used. */ - unsigned int samethread: 1; /* The "samethread" mode keyword. */ - size_t print_ntotal; /* Bytes written from in print_writer. */ - notify_list_t onclose; /* On close notify function list. */ -}; -typedef struct _gpgrt_stream_internal *estream_internal_t; - - -/* * A linked list to hold active stream objects. * Protected by ESTREAM_LIST_LOCK. */ @@ -1686,6 +1624,7 @@ func_file_create (void **cookie, int *filedes, /* Flags used by parse_mode and friends. */ #define X_SAMETHREAD (1 << 0) #define X_SYSOPEN (1 << 1) +#define X_POLLABLE (1 << 2) /* Parse the mode flags of fopen et al. In addition to the POSIX * defined mode flags keyword parameters are supported. These are @@ -1723,6 +1662,13 @@ func_file_create (void **cookie, int *filedes, * under Windows the direct W32 API functions (HANDLE) are used * instead of their libc counterparts (fd). * + * pollable + * + * The object is opened in a way suitable for use with es_poll. On + * POSIX this is a NOP but under Windows we create up to two + * threads, one for reading and one for writing, do any I/O there, + * and synchronize with them in order to support es_poll. + * * Note: R_CMODE is optional because is only required by functions * which are able to creat a file. */ @@ -1828,6 +1774,10 @@ parse_mode (const char *modestr, return -1; } oflags |= O_NONBLOCK; +#if HAVE_W32_SYSTEM + /* Currently, nonblock implies pollable on Windows. */ + *r_xmode |= X_POLLABLE; +#endif } else if (!strncmp (modestr, "sysopen", 7)) { @@ -1839,6 +1789,16 @@ parse_mode (const char *modestr, } *r_xmode |= X_SYSOPEN; } + else if (!strncmp (modestr, "pollable", 8)) + { + modestr += 8; + if (*modestr && !strchr (" \t,", *modestr)) + { + _set_errno (EINVAL); + return -1; + } + *r_xmode |= X_POLLABLE; + } } if (!got_cmode) cmode = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); @@ -2126,6 +2086,23 @@ es_create (estream_t *stream, void *cookie, es_syshd_t *syshd, stream_new->unread_buffer_size = sizeof (stream_internal_new->unread_buffer); stream_new->intern = stream_internal_new; +#if _WIN32 + if (xmode & X_POLLABLE) + { + void *new_cookie; + + err = _gpgrt_w32_pollable_create (&new_cookie, modeflags, + functions, cookie); + if (err) + goto out; + + modeflags &= ~O_NONBLOCK; + cookie = new_cookie; + kind = BACKEND_W32_POLLABLE; + functions = _gpgrt_functions_w32_pollable; + } +#endif + init_stream_obj (stream_new, cookie, syshd, kind, functions, modeflags, xmode); init_stream_lock (stream_new); @@ -4731,11 +4708,13 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) { gpgrt_poll_t *item; int count = 0; +#ifndef _WIN32 fd_set readfds, writefds, exceptfds; int any_readfd, any_writefd, any_exceptfd; - int idx; int max_fd; int fd, ret, any; +#endif + int idx; if (!fds) { @@ -4783,6 +4762,15 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) return count; /* Early return without waiting. */ /* Now do the real select. */ +#ifdef _WIN32 + if (pre_syscall_func) + pre_syscall_func (); + + count = _gpgrt_w32_poll (fds, nfds, timeout); + + if (post_syscall_func) + post_syscall_func (); +#else any_readfd = any_writefd = any_exceptfd = 0; max_fd = 0; for (item = fds, idx = 0; idx < nfds; item++, idx++) @@ -4828,11 +4816,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) } } -#ifdef _WIN32 - (void)timeout; - ret = -1; - _set_errno (EOPNOTSUPP); -#else if (pre_syscall_func) pre_syscall_func (); do @@ -4850,7 +4833,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) while (ret == -1 && errno == EINTR); if (post_syscall_func) post_syscall_func (); -#endif if (ret == -1) return -1; @@ -4876,9 +4858,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) item->got_hup = 1; any = 1; } -#ifndef _WIN32 - /* NB.: We can't use FD_ISSET under windows - but we don't have - * support for it anyway. */ if (item->want_read && FD_ISSET (fd, &readfds)) { item->got_read = 1; @@ -4894,11 +4873,11 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout) item->got_oob = 1; any = 1; } -#endif /*!_WIN32*/ if (any) count++; } +#endif return count; } diff --git a/src/gpgrt-int.h b/src/gpgrt-int.h index a517cc6..a6e6036 100644 --- a/src/gpgrt-int.h +++ b/src/gpgrt-int.h @@ -52,6 +52,12 @@ gpg_err_code_t _gpgrt_yield (void); /* Local definitions for estream. */ +#if HAVE_W32_SYSTEM +# ifndef O_NONBLOCK +# define O_NONBLOCK 0x40000000 /* FIXME: Is that safe? */ +# endif +#endif + /* * A private cookie function to implement an internal IOCTL service. * and ist IOCTL numbers. @@ -80,6 +86,65 @@ typedef enum } gpgrt_stream_backend_kind_t; +/* + * A type to hold notification functions. + */ +struct notify_list_s +{ + struct notify_list_s *next; + void (*fnc) (estream_t, void*); /* The notification function. */ + void *fnc_value; /* The value to be passed to FNC. */ +}; +typedef struct notify_list_s *notify_list_t; + + +/* + * Buffer management layer. + */ + +#define BUFFER_BLOCK_SIZE BUFSIZ +#define BUFFER_UNREAD_SIZE 16 + + +/* + * The private object describing a stream. + */ +struct _gpgrt_stream_internal +{ + unsigned char buffer[BUFFER_BLOCK_SIZE]; + unsigned char unread_buffer[BUFFER_UNREAD_SIZE]; + + gpgrt_lock_t lock; /* Lock. Used by *_stream_lock(). */ + + gpgrt_stream_backend_kind_t kind; + void *cookie; /* Cookie. */ + void *opaque; /* Opaque data. */ + unsigned int modeflags; /* Flags for the backend. */ + char *printable_fname; /* Malloced filename for es_fname_get. */ + gpgrt_off_t offset; + gpgrt_cookie_read_function_t func_read; + gpgrt_cookie_write_function_t func_write; + gpgrt_cookie_seek_function_t func_seek; + gpgrt_cookie_close_function_t func_close; + cookie_ioctl_function_t func_ioctl; + int strategy; + es_syshd_t syshd; /* A copy of the system handle. */ + struct + { + unsigned int err: 1; + unsigned int eof: 1; + unsigned int hup: 1; + } indicators; + unsigned int deallocate_buffer: 1; + unsigned int is_stdstream:1; /* This is a standard stream. */ + unsigned int stdstream_fd:2; /* 0, 1 or 2 for a standard stream. */ + unsigned int printable_fname_inuse: 1; /* es_fname_get has been used. */ + unsigned int samethread: 1; /* The "samethread" mode keyword. */ + size_t print_ntotal; /* Bytes written from in print_writer. */ + notify_list_t onclose; /* On close notify function list. */ +}; +typedef struct _gpgrt_stream_internal *estream_internal_t; + /* Local prototypes for estream. */ int _gpgrt_es_init (void); @@ -237,5 +302,14 @@ const char *_gpgrt_fname_get (gpgrt_stream_t stream); #include "estream-printf.h" +#if _WIN32 +/* Prototypes for w32-estream.c. */ +struct cookie_io_functions_s _gpgrt_functions_w32_pollable; +int _gpgrt_w32_pollable_create (void *_GPGRT__RESTRICT *_GPGRT__RESTRICT cookie, + unsigned int modeflags, + struct cookie_io_functions_s next_functions, + void *next_cookie); +int _gpgrt_w32_poll (gpgrt_poll_t *fds, size_t nfds, int timeout); +#endif #endif /*_GPGRT_GPGRT_INT_H*/ diff --git a/src/w32-estream.c b/src/w32-estream.c new file mode 100644 index 0000000..516b238 --- /dev/null +++ b/src/w32-estream.c @@ -0,0 +1,1047 @@ +/* w32-estream.c - es_poll support on W32. + * Copyright (C) 2000 Werner Koch (dd9jn) + * Copyright (C) 2001, 2002, 2003, 2004, 2007, 2010, 2016 g10 Code GmbH + * + * This file is part of libgpg-error. + * + * libgpg-error is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * libgpg-error is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program; if not, see <https://www.gnu.org/licenses/>. + */ + +/* + * This file is based on GPGME's w32-io.c started in 2001. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#ifdef HAVE_SYS_TIME_H +# include <sys/time.h> +#endif +#ifdef HAVE_SYS_TYPES_H +# include <sys/types.h> +#endif +#include <io.h> +#include <windows.h> + +#include "gpgrt-int.h" + +/* + * In order to support es_poll on Windows, we create a proxy shim that + * we use as the estream I/O functions. This shim creates reader and + * writer threads that use the original I/O functions. + */ + + + +/* Tracing/debugging support. */ +#if 0 +#define TRACE(msg, ...) \ + fprintf (stderr, msg, ## __VA_ARGS__) +#define TRACE_CTX(ctx, msg, ...) \ + fprintf (stderr, "%p: " msg "\n", ctx, ## __VA_ARGS__) +#define TRACE_ERR(ctx, err, msg, ...) do { \ + char error_message[128]; \ + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM \ + | FORMAT_MESSAGE_IGNORE_INSERTS, \ + NULL, \ + err, \ + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), \ + (LPTSTR) &error_message, \ + sizeof error_message, NULL ); \ + fprintf (stderr, "%p: " msg ": %s\n", ctx, \ + ## __VA_ARGS__, error_message); \ + } while (0) +#else +#define TRACE(msg, ...) (void) 0 +#define TRACE_CTX(ctx, msg, ...) (void) 0 +#define TRACE_ERR(ctx, err, msg, ...) (void) 0 +#endif + + + +/* Calculate array dimension. */ +#ifndef DIM +#define DIM(array) (sizeof (array) / sizeof (*array)) +#endif + +#define READBUF_SIZE 4096 +#define WRITEBUF_SIZE 4096 + + +typedef struct estream_cookie_w32_pollable *estream_cookie_w32_pollable_t; + +struct reader_context_s +{ + estream_cookie_w32_pollable_t pcookie; + HANDLE thread_hd; + + gpgrt_lock_t mutex; + + int stop_me; + int eof; + int eof_shortcut; + int error; + int error_code; + + /* This is manually reset. */ + HANDLE have_data_ev; + /* This is automatically reset. */ + HANDLE have_space_ev; + /* This is manually reset but actually only triggered once. */ + HANDLE close_ev; + + size_t readpos, writepos; + char buffer[READBUF_SIZE]; +}; + +struct writer_context_s +{ + estream_cookie_w32_pollable_t pcookie; + HANDLE thread_hd; + + gpgrt_lock_t mutex; + + int stop_me; + int error; + int error_code; + + /* This is manually reset. */ + HANDLE have_data; + HANDLE is_empty; + HANDLE close_ev; + size_t nbytes; + char buffer[WRITEBUF_SIZE]; +}; + +/* Cookie for pollable objects. */ +struct estream_cookie_w32_pollable +{ + unsigned int modeflags; + + struct cookie_io_functions_s next_functions; + void *next_cookie; + + struct reader_context_s *reader; + struct writer_context_s *writer; +}; + + +static HANDLE +set_synchronize (HANDLE hd) +{ +#ifdef HAVE_W32CE_SYSTEM + return hd; +#else + HANDLE new_hd; + + /* For NT we have to set the sync flag. It seems that the only way + to do it is by duplicating the handle. Tsss... */ + if (!DuplicateHandle (GetCurrentProcess (), hd, + GetCurrentProcess (), &new_hd, + EVENT_MODIFY_STATE | SYNCHRONIZE, FALSE, 0)) + { + TRACE_ERR (NULL, GetLastError (), "DuplicateHandle failed"); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return INVALID_HANDLE_VALUE; + } + + CloseHandle (hd); + return new_hd; +#endif +} + + +static DWORD CALLBACK +reader (void *arg) +{ + struct reader_context_s *ctx = arg; + int nbytes; + ssize_t nread; + TRACE_CTX (ctx, "reader starting"); + + for (;;) + { + _gpgrt_lock_lock (&ctx->mutex); + /* Leave a 1 byte gap so that we can see whether it is empty or + full. */ + while ((ctx->writepos + 1) % READBUF_SIZE == ctx->readpos) + { + /* Wait for space. */ + if (!ResetEvent (ctx->have_space_ev)) + TRACE_ERR (ctx, GetLastError (), "ResetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + TRACE_CTX (ctx, "waiting for space"); + WaitForSingleObject (ctx->have_space_ev, INFINITE); + TRACE_CTX (ctx, "got space"); + _gpgrt_lock_lock (&ctx->mutex); + } + assert (((ctx->writepos + 1) % READBUF_SIZE != ctx->readpos)); + if (ctx->stop_me) + { + _gpgrt_lock_unlock (&ctx->mutex); + break; + } + nbytes = (ctx->readpos + READBUF_SIZE + - ctx->writepos - 1) % READBUF_SIZE; + assert (nbytes); + if (nbytes > READBUF_SIZE - ctx->writepos) + nbytes = READBUF_SIZE - ctx->writepos; + _gpgrt_lock_unlock (&ctx->mutex); + + TRACE_CTX (ctx, "reading up to %d bytes", nbytes); + + nread = ctx->pcookie->next_functions.public.func_read + (ctx->pcookie->next_cookie, ctx->buffer + ctx->writepos, nbytes); + TRACE_CTX (ctx, "got %d bytes", nread); + if (nread < 0) + { + ctx->error_code = (int) errno; + /* NOTE (W32CE): Do not ignore ERROR_BUSY! Check at + least stop_me if that happens. */ + if (ctx->error_code == ERROR_BROKEN_PIPE) + { + ctx->eof = 1; + TRACE_CTX (ctx, "got EOF (broken pipe)"); + } + else + { + ctx->error = 1; + TRACE_ERR (ctx, ctx->error_code, "read error"); + } + break; + } + + _gpgrt_lock_lock (&ctx->mutex); + if (ctx->stop_me) + { + _gpgrt_lock_unlock (&ctx->mutex); + break; + } + if (!nread) + { + ctx->eof = 1; + TRACE_CTX (ctx, "got eof"); + _gpgrt_lock_unlock (&ctx->mutex); + break; + } + + ctx->writepos = (ctx->writepos + nread) % READBUF_SIZE; + if (!SetEvent (ctx->have_data_ev)) + TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed", + ctx->have_data_ev); + _gpgrt_lock_unlock (&ctx->mutex); + } + /* Indicate that we have an error or EOF. */ + if (!SetEvent (ctx->have_data_ev)) + TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed", + ctx->have_data_ev); + + TRACE_CTX (ctx, "waiting for close"); + WaitForSingleObject (ctx->close_ev, INFINITE); + + CloseHandle (ctx->close_ev); + CloseHandle (ctx->have_data_ev); + CloseHandle (ctx->have_space_ev); + CloseHandle (ctx->thread_hd); + _gpgrt_lock_destroy (&ctx->mutex); + _gpgrt_free (ctx); + + return 0; +} + + +static struct reader_context_s * +create_reader (estream_cookie_w32_pollable_t pcookie) +{ + struct reader_context_s *ctx; + SECURITY_ATTRIBUTES sec_attr; + DWORD tid; + + memset (&sec_attr, 0, sizeof sec_attr); + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = FALSE; + + ctx = calloc (1, sizeof *ctx); + if (!ctx) + { + return NULL; + } + + ctx->pcookie = pcookie; + + ctx->have_data_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL); + if (ctx->have_data_ev) + ctx->have_space_ev = CreateEvent (&sec_attr, FALSE, TRUE, NULL); + if (ctx->have_space_ev) + ctx->close_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL); + if (!ctx->have_data_ev || !ctx->have_space_ev || !ctx->close_ev) + { + TRACE_ERR (ctx, GetLastError (), "CreateEvent failed"); + if (ctx->have_data_ev) + CloseHandle (ctx->have_data_ev); + if (ctx->have_space_ev) + CloseHandle (ctx->have_space_ev); + if (ctx->close_ev) + CloseHandle (ctx->close_ev); + _gpgrt_free (ctx); + return NULL; + } + + ctx->have_data_ev = set_synchronize (ctx->have_data_ev); + _gpgrt_lock_init (&ctx->mutex); + +#ifdef HAVE_W32CE_SYSTEM + ctx->thread_hd = CreateThread (&sec_attr, 64 * 1024, reader, ctx, + STACK_SIZE_PARAM_IS_A_RESERVATION, &tid); +#else + ctx->thread_hd = CreateThread (&sec_attr, 0, reader, ctx, 0, &tid); +#endif + + if (!ctx->thread_hd) + { + TRACE_ERR (ctx, GetLastError (), "CreateThread failed"); + _gpgrt_lock_destroy (&ctx->mutex); + if (ctx->have_data_ev) + CloseHandle (ctx->have_data_ev); + if (ctx->have_space_ev) + CloseHandle (ctx->have_space_ev); + if (ctx->close_ev) + CloseHandle (ctx->close_ev); + _gpgrt_free (ctx); + return NULL; + } + else + { +#if 0 + /* We set the priority of the thread higher because we know that + it only runs for a short time. This greatly helps to + increase the performance of the I/O. */ + SetThreadPriority (ctx->thread_hd, get_desired_thread_priority ()); +#endif + } + + return ctx; +} + + +/* Prepare destruction of the reader thread for CTX. Returns 0 if a + call to this function is sufficient and destroy_reader_finish shall + not be called. */ +static void +destroy_reader (struct reader_context_s *ctx) +{ + _gpgrt_lock_lock (&ctx->mutex); + ctx->stop_me = 1; + if (ctx->have_space_ev) + SetEvent (ctx->have_space_ev); + _gpgrt_lock_unlock (&ctx->mutex); + +#ifdef HAVE_W32CE_SYSTEM + /* Scenario: We never create a full pipe, but already started + reading. Then we need to unblock the reader in the pipe driver + to make our reader thread notice that we want it to go away. */ + + if (ctx->file_hd != INVALID_HANDLE_VALUE) + { + if (!DeviceIoControl (ctx->file_hd, GPGCEDEV_IOCTL_UNBLOCK, + NULL, 0, NULL, 0, NULL, NULL)) + { + TRACE_ERR (ctx, GetLastError (), "unblock control call failed"); + } + } +#endif + + /* XXX is it feasible to unblock the thread? */ + + /* After setting this event CTX is void. */ + SetEvent (ctx->close_ev); +} + + +/* + * Read function for pollable objects. + */ +static gpgrt_ssize_t +func_w32_pollable_read (void *cookie, void *buffer, size_t count) +{ + estream_cookie_w32_pollable_t pcookie = cookie; + gpgrt_ssize_t nread; + struct reader_context_s *ctx; + + ctx = pcookie->reader; + if (ctx == NULL) + { + pcookie->reader = ctx = create_reader (pcookie); + if (!ctx) + { + _gpg_err_set_errno (EBADF); + return -1; + } + } + + TRACE_CTX (ctx, "pollable read buffer=%p, count=%u", buffer, count); + + if (ctx->eof_shortcut) + return 0; + + _gpgrt_lock_lock (&ctx->mutex); + TRACE_CTX (ctx, "readpos: %d, writepos %d", ctx->readpos, ctx->writepos); + if (ctx->readpos == ctx->writepos && !ctx->error) + { + /* No data available. */ + int eof = ctx->eof; + _gpgrt_lock_unlock (&ctx->mutex); + + if (pcookie->modeflags & O_NONBLOCK && ! eof) + { + _gpg_err_set_errno (EAGAIN); + return -1; + } + + TRACE_CTX (ctx, "waiting for data"); + WaitForSingleObject (ctx->have_data_ev, INFINITE); + TRACE_CTX (ctx, "data available"); + _gpgrt_lock_lock (&ctx->mutex); + } + + if (ctx->readpos == ctx->writepos || ctx->error) + { + _gpgrt_lock_unlock (&ctx->mutex); + ctx->eof_shortcut = 1; + if (ctx->eof) + return 0; + if (!ctx->error) + { + TRACE_CTX (ctx, "EOF but ctx->eof flag not set"); + return 0; + } + _gpg_err_set_errno (ctx->error_code); + return -1; + } + + nread = ctx->readpos < ctx->writepos + ? ctx->writepos - ctx->readpos + : READBUF_SIZE - ctx->readpos; + if (nread > count) + nread = count; + memcpy (buffer, ctx->buffer + ctx->readpos, nread); + ctx->readpos = (ctx->readpos + nread) % READBUF_SIZE; + if (ctx->readpos == ctx->writepos && !ctx->eof) + { + if (!ResetEvent (ctx->have_data_ev)) + { + TRACE_ERR (ctx, GetLastError (), "ResetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + } + if (!SetEvent (ctx->have_space_ev)) + { + TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed", + ctx->have_space_ev); + _gpgrt_lock_unlock (&ctx->mutex); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + _gpgrt_lock_unlock (&ctx->mutex); + + return nread; +} + + +/* The writer does use a simple buffering strategy so that we are + informed about write errors as soon as possible (i. e. with the the + next call to the write function. */ +static DWORD CALLBACK +writer (void *arg) +{ + struct writer_context_s *ctx = arg; + ssize_t nwritten; + + TRACE_CTX (ctx, "writer starting"); + + for (;;) + { + _gpgrt_lock_lock (&ctx->mutex); + if (ctx->stop_me && !ctx->nbytes) + { + _gpgrt_lock_unlock (&ctx->mutex); + break; + } + if (!ctx->nbytes) + { + if (!SetEvent (ctx->is_empty)) + TRACE_ERR (ctx, GetLastError (), "SetEvent failed"); + if (!ResetEvent (ctx->have_data)) + TRACE_ERR (ctx, GetLastError (), "ResetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + TRACE_CTX (ctx, "idle"); + WaitForSingleObject (ctx->have_data, INFINITE); + TRACE_CTX (ctx, "got data to write"); + _gpgrt_lock_lock (&ctx->mutex); + } + if (ctx->stop_me && !ctx->nbytes) + { + _gpgrt_lock_unlock (&ctx->mutex); + break; + } + _gpgrt_lock_unlock (&ctx->mutex); + + TRACE_CTX (ctx, "writing up to %d bytes", ctx->nbytes); + + nwritten = ctx->pcookie->next_functions.public.func_write + (ctx->pcookie->next_cookie, ctx->buffer, ctx->nbytes); + TRACE_CTX (ctx, "wrote %d bytes", nwritten); + if (nwritten < 1) + { + /* XXX */ + if (errno == ERROR_BUSY) + { + /* Probably stop_me is set now. */ + TRACE_CTX (ctx, "pipe busy (unblocked?)"); + continue; + } + + ctx->error_code = errno; + ctx->error = 1; + TRACE_ERR (ctx, ctx->error_code, "write error"); + break; + } + + _gpgrt_lock_lock (&ctx->mutex); + ctx->nbytes -= nwritten; + _gpgrt_lock_unlock (&ctx->mutex); + } + /* Indicate that we have an error. */ + if (!SetEvent (ctx->is_empty)) + TRACE_ERR (ctx, GetLastError (), "SetEvent failed"); + + TRACE_CTX (ctx, "waiting for close"); + WaitForSingleObject (ctx->close_ev, INFINITE); + + if (ctx->nbytes) + TRACE_CTX (ctx, "still %d bytes in buffer at close time", ctx->nbytes); + + CloseHandle (ctx->close_ev); + CloseHandle (ctx->have_data); + CloseHandle (ctx->is_empty); + CloseHandle (ctx->thread_hd); + _gpgrt_lock_destroy (&ctx->mutex); + _gpgrt_free (ctx); + + return 0; +} + + +static struct writer_context_s * +create_writer (estream_cookie_w32_pollable_t pcookie) +{ + struct writer_context_s *ctx; + SECURITY_ATTRIBUTES sec_attr; + DWORD tid; + + memset (&sec_attr, 0, sizeof sec_attr); + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = FALSE; + + ctx = calloc (1, sizeof *ctx); + if (!ctx) + { + return NULL; + } + + ctx->pcookie = pcookie; + + ctx->have_data = CreateEvent (&sec_attr, TRUE, FALSE, NULL); + if (ctx->have_data) + ctx->is_empty = CreateEvent (&sec_attr, TRUE, TRUE, NULL); + if (ctx->is_empty) + ctx->close_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL); + if (!ctx->have_data || !ctx->is_empty || !ctx->close_ev) + { + TRACE_ERR (ctx, GetLastError (), "CreateEvent failed"); + if (ctx->have_data) + CloseHandle (ctx->have_data); + if (ctx->is_empty) + CloseHandle (ctx->is_empty); + if (ctx->close_ev) + CloseHandle (ctx->close_ev); + _gpgrt_free (ctx); + return NULL; + } + + ctx->is_empty = set_synchronize (ctx->is_empty); + _gpgrt_lock_init (&ctx->mutex); + +#ifdef HAVE_W32CE_SYSTEM + ctx->thread_hd = CreateThread (&sec_attr, 64 * 1024, writer, ctx, + STACK_SIZE_PARAM_IS_A_RESERVATION, &tid); +#else + ctx->thread_hd = CreateThread (&sec_attr, 0, writer, ctx, 0, &tid ); +#endif + + if (!ctx->thread_hd) + { + TRACE_ERR (ctx, GetLastError (), "CreateThread failed"); + _gpgrt_lock_destroy (&ctx->mutex); + if (ctx->have_data) + CloseHandle (ctx->have_data); + if (ctx->is_empty) + CloseHandle (ctx->is_empty); + if (ctx->close_ev) + CloseHandle (ctx->close_ev); + _gpgrt_free (ctx); + return NULL; + } + else + { +#if 0 + /* We set the priority of the thread higher because we know + that it only runs for a short time. This greatly helps to + increase the performance of the I/O. */ + SetThreadPriority (ctx->thread_hd, get_desired_thread_priority ()); +#endif + } + + return ctx; +} + + +static void +destroy_writer (struct writer_context_s *ctx) +{ + _gpgrt_lock_lock (&ctx->mutex); + ctx->stop_me = 1; + if (ctx->have_data) + SetEvent (ctx->have_data); + _gpgrt_lock_unlock (&ctx->mutex); + + /* Give the writer a chance to flush the buffer. */ + WaitForSingleObject (ctx->is_empty, INFINITE); + +#ifdef HAVE_W32CE_SYSTEM + /* Scenario: We never create a full pipe, but already started + writing more than the pipe buffer. Then we need to unblock the + writer in the pipe driver to make our writer thread notice that + we want it to go away. */ + + if (!DeviceIoControl (ctx->file_hd, GPGCEDEV_IOCTL_UNBLOCK, + NULL, 0, NULL, 0, NULL, NULL)) + { + TRACE_ERR (ctx, GetLastError (), "unblock control call failed"); + } +#endif + + /* After setting this event CTX is void. */ + SetEvent (ctx->close_ev); +} + + +/* + * Write function for pollable objects. + */ +static gpgrt_ssize_t +func_w32_pollable_write (void *cookie, const void *buffer, size_t count) +{ + estream_cookie_w32_pollable_t pcookie = cookie; + struct writer_context_s *ctx; + + if (count == 0) + return 0; + + ctx = pcookie->writer; + if (ctx == NULL) + { + pcookie->writer = ctx = create_writer (pcookie); + if (!ctx) + return -1; + } + + _gpgrt_lock_lock (&ctx->mutex); + TRACE_CTX (ctx, "pollable write buffer: %p, count: %d, nbytes: %d", + buffer, count, ctx->nbytes); + if (!ctx->error && ctx->nbytes) + { + /* Bytes are pending for send. */ + + /* Reset the is_empty event. Better safe than sorry. */ + if (!ResetEvent (ctx->is_empty)) + { + TRACE_ERR (ctx, GetLastError (), "ResetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + _gpgrt_lock_unlock (&ctx->mutex); + + if (pcookie->modeflags & O_NONBLOCK) + { + TRACE_CTX (ctx, "would block"); + _gpg_err_set_errno (EAGAIN); + return -1; + } + + TRACE_CTX (ctx, "waiting for empty buffer"); + WaitForSingleObject (ctx->is_empty, INFINITE); + TRACE_CTX (ctx, "buffer is empty"); + _gpgrt_lock_lock (&ctx->mutex); + } + + if (ctx->error) + { + _gpgrt_lock_unlock (&ctx->mutex); + if (ctx->error_code == ERROR_NO_DATA) + _gpg_err_set_errno (EPIPE); + else + _gpg_err_set_errno (EIO); + return -1; + } + + /* If no error occurred, the number of bytes in the buffer must be + zero. */ + assert (!ctx->nbytes); + + if (count > WRITEBUF_SIZE) + count = WRITEBUF_SIZE; + memcpy (ctx->buffer, buffer, count); + ctx->nbytes = count; + + /* We have to reset the is_empty event early, because it is also + used by the select() implementation to probe the channel. */ + if (!ResetEvent (ctx->is_empty)) + { + TRACE_ERR (ctx, GetLastError (), "ResetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + if (!SetEvent (ctx->have_data)) + { + TRACE_ERR (ctx, GetLastError (), "SetEvent failed"); + _gpgrt_lock_unlock (&ctx->mutex); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + _gpgrt_lock_unlock (&ctx->mutex); + + return (int) count; +} + + +int +_gpgrt_w32_poll (gpgrt_poll_t *fds, size_t nfds, int timeout) +{ + HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS]; + int waitidx[MAXIMUM_WAIT_OBJECTS]; + int code; + int nwait; + int i; + int any; + int count; + +#if 0 + restart: +#endif + + TRACE ("poll on [ "); + any = 0; + nwait = 0; + count = 0; + for (i = 0; i < nfds; i++) + { + struct estream_cookie_w32_pollable *pcookie; + + if (fds[i].ignore) + continue; + + if (fds[i].stream->intern->kind != BACKEND_W32_POLLABLE) + { + /* This stream does not support polling. */ + fds[i].got_err = 1; + continue; + } + + pcookie = fds[i].stream->intern->cookie; + + if (fds[i].want_read || fds[i].want_write) + { + /* XXX: What if one wants read and write, is that supported? */ + if (fds[i].want_read) + { + struct reader_context_s *ctx = pcookie->reader; + TRACE ("%d/read ", i); + if (ctx == NULL) + { + pcookie->reader = ctx = create_reader (pcookie); + if (!ctx) + { + /* FIXME: Is the error code appropriate? */ + _gpg_err_set_errno (EBADF); + return -1; + } + } + + if (nwait >= DIM (waitbuf)) + { + TRACE ("oops ]: Too many objects for WFMO!\n"); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + waitidx[nwait] = i; + waitbuf[nwait++] = ctx->have_data_ev; + any = 1; + } + else if (fds[i].want_write) + { + struct writer_context_s *ctx = pcookie->writer; + TRACE ("%d/write ", i); + if (ctx == NULL) + { + pcookie->writer = ctx = create_writer (pcookie); + if (!ctx) + { + /* FIXME: Is the error code appropriate? */ + _gpg_err_set_errno (EBADF); + return -1; + } + } + + if (nwait >= DIM (waitbuf)) + { + TRACE ("oops ]: Too many objects for WFMO!"); + /* FIXME: Should translate the error code. */ + _gpg_err_set_errno (EIO); + return -1; + } + waitidx[nwait] = i; + waitbuf[nwait++] = ctx->is_empty; + any = 1; + } + } + } + TRACE ("]\n"); + if (!any) + return 0; + + code = WaitForMultipleObjects (nwait, waitbuf, 0, + timeout == -1 ? INFINITE : timeout); + if (code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait) + { + /* This WFMO is a really silly function: It does return either + the index of the signaled object or if 2 objects have been + signalled at the same time, the index of the object with the + lowest object is returned - so and how do we find out how + many objects have been signaled???. The only solution I can + imagine is to test each object starting with the returned + index individually - how dull. */ + any = 0; + for (i = code - WAIT_OBJECT_0; i < nwait; i++) + { + if (WaitForSingleObject (waitbuf[i], 0) == WAIT_OBJECT_0) + { + assert (waitidx[i] >=0 && waitidx[i] < nfds); + /* XXX: What if one wants read and write, is that + supported? */ + if (fds[waitidx[i]].want_read) + fds[waitidx[i]].got_read = 1; + else if (fds[waitidx[i]].want_write) + fds[waitidx[i]].got_write = 1; + any = 1; + count++; + } + } + if (!any) + { + TRACE ("no signaled objects found after WFMO\n"); + count = -1; + } + } + else if (code == WAIT_TIMEOUT) + TRACE ("WFMO timed out\n"); + else if (code == WAIT_FAILED) + { + TRACE_ERR (NULL, GetLastError (), "WFMO failed"); +#if 0 + if (GetLastError () == ERROR_INVALID_HANDLE) + { + int k; + int j = handle_to_fd (waitbuf[i]); + + TRACE ("WFMO invalid handle %d removed\n", j); + for (k = 0 ; k < nfds; k++) + { + if (fds[k].fd == j) + { + fds[k].want_read = fds[k].want_write = 0; + goto restart; + } + } + TRACE (" oops, or not???\n"); + } +#endif + count = -1; + } + else + { + TRACE ("WFMO returned %d\n", code); + count = -1; + } + + if (count > 0) + { + TRACE ("poll OK [ "); + for (i = 0; i < nfds; i++) + { + if (fds[i].ignore) + continue; + if (fds[i].got_read || fds[i].got_write) + TRACE ("%c%d ", fds[i].want_read ? 'r' : 'w', i); + } + TRACE ("]\n"); + } + + if (count < 0) + { + /* FIXME: Should determine a proper error code. */ + _gpg_err_set_errno (EIO); + } + + return count; +} + + + +/* + * Implementation of pollable I/O on Windows. + */ + +/* + * Constructor for pollable objects. + */ +int +_gpgrt_w32_pollable_create (void *_GPGRT__RESTRICT *_GPGRT__RESTRICT cookie, + unsigned int modeflags, + struct cookie_io_functions_s next_functions, + void *next_cookie) +{ + estream_cookie_w32_pollable_t pcookie; + int err; + + pcookie = _gpgrt_malloc (sizeof *pcookie); + if (!pcookie) + err = -1; + else + { + pcookie->modeflags = modeflags; + pcookie->next_functions = next_functions; + pcookie->next_cookie = next_cookie; + pcookie->reader = NULL; + pcookie->writer = NULL; + *cookie = pcookie; + err = 0; + } + + return err; +} + + +/* + * Seek function for pollable objects. + */ +static int +func_w32_pollable_seek (void *cookie, gpgrt_off_t *offset, int whence) +{ + estream_cookie_w32_pollable_t pcookie = cookie; + (void) pcookie; + (void) offset; + (void) whence; + /* XXX */ + _gpg_err_set_errno (EOPNOTSUPP); + return -1; +} + + +/* + * The IOCTL function for pollable objects. + */ +static int +func_w32_pollable_ioctl (void *cookie, int cmd, void *ptr, size_t *len) +{ + estream_cookie_w32_pollable_t pcookie = cookie; + cookie_ioctl_function_t func_ioctl = pcookie->next_functions.func_ioctl; + + if (cmd == COOKIE_IOCTL_NONBLOCK) + { + if (ptr) + pcookie->modeflags |= O_NONBLOCK; + else + pcookie->modeflags &= ~O_NONBLOCK; + return 0; + } + + if (func_ioctl) + return func_ioctl (pcookie->next_cookie, cmd, ptr, len); + + _gpg_err_set_errno (EOPNOTSUPP); + return -1; +} + + +/* + * The destroy function for pollable objects. + */ +static int +func_w32_pollable_destroy (void *cookie) +{ + estream_cookie_w32_pollable_t pcookie = cookie; + + if (cookie) + { + if (pcookie->reader) + destroy_reader (pcookie->reader); + if (pcookie->writer) + destroy_writer (pcookie->writer); + pcookie->next_functions.public.func_close (pcookie->next_cookie); + _gpgrt_free (pcookie); + } + return 0; +} + +/* + * Access object for the pollable functions. + */ +struct cookie_io_functions_s _gpgrt_functions_w32_pollable = + { + { + func_w32_pollable_read, + func_w32_pollable_write, + func_w32_pollable_seek, + func_w32_pollable_destroy, + }, + func_w32_pollable_ioctl, + }; diff --git a/tests/t-poll.c b/tests/t-poll.c index 026bb88..d39797a 100644 --- a/tests/t-poll.c +++ b/tests/t-poll.c @@ -191,14 +191,14 @@ create_pipe (estream_t *r_in, estream_t *r_out) show ("created pipe [%d, %d]\n", filedes[0], filedes[1]); - *r_in = es_fdopen (filedes[0], "r"); + *r_in = es_fdopen (filedes[0], "r,pollable"); if (!*r_in) { err = gpg_error_from_syserror (); die ("error creating a stream for a pipe: %s\n", gpg_strerror (err)); } - *r_out = es_fdopen (filedes[1], "w"); + *r_out = es_fdopen (filedes[1], "w,pollable"); if (!*r_out) { err = gpg_error_from_syserror (); |