summaryrefslogtreecommitdiff
path: root/subversion/libsvn_ra_svn/streams.c
diff options
context:
space:
mode:
Diffstat (limited to 'subversion/libsvn_ra_svn/streams.c')
-rw-r--r--subversion/libsvn_ra_svn/streams.c149
1 files changed, 63 insertions, 86 deletions
diff --git a/subversion/libsvn_ra_svn/streams.c b/subversion/libsvn_ra_svn/streams.c
index 4ae93d5..3ad792b 100644
--- a/subversion/libsvn_ra_svn/streams.c
+++ b/subversion/libsvn_ra_svn/streams.c
@@ -33,12 +33,14 @@
#include "svn_io.h"
#include "svn_private_config.h"
+#include "private/svn_io_private.h"
+
#include "ra_svn.h"
struct svn_ra_svn__stream_st {
- svn_stream_t *stream;
- void *baton;
- ra_svn_pending_fn_t pending_fn;
+ svn_stream_t *in_stream;
+ svn_stream_t *out_stream;
+ void *timeout_baton;
ra_svn_timeout_fn_t timeout_fn;
};
@@ -47,11 +49,6 @@ typedef struct sock_baton_t {
apr_pool_t *pool;
} sock_baton_t;
-typedef struct file_baton_t {
- apr_file_t *in_file;
- apr_file_t *out_file;
- apr_pool_t *pool;
-} file_baton_t;
/* Returns TRUE if PFD has pending data, FALSE otherwise. */
static svn_boolean_t pending(apr_pollfd_t *pfd, apr_pool_t *pool)
@@ -67,65 +64,34 @@ static svn_boolean_t pending(apr_pollfd_t *pfd, apr_pool_t *pool)
/* Functions to implement a file backed svn_ra_svn__stream_t. */
-/* Implements svn_read_fn_t */
-static svn_error_t *
-file_read_cb(void *baton, char *buffer, apr_size_t *len)
-{
- file_baton_t *b = baton;
- apr_status_t status = apr_file_read(b->in_file, buffer, len);
-
- if (status && !APR_STATUS_IS_EOF(status))
- return svn_error_wrap_apr(status, _("Can't read from connection"));
- if (*len == 0)
- return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
- return SVN_NO_ERROR;
-}
-
-/* Implements svn_write_fn_t */
-static svn_error_t *
-file_write_cb(void *baton, const char *buffer, apr_size_t *len)
-{
- file_baton_t *b = baton;
- apr_status_t status = apr_file_write(b->out_file, buffer, len);
- if (status)
- return svn_error_wrap_apr(status, _("Can't write to connection"));
- return SVN_NO_ERROR;
-}
-
/* Implements ra_svn_timeout_fn_t */
static void
file_timeout_cb(void *baton, apr_interval_time_t interval)
{
- file_baton_t *b = baton;
- apr_file_pipe_timeout_set(b->out_file, interval);
-}
-
-/* Implements ra_svn_pending_fn_t */
-static svn_boolean_t
-file_pending_cb(void *baton)
-{
- file_baton_t *b = baton;
- apr_pollfd_t pfd;
-
- pfd.desc_type = APR_POLL_FILE;
- pfd.desc.f = b->in_file;
+ apr_file_t *f = baton;
- return pending(&pfd, b->pool);
+ if (f)
+ apr_file_pipe_timeout_set(f, interval);
}
svn_ra_svn__stream_t *
-svn_ra_svn__stream_from_files(apr_file_t *in_file,
- apr_file_t *out_file,
- apr_pool_t *pool)
+svn_ra_svn__stream_from_streams(svn_stream_t *in_stream,
+ svn_stream_t *out_stream,
+ apr_pool_t *pool)
{
- file_baton_t *b = apr_palloc(pool, sizeof(*b));
+ apr_file_t *file;
+
+ /* If out_stream is backed by an apr_file (e.g. an PIPE) we
+ provide a working callback, otherwise the callback ignores
+ the timeout.
- b->in_file = in_file;
- b->out_file = out_file;
- b->pool = pool;
+ The callback is used to make the write non-blocking on
+ some error scenarios. ### This (legacy) usage
+ breaks the stream promise */
+ file = svn_stream__aprfile(out_stream);
- return svn_ra_svn__stream_create(b, file_read_cb, file_write_cb,
- file_timeout_cb, file_pending_cb,
+ return svn_ra_svn__stream_create(in_stream, out_stream,
+ file, file_timeout_cb,
pool);
}
@@ -155,8 +121,6 @@ sock_read_cb(void *baton, char *buffer, apr_size_t *len)
if (status && !APR_STATUS_IS_EOF(status))
return svn_error_wrap_apr(status, _("Can't read from connection"));
- if (*len == 0)
- return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
return SVN_NO_ERROR;
}
@@ -179,9 +143,10 @@ sock_timeout_cb(void *baton, apr_interval_time_t interval)
apr_socket_timeout_set(b->sock, interval);
}
-/* Implements ra_svn_pending_fn_t */
-static svn_boolean_t
-sock_pending_cb(void *baton)
+/* Implements svn_stream_data_available_fn_t */
+static svn_error_t *
+sock_pending_cb(void *baton,
+ svn_boolean_t *data_available)
{
sock_baton_t *b = baton;
apr_pollfd_t pfd;
@@ -189,41 +154,45 @@ sock_pending_cb(void *baton)
pfd.desc_type = APR_POLL_SOCKET;
pfd.desc.s = b->sock;
- return pending(&pfd, b->pool);
+ *data_available = pending(&pfd, b->pool);
+
+ svn_pool_clear(b->pool);
+
+ return SVN_NO_ERROR;
}
svn_ra_svn__stream_t *
svn_ra_svn__stream_from_sock(apr_socket_t *sock,
- apr_pool_t *pool)
+ apr_pool_t *result_pool)
{
- sock_baton_t *b = apr_palloc(pool, sizeof(*b));
+ sock_baton_t *b = apr_palloc(result_pool, sizeof(*b));
+ svn_stream_t *sock_stream;
b->sock = sock;
- b->pool = pool;
+ b->pool = svn_pool_create(result_pool);
- return svn_ra_svn__stream_create(b, sock_read_cb, sock_write_cb,
- sock_timeout_cb, sock_pending_cb,
- pool);
+ sock_stream = svn_stream_create(b, result_pool);
+
+ svn_stream_set_read2(sock_stream, sock_read_cb, NULL /* use default */);
+ svn_stream_set_write(sock_stream, sock_write_cb);
+ svn_stream_set_data_available(sock_stream, sock_pending_cb);
+
+ return svn_ra_svn__stream_create(sock_stream, sock_stream,
+ b, sock_timeout_cb, result_pool);
}
svn_ra_svn__stream_t *
-svn_ra_svn__stream_create(void *baton,
- svn_read_fn_t read_cb,
- svn_write_fn_t write_cb,
+svn_ra_svn__stream_create(svn_stream_t *in_stream,
+ svn_stream_t *out_stream,
+ void *timeout_baton,
ra_svn_timeout_fn_t timeout_cb,
- ra_svn_pending_fn_t pending_cb,
apr_pool_t *pool)
{
svn_ra_svn__stream_t *s = apr_palloc(pool, sizeof(*s));
- s->stream = svn_stream_empty(pool);
- svn_stream_set_baton(s->stream, baton);
- if (read_cb)
- svn_stream_set_read(s->stream, read_cb);
- if (write_cb)
- svn_stream_set_write(s->stream, write_cb);
- s->baton = baton;
+ s->in_stream = in_stream;
+ s->out_stream = out_stream;
+ s->timeout_baton = timeout_baton;
s->timeout_fn = timeout_cb;
- s->pending_fn = pending_cb;
return s;
}
@@ -231,25 +200,33 @@ svn_error_t *
svn_ra_svn__stream_write(svn_ra_svn__stream_t *stream,
const char *data, apr_size_t *len)
{
- return svn_stream_write(stream->stream, data, len);
+ return svn_error_trace(svn_stream_write(stream->out_stream, data, len));
}
svn_error_t *
svn_ra_svn__stream_read(svn_ra_svn__stream_t *stream, char *data,
apr_size_t *len)
{
- return svn_stream_read(stream->stream, data, len);
+ SVN_ERR(svn_stream_read2(stream->in_stream, data, len));
+
+ if (*len == 0)
+ return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
+
+ return SVN_NO_ERROR;
}
void
svn_ra_svn__stream_timeout(svn_ra_svn__stream_t *stream,
apr_interval_time_t interval)
{
- stream->timeout_fn(stream->baton, interval);
+ stream->timeout_fn(stream->timeout_baton, interval);
}
-svn_boolean_t
-svn_ra_svn__stream_pending(svn_ra_svn__stream_t *stream)
+svn_error_t *
+svn_ra_svn__stream_data_available(svn_ra_svn__stream_t *stream,
+ svn_boolean_t *data_available)
{
- return stream->pending_fn(stream->baton);
+ return svn_error_trace(
+ svn_stream_data_available(stream->in_stream,
+ data_available));
}