diff options
author | Donovan Baarda <abo@minkirri.apana.org.au> | 2021-09-16 11:37:18 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-16 11:37:18 +1000 |
commit | d202e4eef208165aa7768ad15236e1cd1d6fae0c (patch) | |
tree | 7fe3f27a77873051d571b21880254fda9ab7e2cd | |
parent | b8609e9fb1ca888c852f6a7cf5b96690098f94e1 (diff) | |
parent | c50bd630dde1c62c56cac0e917570a8c13173e07 (diff) | |
download | librsync-d202e4eef208165aa7768ad15236e1cd1d6fae0c.tar.gz |
Merge pull request #234 from dbaarda/dev/scoop1
Make delta directly process the input stream if it has enough data.
-rw-r--r-- | NEWS.md | 13 | ||||
-rw-r--r-- | src/buf.c | 55 | ||||
-rw-r--r-- | src/delta.c | 130 | ||||
-rw-r--r-- | src/job.c | 7 | ||||
-rw-r--r-- | src/job.h | 31 | ||||
-rw-r--r-- | src/librsync.h | 16 | ||||
-rw-r--r-- | src/mksum.c | 2 | ||||
-rw-r--r-- | src/netint.c | 2 | ||||
-rw-r--r-- | src/patch.c | 2 | ||||
-rw-r--r-- | src/readsums.c | 2 | ||||
-rw-r--r-- | src/scoop.c | 45 | ||||
-rw-r--r-- | src/scoop.h (renamed from src/stream.h) | 117 | ||||
-rw-r--r-- | src/tube.c | 90 | ||||
-rw-r--r-- | src/whole.c | 11 |
14 files changed, 292 insertions, 231 deletions
@@ -4,6 +4,19 @@ NOT RELEASED YET + * Make delta directly process the input stream if it has enough data. Delta + operations will only accumulate data into the internal scoop buffer if the + input buffer is too small, otherwise it will process the input directly. + This makes delta calculations 5%~15% faster by avoiding extra data copying. + (dbaarda, https://github.com/librsync/librsync/pull/234) + + * Add .gitignore for `.cmake` created by LSP on Windows. (sourcefrog, + https://github.com/librsync/librsync/pull/232) + + * Improve documentation so that Doxygen generates more complete documentation + with diagrams, renders better, and is more navigable as markdown docs on + GitHub. (dbaarda, https://github.com/librsync/librsync/pull/230) + * Add github action and make targets for clang-tidy and iwyu. Added `clang-tidy` and `iwyu` make targets for checking code and includes, and `iwyu-fix` for fixing includes. Added `lint.yml` GitHub action to run these @@ -65,44 +65,34 @@ rs_result rs_infilebuf_fill(rs_job_t *job, rs_buffers_t *buf, void *opaque) rs_filebuf_t *fb = (rs_filebuf_t *)opaque; FILE *f = fb->f; - /* This is only allowed if either the buf has no input buffer yet, or that - buffer could possibly be BUF. */ - if (buf->next_in != NULL) { - assert(buf->avail_in <= fb->buf_len); + /* If buf has data, it must be in the buffer. */ + if (buf->avail_in) { assert(buf->next_in >= fb->buf); - assert(buf->next_in <= fb->buf + fb->buf_len); - } else { - assert(buf->avail_in == 0); + assert(buf->next_in + buf->avail_in <= fb->buf + fb->buf_len); } - - if (buf->eof_in || (buf->eof_in = feof(f))) { - rs_trace("seen end of file on input"); + if (buf->eof_in) { return RS_DONE; - } - - if (buf->avail_in) - /* Still some data remaining. Perhaps we should read anyhow? */ + } else if (buf->avail_in > fb->buf_len / 2) { + /* Buf is already full enough, do nothing. */ return RS_DONE; - - len = fread(fb->buf, 1, fb->buf_len, f); + } else if (buf->avail_in) { + /* Some leftover tail data, move it to the front of the buffer. */ + rs_trace("moving buffer " FMT_SIZE " bytes to reuse " FMT_SIZE " bytes", + buf->avail_in, (size_t)(buf->next_in - fb->buf)); + memmove(fb->buf, buf->next_in, buf->avail_in); + } + buf->next_in = fb->buf; + len = fread(fb->buf + buf->avail_in, 1, fb->buf_len - buf->avail_in, f); if (len == 0) { - /* This will happen if file size is a multiple of input block len */ - if (feof(f)) { + if ((buf->eof_in = feof(f))) { rs_trace("seen end of file on input"); - buf->eof_in = 1; return RS_DONE; - } - if (ferror(f)) { - rs_error("error filling buf from file: %s", strerror(errno)); - return RS_IO_ERROR; } else { - rs_error("no error bit, but got " FMT_SIZE - " return when trying to read", len); + rs_error("error filling buf from file: %s", strerror(errno)); return RS_IO_ERROR; } } - buf->avail_in = len; - buf->next_in = fb->buf; + buf->avail_in += len; job->stats.in_bytes += len; return RS_DONE; } @@ -114,18 +104,15 @@ rs_result rs_outfilebuf_drain(rs_job_t *job, rs_buffers_t *buf, void *opaque) rs_filebuf_t *fb = (rs_filebuf_t *)opaque; FILE *f = fb->f; - /* This is only allowed if either the buf has no output buffer yet, or that - buffer could possibly be BUF. */ - if (buf->next_out == NULL) { + /* If next_out is NULL, we haven't pointed it at fb->buf yet. */ + if (!buf->next_out) { assert(buf->avail_out == 0); buf->next_out = fb->buf; buf->avail_out = fb->buf_len; - return RS_DONE; } - - assert(buf->avail_out <= fb->buf_len); + /* The buf output buffer must be at the end of fb->buf. */ assert(buf->next_out >= fb->buf); - assert(buf->next_out <= fb->buf + fb->buf_len); + assert(buf->next_out + buf->avail_out == fb->buf + fb->buf_len); size_t present = buf->next_out - fb->buf; if (present > 0) { diff --git a/src/delta.c b/src/delta.c index ed09ca7..2d7fdbc 100644 --- a/src/delta.c +++ b/src/delta.c @@ -63,9 +63,9 @@ * this happens. * * In pysync a 'last' attribute is used to hold the last miss or match for - * extending if possible. In this code, basis_len and scoop_pos are used - * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0 - * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie, + * extending if possible. In this code, basis_len and scan_pos are used instead + * of 'last'. When basis_len > 0, last is a match. When basis_len = 0 and + * scan_pos is > 0, last is a miss. When both are 0, last is None (ie, * nothing). * * Pysync is also slightly different in that a 'flush' method is available to @@ -82,10 +82,10 @@ * data consumes it off the input scoop and outputs the processed miss data * into the tube. * - * The scoop contains all data yet to be processed. The scoop_pos is an index + * The scoop contains all data yet to be processed. The scan_pos is an index * into the scoop that indicates the point scanned to. As data is scanned, - * scoop_pos is incremented. As data is processed, it is removed from the scoop - * and scoop_pos adjusted. Everything gets complicated because the tube can + * scan_pos is incremented. As data is processed, it is removed from the scoop + * and scan_pos adjusted. Everything gets complicated because the tube can * block. When the tube is blocked, no data can be processed. */ #include <assert.h> @@ -94,14 +94,17 @@ #include "job.h" #include "sumset.h" #include "checksum.h" -#include "stream.h" +#include "scoop.h" #include "emit.h" #include "trace.h" +/** Max length of a miss is 64K including 3 command bytes. */ +#define MAX_MISS_LEN (MAX_DELTA_CMD - 3) + static rs_result rs_delta_s_scan(rs_job_t *job); static rs_result rs_delta_s_flush(rs_job_t *job); static rs_result rs_delta_s_end(rs_job_t *job); -static inline void rs_getinput(rs_job_t *job); +static inline rs_result rs_getinput(rs_job_t *job, size_t block_len); static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len); static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, @@ -123,13 +126,14 @@ static rs_result rs_delta_s_scan(rs_job_t *job) rs_result result; rs_job_check(job); - /* read the input into the scoop */ - rs_getinput(job); /* output any pending output from the tube */ - result = rs_tube_catchup(job); + if ((result = rs_tube_catchup(job)) != RS_DONE) + return result; + /* read the input into the scoop */ + if ((result = rs_getinput(job, block_len)) != RS_DONE) + return result; /* while output is not blocked and there is a block of data */ - while ((result == RS_DONE) - && ((job->scoop_pos + block_len) < job->scoop_avail)) { + while ((result == RS_DONE) && ((job->scan_pos + block_len) < job->scan_len)) { /* check if this block matches */ if (rs_findmatch(job, &match_pos, &match_len)) { /* append the match and reset the weak_sum */ @@ -137,8 +141,8 @@ static rs_result rs_delta_s_scan(rs_job_t *job) weaksum_reset(&job->weak_sum); } else { /* rotate the weak_sum and append the miss byte */ - weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos], - job->scoop_next[job->scoop_pos + block_len]); + weaksum_rotate(&job->weak_sum, job->scan_buf[job->scan_pos], + job->scan_buf[job->scan_pos + block_len]); result = rs_appendmiss(job, 1); } } @@ -158,17 +162,20 @@ static rs_result rs_delta_s_scan(rs_job_t *job) static rs_result rs_delta_s_flush(rs_job_t *job) { + const size_t block_len = job->signature->block_len; rs_long_t match_pos; size_t match_len; rs_result result; rs_job_check(job); + /* output any pending output from the tube */ + if ((result = rs_tube_catchup(job)) != RS_DONE) + return result; /* read the input into the scoop */ - rs_getinput(job); - /* output any pending output */ - result = rs_tube_catchup(job); + if ((result = rs_getinput(job, block_len)) != RS_DONE) + return result; /* while output is not blocked and there is any remaining data */ - while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) { + while ((result == RS_DONE) && (job->scan_pos < job->scan_len)) { /* check if this block matches */ if (rs_findmatch(job, &match_pos, &match_len)) { /* append the match and reset the weak_sum */ @@ -176,7 +183,7 @@ static rs_result rs_delta_s_flush(rs_job_t *job) weaksum_reset(&job->weak_sum); } else { /* rollout from weak_sum and append the miss byte */ - weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]); + weaksum_rollout(&job->weak_sum, job->scan_buf[job->scan_pos]); rs_trace("block reduced to " FMT_SIZE "", weaksum_count(&job->weak_sum)); result = rs_appendmiss(job, 1); @@ -199,25 +206,25 @@ static rs_result rs_delta_s_end(rs_job_t *job) return RS_DONE; } -static inline void rs_getinput(rs_job_t *job) +static inline rs_result rs_getinput(rs_job_t *job, size_t block_len) { - size_t len; + size_t min_len = block_len + MAX_DELTA_CMD; - len = rs_scoop_total_avail(job); - if (job->scoop_avail < len) { - rs_scoop_input(job, len); - } + job->scan_len = rs_scoop_avail(job); + if (job->scan_len < min_len && !job->stream->eof_in) + job->scan_len = min_len; + return rs_scoop_readahead(job, job->scan_len, (void **)&job->scan_buf); } -/** find a match at scoop_pos, returning the match_pos and match_len. +/** find a match at scan_pos, returning the match_pos and match_len. * * Note that this will calculate weak_sum if required. It will also determine * the match_len. * * This routine could be modified to do xdelta style matches that would extend * matches past block boundaries by matching backwards and forwards beyond the - * block boundaries. Extending backwards would require decrementing scoop_pos - * as appropriate. */ + * block boundaries. Extending backwards would require decrementing scan_pos as + * appropriate. */ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len) { @@ -226,12 +233,12 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, /* calculate the weak_sum if we don't have one */ if (weaksum_count(&job->weak_sum) == 0) { /* set match_len to min(block_len, scan_avail) */ - *match_len = job->scoop_avail - job->scoop_pos; + *match_len = job->scan_len - job->scan_pos; if (*match_len > block_len) { *match_len = block_len; } /* Update the weak_sum */ - weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos, + weaksum_update(&job->weak_sum, job->scan_buf + job->scan_pos, *match_len); rs_trace("calculate weak sum from scratch length " FMT_SIZE "", weaksum_count(&job->weak_sum)); @@ -241,7 +248,7 @@ static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, } *match_pos = rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum), - job->scoop_next + job->scoop_pos, *match_len); + job->scan_buf + job->scan_pos, *match_len); return *match_pos != -1; } @@ -262,8 +269,8 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, job->basis_pos = match_pos; job->basis_len = match_len; } - /* increment scoop_pos to point at next unscanned data */ - job->scoop_pos += match_len; + /* increment scan_pos to point at next unscanned data */ + job->scan_pos += match_len; /* we can only process from the scoop if output is not blocked */ if (result == RS_DONE) { /* process the match data off the scoop */ @@ -279,15 +286,14 @@ static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, * in memory. */ static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len) { - const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */ rs_result result = RS_DONE; - /* If last was a match, or max_miss misses, appendflush it. */ - if (job->basis_len || (job->scoop_pos >= max_miss)) { + /* If last was a match, or MAX_MISS_LEN misses, appendflush it. */ + if (job->basis_len || (job->scan_pos >= MAX_MISS_LEN)) { result = rs_appendflush(job); } - /* increment scoop_pos */ - job->scoop_pos += miss_len; + /* increment scan_pos */ + job->scan_pos += miss_len; return result; } @@ -302,9 +308,9 @@ static inline rs_result rs_appendflush(rs_job_t *job) job->basis_len = 0; return rs_processmatch(job); /* else if last is a miss, emit and process it */ - } else if (job->scoop_pos) { - rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos); - rs_emit_literal_cmd(job, (int)job->scoop_pos); + } else if (job->scan_pos) { + rs_trace("got " FMT_SIZE " bytes of literal data", job->scan_pos); + rs_emit_literal_cmd(job, (int)job->scan_pos); return rs_processmiss(job); } /* otherwise, nothing to flush so we are done */ @@ -313,40 +319,45 @@ static inline rs_result rs_appendflush(rs_job_t *job) /** Process matching data in the scoop. * - * The scoop contains match data at scoop_next of length scoop_pos. This - * function processes that match data, returning RS_DONE if it completes, or - * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to - * still point at the next unscanned data. + * The scoop contains match data at scan_buf of length scan_pos. This function + * processes that match data, returning RS_DONE if it completes, or RS_BLOCKED + * if it gets blocked. After it completes scan_pos is reset to still point at + * the next unscanned data. * * This function currently just removes data from the scoop and adjusts - * scoop_pos appropriately. In the future this could be used for something like + * scan_pos appropriately. In the future this could be used for something like * context compressing of miss data. Note that it also calls rs_tube_catchup to * output any pending output. */ static inline rs_result rs_processmatch(rs_job_t *job) { - job->scoop_avail -= job->scoop_pos; - job->scoop_next += job->scoop_pos; - job->scoop_pos = 0; + assert(job->copy_len == 0); + rs_scoop_advance(job, job->scan_pos); + job->scan_buf += job->scan_pos; + job->scan_len -= job->scan_pos; + job->scan_pos = 0; return rs_tube_catchup(job); } /** Process miss data in the scoop. * - * The scoop contains miss data at scoop_next of length scoop_pos. This - * function processes that miss data, returning RS_DONE if it completes, or - * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to - * still point at the next unscanned data. + * The scoop contains miss data at scan_buf of length scan_pos. This function + * processes that miss data, returning RS_DONE if it completes, or RS_BLOCKED + * if it gets blocked. After it completes scan_pos is reset to still point at + * the next unscanned data. * * This function uses rs_tube_copy to queue copying from the scoop into output. * and uses rs_tube_catchup to do the copying. This automaticly removes data * from the scoop, but this can block. While rs_tube_catchup is blocked, - * scoop_pos does not point at legit data, so scanning can also not proceed. + * scan_pos does not point at legit data, so scanning can also not proceed. * * In the future this could do compression of miss data before outputing it. */ static inline rs_result rs_processmiss(rs_job_t *job) { - rs_tube_copy(job, job->scoop_pos); - job->scoop_pos = 0; + assert(job->write_len > 0); + rs_tube_copy(job, job->scan_pos); + job->scan_buf += job->scan_pos; + job->scan_len -= job->scan_pos; + job->scan_pos = 0; return rs_tube_catchup(job); } @@ -354,15 +365,14 @@ static inline rs_result rs_processmiss(rs_job_t *job) * recreate the input. */ static rs_result rs_delta_s_slack(rs_job_t *job) { - rs_buffers_t *const stream = job->stream; - size_t avail = stream->avail_in; + size_t avail = rs_scoop_avail(job); if (avail) { rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail); rs_emit_literal_cmd(job, (int)avail); rs_tube_copy(job, avail); return RS_RUNNING; - } else if (rs_job_input_is_ending(job)) { + } else if (rs_scoop_eof(job)) { job->statefn = rs_delta_s_end; return RS_RUNNING; } @@ -30,7 +30,7 @@ #include <time.h> #include "librsync.h" #include "job.h" -#include "stream.h" +#include "scoop.h" #include "trace.h" #include "util.h" @@ -135,11 +135,6 @@ const rs_stats_t *rs_job_statistics(rs_job_t *job) return &job->stats; } -int rs_job_input_is_ending(rs_job_t *job) -{ - return job->stream->eof_in; -} - rs_result rs_job_drive(rs_job_t *job, rs_buffers_t *buf, rs_driven_cb in_cb, void *in_opaque, rs_driven_cb out_cb, void *out_opaque) { @@ -35,6 +35,14 @@ # include "checksum.h" # include "librsync.h" +/** Magic job tag number for checking jobs have been initialized. */ +# define RS_JOB_TAG 20010225 + +/** Max length of a singled delta command is including command bytes. + * + * This is used to constrain and set the internal buffer sizes. */ +# define MAX_DELTA_CMD (1<<16) + /** The contents of this structure are private. */ struct rs_job { int dogtag; @@ -85,13 +93,17 @@ struct rs_job { rs_stats_t stats; /** Buffer of data in the scoop. Allocation is scoop_buf[0..scoop_alloc], - * and scoop_next[0..scoop_avail] contains data yet to be processed. - * scoop_next[scoop_pos..scoop_avail] is the data yet to be scanned. */ - rs_byte_t *scoop_buf; /* the allocation pointer */ - rs_byte_t *scoop_next; /* the data pointer */ - size_t scoop_alloc; /* the allocation size */ - size_t scoop_avail; /* the data size */ - size_t scoop_pos; /* the scan position */ + * and scoop_next[0..scoop_avail] contains data yet to be processed. */ + rs_byte_t *scoop_buf; /**< The buffer allocation pointer. */ + rs_byte_t *scoop_next; /**< The next data pointer. */ + size_t scoop_alloc; /**< The buffer allocation size. */ + size_t scoop_avail; /**< The amount of data available. */ + + /** The delta scan buffer, where scan_buf[scan_pos..scan_len] is the data + * yet to be scanned. */ + rs_byte_t *scan_buf; /**< The delta scan buffer pointer. */ + size_t scan_len; /**< The delta scan buffer length. */ + size_t scan_pos; /**< The delta scan position. */ /** If USED is >0, then buf contains that much write data to be sent out. */ rs_byte_t write_buf[36]; @@ -111,11 +123,6 @@ struct rs_job { rs_job_t *rs_job_new(const char *, rs_result (*statefn)(rs_job_t *)); -int rs_job_input_is_ending(rs_job_t *job); - -/** Magic job tag number for checking jobs have been initialized. */ -# define RS_JOB_TAG 20010225 - /** Assert that a job is valid. * * We don't use a static inline function here so that assert failure output diff --git a/src/librsync.h b/src/librsync.h index 83fb27d..fec8fed 100644 --- a/src/librsync.h +++ b/src/librsync.h @@ -312,11 +312,17 @@ LIBRSYNC_EXPORT void rs_sumset_dump(rs_signature_t const *); * entry, and suitable to be passed in on a second call, but they don't * directly tell you how much output data was produced. * - * Note also that if *#avail_in is nonzero on return, then not all of the input - * data has been consumed. The caller should either provide more output buffer - * space and call ::rs_job_iter() again passing the same #next_in and - * #avail_in, or put the remaining input data into some persistent buffer and - * call rs_job_iter() with it again when there is more output space. + * If the input buffer was large enough, it will be processed directly, + * otherwise the data can be copied and accumulated into an internal buffer for + * processing. This means using larger buffers can be much more efficient. + * + * Note also that if #avail_in is nonzero on return, then not all of the input + * data has been consumed. This can happen either because it ran out of output + * buffer space, or because it processed as much data as possible directly from + * the input buffer and needs more input to proceed without copying into + * internal buffers. The caller should provide more output buffer space and/or + * pack the remaining input data into another buffer with more input before + * calling rs_job_iter() again. * * \sa rs_job_iter() */ struct rs_buffers_s { diff --git a/src/mksum.c b/src/mksum.c index b0fcb2c..5982bb1 100644 --- a/src/mksum.c +++ b/src/mksum.c @@ -31,7 +31,7 @@ #include "librsync.h" #include "job.h" #include "sumset.h" -#include "stream.h" +#include "scoop.h" #include "netint.h" #include "trace.h" #include "util.h" diff --git a/src/netint.c b/src/netint.c index e533b19..7d38071 100644 --- a/src/netint.c +++ b/src/netint.c @@ -30,7 +30,7 @@ #include <assert.h> #include "librsync.h" #include "netint.h" -#include "stream.h" +#include "scoop.h" #define RS_MAX_INT_BYTES 8 diff --git a/src/patch.c b/src/patch.c index 5fae2bb..e288d7d 100644 --- a/src/patch.c +++ b/src/patch.c @@ -33,7 +33,7 @@ #include "librsync.h" #include "job.h" #include "netint.h" -#include "stream.h" +#include "scoop.h" #include "command.h" #include "prototab.h" #include "trace.h" diff --git a/src/readsums.c b/src/readsums.c index dd32416..4f7cd62 100644 --- a/src/readsums.c +++ b/src/readsums.c @@ -26,7 +26,7 @@ #include "librsync.h" #include "job.h" #include "sumset.h" -#include "stream.h" +#include "scoop.h" #include "netint.h" #include "trace.h" #include "util.h" diff --git a/src/scoop.c b/src/scoop.c index d883693..6ac3999 100644 --- a/src/scoop.c +++ b/src/scoop.c @@ -28,23 +28,22 @@ /** \file scoop.c * This file deals with readahead from caller-supplied buffers. * - * Many functions require a certain minimum amount of input to do their - * processing. For example, to calculate a strong checksum of a block we need - * at least a block of input. + * Many functions require a certain minimum amount of contiguous input data to + * do their processing. For example, to calculate a strong checksum of a block + * we need at least a block of input. * * Since we put the buffers completely under the control of the caller, we * can't count on ever getting this much data all in one go. We can't simply * wait, because the caller might have a smaller buffer than we require and so - * we'll never get it. For the same reason we must always accept all the data - * we're given. + * we'll never get it. * - * So, stream input data that's required for readahead is put into a special - * buffer, from which the caller can then read. It's essentially like an - * internal pipe, which on any given read request may or may not be able to - * actually supply the data. - * - * As a future optimization, we might try to take data directly from the input - * buffer if there's already enough there. + * Stream input data is used directly if there is sufficient data to satisfy + * the readhead requests, otherwise it is copied and accumulated into an + * internal buffer until there is enough. This means for large input buffers we + * can leave a "tail" of unprocessed data in the input buffer, and only consume + * all the data if it was too small and start accumulating into the internal + * buffer. Provided the input buffers always have enough data we avoid copying + * into the internal buffer at all. * * \todo We probably know a maximum amount of data that can be scooped up, so * we could just avoid dynamic allocation. However that can't be fixed at @@ -58,12 +57,12 @@ #include <string.h> #include "librsync.h" #include "job.h" -#include "stream.h" +#include "scoop.h" #include "trace.h" #include "util.h" /** Try to accept a from the input buffer to get LEN bytes in the scoop. */ -void rs_scoop_input(rs_job_t *job, size_t len) +static inline void rs_scoop_input(rs_job_t *job, size_t len) { rs_buffers_t *stream = job->stream; size_t tocopy; @@ -84,7 +83,7 @@ void rs_scoop_input(rs_job_t *job, size_t len) rs_trace("resized scoop buffer to " FMT_SIZE " bytes from " FMT_SIZE "", newsize, job->scoop_alloc); job->scoop_alloc = newsize; - } else if (job->scoop_buf != job->scoop_next) { + } else if (job->scoop_buf + job->scoop_alloc < job->scoop_next + len) { /* Move existing data to the front of the scoop. */ rs_trace("moving scoop " FMT_SIZE " bytes to reuse " FMT_SIZE " bytes", job->scoop_avail, (size_t)(job->scoop_next - job->scoop_buf)); @@ -96,7 +95,8 @@ void rs_scoop_input(rs_job_t *job, size_t len) tocopy = len - job->scoop_avail; if (tocopy > stream->avail_in) tocopy = stream->avail_in; - assert(tocopy + job->scoop_avail <= job->scoop_alloc); + assert(job->scoop_next + tocopy + job->scoop_avail <= + job->scoop_buf + job->scoop_alloc); memcpy(job->scoop_next + job->scoop_avail, stream->next_in, tocopy); rs_trace("accepted " FMT_SIZE " bytes from input to scoop", tocopy); @@ -210,20 +210,11 @@ rs_result rs_scoop_read(rs_job_t *job, size_t len, void **ptr) * at EOF, RS_BLOCKED if there was no data and not at EOF. */ rs_result rs_scoop_read_rest(rs_job_t *job, size_t *len, void **ptr) { - rs_buffers_t *stream = job->stream; - - *len = job->scoop_avail + stream->avail_in; + *len = rs_scoop_avail(job); if (*len) return rs_scoop_read(job, *len, ptr); - else if (stream->eof_in) + else if (job->stream->eof_in) return RS_INPUT_ENDED; else return RS_BLOCKED; } - -/** Return the total number of bytes available including the scoop and input - * buffer. */ -size_t rs_scoop_total_avail(rs_job_t *job) -{ - return job->scoop_avail + job->stream->avail_in; -} diff --git a/src/stream.h b/src/scoop.h index be6e286..4d4fde5 100644 --- a/src/stream.h +++ b/src/scoop.h @@ -27,7 +27,7 @@ | And sons who died on the Burma Railway. */ -/** \file stream.h +/** \file scoop.h * Manage librsync streams of IO. * * See \sa scoop.c and \sa tube.c for related code for input and output @@ -52,12 +52,13 @@ * * In buf.c you will find functions that then map buffers onto stdio files. * - * So on return from an encoding function, either the input or the output or - * possibly both will have no more bytes available. + * On return from an encoding function, some input will have been consumed + * and/or some output produced, but either the input or the output or possibly + * both could have some remaining bytes available. * - * librsync never does IO or memory allocation, but relies on the caller. This - * is very nice for integration, but means that we have to be fairly flexible - * as to when we can `read' or `write' stuff internally. + * librsync never does IO or memory allocation for stream input, but relies on + * the caller. This is very nice for integration, but means that we have to be + * fairly flexible as to when we can `read' or `write' stuff internally. * * librsync basically does two types of IO. It reads network integers of * various lengths which encode command and control information such as @@ -73,21 +74,111 @@ * On each call into a stream iterator, it should begin by trying to flush * output. This may well use up all the remaining stream space, in which case * nothing else can be done. */ -#ifndef STREAM_H -# define STREAM_H - -size_t rs_buffers_copy(rs_buffers_t *stream, size_t len); +#ifndef SCOOP_H +# define SCOOP_H +# include <stdbool.h> +# include <stddef.h> +# include "job.h" +# include "librsync.h" rs_result rs_tube_catchup(rs_job_t *job); int rs_tube_is_idle(rs_job_t const *job); void rs_tube_write(rs_job_t *job, void const *buf, size_t len); void rs_tube_copy(rs_job_t *job, size_t len); -void rs_scoop_input(rs_job_t *job, size_t len); void rs_scoop_advance(rs_job_t *job, size_t len); rs_result rs_scoop_readahead(rs_job_t *job, size_t len, void **ptr); rs_result rs_scoop_read(rs_job_t *job, size_t len, void **ptr); rs_result rs_scoop_read_rest(rs_job_t *job, size_t *len, void **ptr); -size_t rs_scoop_total_avail(rs_job_t *job); -#endif /* !STREAM_H */ +static inline size_t rs_scoop_avail(rs_job_t *job) +{ + return job->scoop_avail + job->stream->avail_in; +} + +/** Test if the scoop has reached eof. */ +static inline bool rs_scoop_eof(rs_job_t *job) +{ + return !rs_scoop_avail(job) && job->stream->eof_in; +} + +/** Get a pointer to the next input in the scoop. */ +static inline void *rs_scoop_buf(rs_job_t *job) +{ + return job->scoop_avail ? (void *)job->scoop_next : (void *)job-> + stream->next_in; +} + +/** Get the contiguous length of the next input in the scoop. */ +static inline size_t rs_scoop_len(rs_job_t *job) +{ + return job->scoop_avail ? job->scoop_avail : job->stream->avail_in; +} + +/** Get the next contiguous buffer of data available in the scoop. + * + * This will return a pointer to the data and reduce len to the amount of + * contiguous data available at that position. + * + * \param *job - the job instance to use. + * + * \param *len - the amount of data desired, updated to the amount available. + * + * \return A pointer to the data. */ +static inline void *rs_scoop_getbuf(rs_job_t *job, size_t *len) +{ + size_t max_len = rs_scoop_len(job); + if (*len > max_len) + *len = max_len; + return rs_scoop_buf(job); +} + +/** Iterate through and consume contiguous data buffers in the scoop. + * + * Example: \code + * size_t len=rs_scoop_avail(job); + * size_t ilen; + * + * for (buf = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0; + * buf = rs_scoop_nextbuf(job, &len, &ilen)) + * ilen = fwrite(buf, ilen, 1, f); + * \endcode + * + * At each iteration buf and ilen are the data and its length for the current + * iteration, and len is the remaining data to iterate through including the + * current iteration. During an iteration you can change ilen to indicate only + * part of the buffer was processed and the next iteration will take this into + * account. Setting ilen = 0 to indicate blocking or errors will terminate + * iteration. + * + * At the end of iteration buf will point at the next location in the scoop + * after the iterated data, len and ilen will be zero, or the remaining data + * and last ilen if iteration was terminated by setting ilen = 0. + * + * \param *job - the job instance to use. + * + * \param *len - the size_t amount of data to iterate over. + * + * \param *ilen - the size_t amount of data in the current iteration. + * + * \return A pointer to data in the current iteration. */ +static inline void *rs_scoop_iterbuf(rs_job_t *job, size_t *len, size_t *ilen) +{ + *ilen = *len; + return rs_scoop_getbuf(job, ilen); +} + +/** Get the next iteration of contiguous data buffers from the scoop. + * + * This advances the scoop for the previous iteration, and gets the next + * iteration. \sa rs_scoop_iterbuf */ +static inline void *rs_scoop_nextbuf(rs_job_t *job, size_t *len, size_t *ilen) +{ + if (*ilen == 0) + return rs_scoop_buf(job); + rs_scoop_advance(job, *ilen); + *len -= *ilen; + return rs_scoop_iterbuf(job, len, ilen); +} + +#endif /* !SCOOP_H */ @@ -56,7 +56,7 @@ #include <string.h> #include "librsync.h" #include "job.h" -#include "stream.h" +#include "scoop.h" #include "trace.h" static void rs_tube_catchup_write(rs_job_t *job) @@ -80,75 +80,36 @@ static void rs_tube_catchup_write(rs_job_t *job) len, job->write_len); } -/** Execute a copy command, taking data from the scoop. - * - * \sa rs_tube_catchup_copy() */ -static void rs_tube_copy_from_scoop(rs_job_t *job) -{ - rs_buffers_t *stream = job->stream; - size_t len = job->copy_len; - - assert(len > 0); - if (len > job->scoop_avail) - len = job->scoop_avail; - if (len > stream->avail_out) - len = stream->avail_out; - if (len) { - memcpy(stream->next_out, job->scoop_next, len); - stream->next_out += len; - stream->avail_out -= len; - job->scoop_avail -= len; - job->scoop_next += len; - job->copy_len -= len; - } - rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE - " left in scoop, " FMT_SIZE " left to copy", len, job->scoop_avail, - job->copy_len); -} - -/** Execute a copy command, taking data from the stream. - * - * \sa rs_tube_catchup_copy() */ -static void rs_tube_copy_from_stream(rs_job_t *job) -{ - rs_buffers_t *stream = job->stream; - size_t len = job->copy_len; - - assert(len > 0); - if (len > stream->avail_in) - len = stream->avail_in; - if (len > stream->avail_out) - len = stream->avail_out; - if (len) { - memcpy(stream->next_out, stream->next_in, len); - stream->next_out += len; - stream->avail_out -= len; - stream->next_in += len; - stream->avail_in -= len; - job->copy_len -= len; - } - rs_trace("copied " FMT_SIZE " bytes from stream, " FMT_SIZE - "left in stream, " FMT_SIZE " left to copy", len, stream->avail_in, - job->copy_len); -} - /** Catch up on an outstanding copy command. * - * Takes data from the scoop, and the input (in that order), and writes as much - * as will fit to the output, up to the limit of the outstanding copy. */ + * Takes data from the scoop and writes as much as will fit to the output, up + * to the limit of the outstanding copy. */ static void rs_tube_catchup_copy(rs_job_t *job) { assert(job->write_len == 0); assert(job->copy_len > 0); - - /* If there's data in the scoop, send that first. */ - if (job->scoop_avail && job->copy_len) { - rs_tube_copy_from_scoop(job); - } - /* If there's more to copy and we emptied the scoop, send input. */ - if (job->copy_len && !job->scoop_avail) { - rs_tube_copy_from_stream(job); + rs_buffers_t *stream = job->stream; + size_t copy_len = job->copy_len; + size_t avail_in = rs_scoop_avail(job); + size_t avail_out = stream->avail_out; + size_t len, ilen; + void *next; + + if (copy_len > avail_in) + copy_len = avail_in; + if (copy_len > avail_out) + copy_len = avail_out; + len = copy_len; + for (next = rs_scoop_iterbuf(job, &len, &ilen); ilen > 0; + next = rs_scoop_nextbuf(job, &len, &ilen)) { + memcpy(stream->next_out, next, ilen); + stream->next_out += ilen; + stream->avail_out -= ilen; + job->copy_len -= ilen; } + rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE + " left in scoop, " FMT_SIZE " left to copy", copy_len, + rs_scoop_avail(job), job->copy_len); } /** Put whatever will fit from the tube into the output of the stream. @@ -166,8 +127,7 @@ rs_result rs_tube_catchup(rs_job_t *job) if (job->copy_len) { rs_tube_catchup_copy(job); if (job->copy_len) { - if (job->stream->eof_in && !job->stream->avail_in - && !job->scoop_avail) { + if (rs_scoop_eof(job)) { rs_error("reached end of file while copying data"); return RS_INPUT_ENDED; } diff --git a/src/whole.c b/src/whole.c index 04777f8..e667f88 100644 --- a/src/whole.c +++ b/src/whole.c @@ -111,9 +111,9 @@ rs_result rs_delta_file(rs_signature_t *sig, FILE *new_file, FILE *delta_file, rs_result r; job = rs_delta_begin(sig); - /* Size inbuf for 1 block, outbuf for literal cmd + 4 blocks. */ - r = rs_whole_run(job, new_file, delta_file, sig->block_len, - 10 + 4 * sig->block_len); + /* Size inbuf for 4*(CMD + 1 block), outbuf for 4*CMD. */ + r = rs_whole_run(job, new_file, delta_file, + 4 * (MAX_DELTA_CMD + sig->block_len), 4 * MAX_DELTA_CMD); if (stats) memcpy(stats, &job->stats, sizeof *stats); rs_job_free(job); @@ -127,8 +127,9 @@ rs_result rs_patch_file(FILE *basis_file, FILE *delta_file, FILE *new_file, rs_result r; job = rs_patch_begin(rs_file_copy_cb, basis_file); - /* Default size inbuf and outbuf 64K. */ - r = rs_whole_run(job, delta_file, new_file, 64 * 1024, 64 * 1024); + /* Default size inbuf 1*CMD and outbuf 4*CMD. */ + r = rs_whole_run(job, delta_file, new_file, MAX_DELTA_CMD, + 4 * MAX_DELTA_CMD); if (stats) memcpy(stats, &job->stats, sizeof *stats); rs_job_free(job); |