# HG changeset patch # User Bruno Haible # Date 1249249925 -7200 # Node ID f63d201737ea1c16550864950c9aee0ff3ccd5b6 # Parent a43946bf11a94e21f65cb4a36f3d3d865df757b3 New module 'pipe-filter-gi'. diff --git a/ChangeLog b/ChangeLog --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2009-08-02 Paolo Bonzini + Bruno Haible + + New module 'pipe-filter-gi'. + * lib/pipe-filter-gi.c: New file. + * modules/pipe-filter-gi: New file. + 2009-08-02 Bruno Haible Paolo Bonzini diff --git a/lib/pipe-filter-gi.c b/lib/pipe-filter-gi.c new file mode 100644 --- /dev/null +++ b/lib/pipe-filter-gi.c @@ -0,0 +1,558 @@ +/* Filtering of data through a subprocess. + Copyright (C) 2001-2003, 2008-2009 Free Software Foundation, Inc. + Written by Paolo Bonzini , 2009, + and Bruno Haible , 2009. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . */ + +#include + +#include "pipe-filter.h" + +#include +#include +#include +#include +#include +#include +#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ +# include +#else +# include +# include +#endif + +#include "error.h" +#include "pipe.h" +#include "wait-process.h" +#include "xalloc.h" +#include "gettext.h" + +#define _(str) gettext (str) + +#include "pipe-filter-aux.h" + +struct pipe_filter_gi +{ + /* Arguments passed to pipe_filter_gi_create. */ + const char *progname; + bool null_stderr; + bool exit_on_error; + prepare_read_fn prepare_read; + done_read_fn done_read; + void *private_data; + + /* Management of the subprocess. */ + pid_t child; + int fd[2]; + bool exited; + int exitstatus; + + /* Status of the writer part. */ + volatile bool writer_terminated; + int writer_errno; + /* Status of the reader part. */ + volatile bool reader_terminated; + volatile int reader_errno; + +#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ + CRITICAL_SECTION lock; /* protects the volatile fields */ + HANDLE reader_thread_handle; +#else + struct sigaction orig_sigpipe_action; + fd_set readfds; /* All bits except fd[0] are always cleared. */ + fd_set writefds; /* All bits except fd[1] are always cleared. */ +#endif +}; + + +/* Platform dependent functions. */ + +/* Perform additional initializations. + Return 0 if successful, -1 upon failure. */ +static inline int filter_init (struct pipe_filter_gi *filter); + +/* Write count bytes starting at buf, while at the same time invoking the + read iterator (the functions prepare_read/done_read) when needed. */ +static void filter_loop (struct pipe_filter_gi *filter, + const char *wbuf, size_t count); + +/* Perform cleanup actions at the end. + finish_reading is true if there was no error, or false if some error + occurred already. */ +static inline void filter_cleanup (struct pipe_filter_gi *filter, + bool finish_reading); + + +#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ +/* Native Woe32 API. */ + +static unsigned int WINAPI +reader_thread_func (void *thread_arg) +{ + struct pipe_filter_gi *filter = (struct pipe_filter_gi *) thread_arg; + + for (;;) + { + size_t bufsize; + void *buf = filter->prepare_read (&bufsize, filter->private_data); + if (!(buf != NULL && bufsize > 0)) + /* prepare_read returned wrong values. */ + abort (); + { + ssize_t nread = + read (filter->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize); + EnterCriticalSection (&filter->lock); + /* If the writer already encountered an error, terminate. */ + if (filter->writer_terminated) + break; + if (nread < 0) + { + filter->reader_errno = errno; + break; + } + else if (nread > 0) + filter->done_read (buf, nread, filter->private_data); + else /* nread == 0 */ + break; + LeaveCriticalSection (&filter->lock); + } + } + + filter->reader_terminated = true; + LeaveCriticalSection (&filter->lock); + _endthreadex (0); /* calls ExitThread (0) */ + abort (); +} + +static inline int +filter_init (struct pipe_filter_gi *filter) +{ + InitializeCriticalSection (&filter->lock); + EnterCriticalSection (&filter->lock); + + filter->reader_thread_handle = + (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, filter, + 0, NULL); + + if (filter->reader_thread_handle == NULL) + { + if (filter->exit_on_error) + error (EXIT_FAILURE, 0, _("creation of reading thread failed")); + return -1; + } + else + return 0; +} + +static void +filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count) +{ + if (!filter->writer_terminated) + { + for (;;) + { + ssize_t nwritten; + + /* Allow the reader thread to continue. */ + LeaveCriticalSection (&filter->lock); + + nwritten = + write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count); + + /* Get the lock back from the reader thread. */ + EnterCriticalSection (&filter->lock); + + if (nwritten < 0) + { + /* Don't assume that the gnulib modules 'write' and 'sigpipe' are + used. */ + if (GetLastError () == ERROR_NO_DATA) + errno = EPIPE; + filter->writer_errno = errno; + filter->writer_terminated = true; + break; + } + else if (nwritten > 0) + { + count -= nwritten; + if (count == 0) + break; + wbuf += nwritten; + } + else /* nwritten == 0 */ + { + filter->writer_terminated = true; + break; + } + } + } +} + +static inline void +filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading) +{ + if (finish_reading) + { + LeaveCriticalSection (&filter->lock); + WaitForSingleObject (filter->reader_thread_handle, INFINITE); + } + else + TerminateThread (filter->reader_thread_handle, 1); + + CloseHandle (filter->reader_thread_handle); + DeleteCriticalSection (&filter->lock); +} + +#else +/* Unix API. */ + +static inline int +filter_init (struct pipe_filter_gi *filter) +{ +#if !((defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__) + /* When we write to the child process and it has just terminated, + we don't want to die from a SIGPIPE signal. So set the SIGPIPE + handler to SIG_IGN, and handle EPIPE error codes in write(). */ + { + struct sigaction sigpipe_action; + + sigpipe_action.sa_handler = SIG_IGN; + sigpipe_action.sa_flags = 0; + sigemptyset (&sigpipe_action.sa_mask); + if (sigaction (SIGPIPE, &sigpipe_action, &filter->orig_sigpipe_action) < 0) + abort (); + } +#endif + + /* Enable non-blocking I/O. This permits the read() and write() calls + to return -1/EAGAIN without blocking; this is important for polling + if HAVE_SELECT is not defined. It also permits the read() and write() + calls to return after partial reads/writes; this is important if + HAVE_SELECT is defined, because select() only says that some data + can be read or written, not how many. Without non-blocking I/O, + Linux 2.2.17 and BSD systems prefer to block instead of returning + with partial results. */ + { + int fcntl_flags; + + if ((fcntl_flags = fcntl (filter->fd[1], F_GETFL, 0)) < 0 + || fcntl (filter->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) < 0 + || (fcntl_flags = fcntl (filter->fd[0], F_GETFL, 0)) < 0 + || fcntl (filter->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) < 0) + { + if (filter->exit_on_error) + error (EXIT_FAILURE, errno, + _("cannot set up nonblocking I/O to %s subprocess"), + filter->progname); + return -1; + } + } + + FD_ZERO (&filter->readfds); + FD_ZERO (&filter->writefds); + + return 0; +} + +static void +filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count) +{ + /* This function is used in two situations: + - in order to write some data to the subprocess + [done_writing = false], + - in order to read the remaining data after everything was written + [done_writing = true]. In this case buf is NULL and count is + ignored. */ + bool done_writing = (wbuf == NULL); + + if (!done_writing) + { + if (filter->writer_terminated || filter->reader_terminated) + /* pipe_filter_gi_write was called when it should not be. */ + abort (); + } + else + { + if (filter->reader_terminated) + return; + } + + /* Loop, trying to write the given buffer or reading, whichever is + possible. */ + for (;;) + { + /* Here filter->writer_terminated is false. When it becomes true, this + loop is terminated. */ + /* Whereas filter->reader_terminated is initially false but may become + true during this loop. */ + /* Here, if !done_writing, count > 0. When count becomes 0, this loop + is terminated. */ + /* Here, if done_writing, filter->reader_terminated is false. When + filter->reader_terminated becomes true, this loop is terminated. */ +# if HAVE_SELECT + int n; + + /* See whether reading or writing is possible. */ + n = 1; + if (!filter->reader_terminated) + { + FD_SET (filter->fd[0], &filter->readfds); + n = filter->fd[0] + 1; + } + if (!done_writing) + { + FD_SET (filter->fd[1], &filter->writefds); + if (n <= filter->fd[1]) + n = filter->fd[1] + 1; + } + n = select (n, + (!filter->reader_terminated ? &filter->readfds : NULL), + (!done_writing ? &filter->writefds : NULL), + NULL, NULL); + + if (n < 0) + { + if (filter->exit_on_error) + error (EXIT_FAILURE, errno, + _("communication with %s subprocess failed"), + filter->progname); + filter->writer_errno = errno; + filter->writer_terminated = true; + break; + } + + if (!done_writing && FD_ISSET (filter->fd[1], &filter->writefds)) + goto try_write; + if (!filter->reader_terminated + && FD_ISSET (filter->fd[0], &filter->readfds)) + goto try_read; + /* How could select() return if none of the two descriptors is ready? */ + abort (); +# endif + + /* Attempt to write. */ +# if HAVE_SELECT + try_write: +# endif + if (!done_writing) + { + ssize_t nwritten = + write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count); + if (nwritten < 0) + { + if (!IS_EAGAIN (errno)) + { + if (filter->exit_on_error) + error (EXIT_FAILURE, errno, + _("write to %s subprocess failed"), + filter->progname); + filter->writer_errno = errno; + filter->writer_terminated = true; + break; + } + } + else if (nwritten > 0) + { + count -= nwritten; + if (count == 0) + break; + wbuf += nwritten; + } + } +# if HAVE_SELECT + continue; +# endif + + /* Attempt to read. */ +# if HAVE_SELECT + try_read: +# endif + if (!filter->reader_terminated) + { + size_t bufsize; + void *buf = filter->prepare_read (&bufsize, filter->private_data); + if (!(buf != NULL && bufsize > 0)) + /* prepare_read returned wrong values. */ + abort (); + { + ssize_t nread = + read (filter->fd[0], buf, + bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize); + if (nread < 0) + { + if (!IS_EAGAIN (errno)) + { + if (filter->exit_on_error) + error (EXIT_FAILURE, errno, + _("read from %s subprocess failed"), + filter->progname); + filter->reader_errno = errno; + filter->reader_terminated = true; + break; + } + } + else if (nread > 0) + filter->done_read (buf, nread, filter->private_data); + else /* nread == 0 */ + { + filter->reader_terminated = true; + if (done_writing) + break; + } + } + } +# if HAVE_SELECT + continue; +# endif + } +} + +static void +filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading) +{ + if (finish_reading) + /* A select loop, with done_writing = true. */ + filter_loop (filter, NULL, 0); + + if (sigaction (SIGPIPE, &filter->orig_sigpipe_action, NULL) < 0) + abort (); +} + +#endif + + +/* Terminate the child process. Do nothing if it already exited. */ +static void +filter_terminate (struct pipe_filter_gi *filter) +{ + if (!filter->exited) + { + /* Tell the child there is nothing more the parent will send. */ + close (filter->fd[1]); + filter_cleanup (filter, !filter->reader_terminated); + close (filter->fd[0]); + filter->exitstatus = + wait_subprocess (filter->child, filter->progname, true, + filter->null_stderr, true, filter->exit_on_error, + NULL); + if (filter->exitstatus != 0 && filter->exit_on_error) + error (EXIT_FAILURE, 0, + _("subprocess %s terminated with exit code %d"), + filter->progname, filter->exitstatus); + filter->exited = true; + } +} + +/* After filter_terminate: + Return 0 upon success, or (only if exit_on_error is false): + - -1 with errno set upon failure, + - the positive exit code of the subprocess if that failed. */ +static inline int +filter_retcode (struct pipe_filter_gi *filter) +{ + if (filter->writer_errno != 0) + { + errno = filter->writer_errno; + return -1; + } + else if (filter->reader_errno != 0) + { + errno = filter->reader_errno; + return -1; + } + else + return filter->exitstatus; +} + +struct pipe_filter_gi * +pipe_filter_gi_create (const char *progname, + const char *prog_path, const char **prog_argv, + bool null_stderr, bool exit_on_error, + prepare_read_fn prepare_read, + done_read_fn done_read, + void *private_data) +{ + struct pipe_filter_gi *filter; + + filter = + (struct pipe_filter_gi *) xmalloc (sizeof (struct pipe_filter_gi)); + + /* Open a bidirectional pipe to a subprocess. */ + filter->child = create_pipe_bidi (progname, prog_path, (char **) prog_argv, + null_stderr, true, exit_on_error, + filter->fd); + filter->progname = progname; + filter->null_stderr = null_stderr; + filter->exit_on_error = exit_on_error; + filter->prepare_read = prepare_read; + filter->done_read = done_read; + filter->private_data = private_data; + filter->exited = false; + filter->exitstatus = 0; + filter->writer_terminated = false; + filter->writer_errno = 0; + filter->reader_terminated = false; + filter->reader_errno = 0; + + if (filter->child == -1) + { + /* Child process could not be created. + Arrange for filter_retcode (filter) to be the current errno. */ + filter->writer_errno = errno; + filter->writer_terminated = true; + filter->exited = true; + } + else if (filter_init (filter) < 0) + filter_terminate (filter); + + return filter; +} + +int +pipe_filter_gi_write (struct pipe_filter_gi *filter, + const void *buf, size_t size) +{ + if (buf == NULL) + /* Invalid argument. */ + abort (); + + if (filter->exited) + return filter_retcode (filter); + + if (size > 0) + { + filter_loop (filter, buf, size); + if (filter->writer_terminated || filter->reader_terminated) + { + filter_terminate (filter); + return filter_retcode (filter); + } + } + return 0; +} + +int +pipe_filter_gi_close (struct pipe_filter_gi *filter) +{ + int ret; + int saved_errno; + + filter_terminate (filter); + ret = filter_retcode (filter); + saved_errno = errno; + free (filter); + errno = saved_errno; + return ret; +} diff --git a/modules/pipe-filter-gi b/modules/pipe-filter-gi new file mode 100644 --- /dev/null +++ b/modules/pipe-filter-gi @@ -0,0 +1,34 @@ +Description: +Filtering of data through a subprocess. + +Files: +lib/pipe-filter.h +lib/pipe-filter-gi.c +lib/pipe-filter-aux.h + +Depends-on: +pipe +wait-process +error +exit +gettext-h +stdbool +stdint +sys_select +unistd + +configure.ac: +AC_REQUIRE([AC_C_INLINE]) +AC_CHECK_FUNCS([select]) + +Makefile.am: +lib_SOURCES += pipe-filter-gi.c + +Include: +"pipe-filter.h" + +License: +GPL + +Maintainer: +Paolo Bonzini, Bruno Haible