summary refs log tree commit diff
path: root/delta.c
diff options
context:
space:
mode:
Diffstat (limited to 'delta.c')
-rw-r--r-- delta.c 532
1 files changed, 267 insertions, 265 deletions
diff --git a/delta.c b/delta.c
index 85a0826..611cf2a 100644
--- a/delta.c
+++ b/delta.c
@@ -75,316 +75,316 @@
#include "sumset.h"
#include "job.h"
#include "trace.h"
-#include "checksum.h"
#include "search.h"
+#include "types.h"
+#include "rollsum.h"
/**
- * Turn this on to make all rolling checksums be checked from scratch.
+ * 2002-06-26: Donovan Baarda
+ *
+ * The following is based entirely on pysync. It is much cleaner than the
+ * previous incarnation of this code. It is slightly complicated because in
+ * this case the output can block, so the main delta loop needs to stop
+ * when this happens.
+ *
+ * In pysync a 'last' attribute is used to hold the last miss or match for
+ * extending if possible. In this code, basis_len and scoop_pos are used
+ * instead of 'last'. When basis_len > 0, last is a match. When basis_len =
+ * 0 and scoop_pos is > 0, last is a miss. When both are 0, last is None
+ * (ie, nothing).
+ *
+ * Pysync is also slightly different in that a 'flush' method is available
+ * to force output of accumulated data. This 'flush' is used to finalise
+ * delta calculation. In librsync input is terminated with an eof flag on
+ * the input stream. I have structured this code similar to pysync with a
+ * separate flush function that is used when eof is reached. This allows
+ * for a flush style API if one is ever needed. Note that flush in pysync
+ * can be used for more than just terminating delta calculation, so a flush
+ * based API can in some ways be more flexible...
+ *
+ * The input data is first scanned, then processed. Scanning identifies
+ * input data as misses or matches, and emits the instruction stream.
+ * Processing the data consumes it off the input scoop and outputs the
+ * processed miss data into the tube.
+ *
+ * The scoop contains all data yet to be processed. The scoop_pos is an
+ * index into the scoop that indicates the point scanned to. As data is
+ * scanned, scoop_pos is incremented. As data is processed, it is removed
+ * from the scoop and scoop_pos adjusted. Everything gets complicated
+ * because the tube can block. When the tube is blocked, no data can be
+ * processed.
+ *
*/
-int rs_roll_paranoia = 0;
-
-
-static rs_result rs_delta_scan(rs_job_t *, rs_long_t avail_len, void *);
-static rs_result rs_delta_match(rs_job_t *, rs_long_t avail_len, void *);
-
-static rs_result rs_delta_s_deferred_advance(rs_job_t *job);
+/* used by rdiff, but now redundant */
+int rs_roll_paranoia = 0;
-
-static rs_result rs_delta_s_end(rs_job_t *job)
-{
- rs_emit_end_cmd(job);
- return RS_DONE;
-}
-
+static rs_result rs_delta_s_scan(rs_job_t *job);
+static rs_result rs_delta_s_flush(rs_job_t *job);
+static rs_result rs_delta_s_end(rs_job_t *job);
+void rs_getinput(rs_job_t *job);
+inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len);
+inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len);
+inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
+inline rs_result rs_appendflush(rs_job_t *job);
+inline rs_result rs_processmatch(rs_job_t *job);
+inline rs_result rs_processmiss(rs_job_t *job);
/**
* \brief Get a block of data if possible, and see if it matches.
- *
- * On each call, we try to process all of the input data available on
- * the scoop and input buffer.
- */
-static rs_result
-rs_delta_s_scan(rs_job_t *job)
+ *
+ * On each call, we try to process all of the input data available on the
+ * scoop and input buffer. */
+static rs_result rs_delta_s_scan(rs_job_t *job)
{
- size_t this_len, avail_len;
- int is_ending;
- void *inptr;
+ rs_long_t match_pos;
+ size_t match_len;
rs_result result;
+ Rollsum test;
rs_job_check(job);
-
- avail_len = rs_scoop_total_avail(job);
- this_len = job->block_len;
- is_ending = job->stream->eof_in;
-
- /* Now, we have avail_len bytes, and we need to scan through them
- * looking for a match. We may end up emitting a bunch of
- * commands depending on how the blocks match with the signature */
- if ((avail_len == 0) && (job->basis_len == 0)) {
- if (is_ending) {
- /* no more delta to do */
- job->statefn = rs_delta_s_end;
+ /* read the input into the scoop */
+ rs_getinput(job);
+ /* output any pending output from the tube */
+ result=rs_tube_catchup(job);
+ /* while output is not blocked and there is a block of data */
+ while ((result==RS_DONE) &&
+ ((job->scoop_pos + job->block_len) < job->scoop_avail)) {
+ /* check if this block matches */
+ if (rs_findmatch(job,&match_pos,&match_len)) {
+ /* append the match and reset the weak_sum */
+ result=rs_appendmatch(job,match_pos,match_len);
+ RollsumInit(&job->weak_sum);
+ } else {
+ /* rotate the weak_sum and append the miss byte */
+ RollsumRotate(&job->weak_sum,job->scoop_next[job->scoop_pos],
+ job->scoop_next[job->scoop_pos+job->block_len]);
+ result=rs_appendmiss(job,1);
+ if (rs_roll_paranoia) {
+ RollsumInit(&test);
+ RollsumUpdate(&test,job->scoop_next+job->scoop_pos,
+ job->block_len);
+ if (RollsumDigest(&test) != RollsumDigest(&job->weak_sum)) {
+ rs_fatal("mismatch between rolled sum %#x and check %#x",
+ (int)RollsumDigest(&job->weak_sum),
+ (int)RollsumDigest(&test));
+ }
+
+ }
}
- return RS_BLOCKED;
- }
-
- /* must read at least one block, or give up */
- if ((avail_len < job->block_len) && !is_ending) {
- /* we know we won't get it, but we have to try for a whole
- * block anyhow so that it gets into the scoop. */
- rs_scoop_input(job, job->block_len);
- return RS_BLOCKED;
}
-
- result = rs_scoop_readahead(job, avail_len, &inptr);
- if (result != RS_DONE)
- return result;
-
- if (!job->basis_len)
- return rs_delta_scan(job, avail_len, inptr);
- else
- return rs_delta_match(job, avail_len, inptr);
+ /* if we completed OK */
+ if (result==RS_DONE) {
+ /* if we reached eof, we can flush the last fragment */
+ if (job->stream->eof_in) {
+ job->statefn=rs_delta_s_flush;
+ return RS_RUNNING;
+ } else {
+ /* we are blocked waiting for more data */
+ return RS_BLOCKED;
+ }
+ }
+ return result;
}
-/**
- * Scan for a matching block in the next \p avail_len bytes of input.
- *
- * If nonmatching data is found, then a LITERAL command will be put in
- * the tube immediately. If matching data is found, then its position
- * will be saved in the job, and the job state set up to to perform
- * RLL encoding after handling the literal.
- */
-static rs_result
-rs_delta_scan(rs_job_t *job, rs_long_t avail_len, void *p)
+static rs_result rs_delta_s_flush(rs_job_t *job)
{
- rs_long_t match_where;
- int search_pos, end_pos;
- unsigned char *inptr = (unsigned char *) p;
- unsigned s1 = job->weak_sig & 0xFFFF;
- unsigned s2 = job->weak_sig >> 16;
+ rs_long_t match_pos;
+ size_t match_len;
+ rs_result result;
- if (job->basis_len) {
- rs_log(RS_LOG_ERR, "somehow got nonzero basis_len");
- return RS_INTERNAL_ERROR;
+ rs_job_check(job);
+ /* read the input into the scoop */
+ rs_getinput(job);
+ /* output any pending output */
+ result=rs_tube_catchup(job);
+ /* while output is not blocked and there is any remaining data */
+ while ((result==RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
+ /* check if this block matches */
+ if (rs_findmatch(job,&match_pos,&match_len)) {
+ /* append the match and reset the weak_sum */
+ result=rs_appendmatch(job,match_pos,match_len);
+ RollsumInit(&job->weak_sum);
+ } else {
+ /* rollout from weak_sum and append the miss byte */
+ RollsumRollout(&job->weak_sum,job->scoop_next[job->scoop_pos]);
+ rs_trace("block reduced to %d", (int)job->weak_sum.count);
+ result=rs_appendmiss(job,1);
+ }
+ }
+ /* if we are not blocked, flush and set end statefn. */
+ if (result==RS_DONE) {
+ result=rs_appendflush(job);
+ job->statefn=rs_delta_s_end;
}
+ if (result==RS_DONE) {
+ return RS_RUNNING;
+ }
+ return result;
+}
-
- /* So, we have avail_len bytes of data, and we want to look
- * through it for a match at some point. It's OK if it's not at
- * the start of the available input data. If we're approaching
- * the end and can't get a match, then we just block and get more
- * later. */
-
- /* FIXME: Perhaps we should be working in signed chars for the
- * rolling sum? */
-
- if (job->stream->eof_in)
- end_pos = avail_len - 1;
- else
- end_pos = avail_len - job->block_len;
-
- for (search_pos = 0; search_pos <= end_pos; search_pos++) {
- size_t this_len = job->block_len;
-
- /* Did we inherit the signature from rs_delta_match, if
- * so we know this block won't match and we should simply
- * skip the first char.
- */
- if (job->have_weak_sig < 0) {
- /* advance by one; roll out the byte we just skipped over. */
- unsigned char a = inptr[search_pos];
- unsigned shift = a + RS_CHAR_OFFSET;
-
- s1 -= shift;
- s2 -= this_len * shift;
- job->weak_sig = (s1 & 0xffff) | (s2 << 16);
- job->have_weak_sig = 1;
- /* We already know that this block won't match!*/
- continue;
- }
- if (search_pos + this_len > avail_len) {
- this_len = avail_len - search_pos;
- rs_trace("block reduced to %d", this_len);
- } else if (job->have_weak_sig > 0) {
- unsigned char a = inptr[search_pos + this_len - 1];
- /* roll in the newly added byte, if any */
- s1 += a + RS_CHAR_OFFSET;
- s2 += s1;
+static rs_result rs_delta_s_end(rs_job_t *job)
+{
+ rs_emit_end_cmd(job);
+ return RS_DONE;
+}
- job->weak_sig = (s1 & 0xffff) | (s2 << 16);
- }
- if (!job->have_weak_sig) {
- rs_trace("calculate weak sum from scratch");
- job->weak_sig = rs_calc_weak_sum(inptr + search_pos, this_len);
- s1 = job->weak_sig & 0xFFFF;
- s2 = job->weak_sig >> 16;
- job->have_weak_sig = 1;
- }
+void rs_getinput(rs_job_t *job) {
+ size_t len;
+
+ len=rs_scoop_total_avail(job);
+ if (job->scoop_avail < len) {
+ rs_scoop_input(job,len);
+ }
+}
- if (rs_roll_paranoia) {
- rs_weak_sum_t verify = rs_calc_weak_sum(inptr + search_pos, this_len);
- if (verify != job->weak_sig) {
- rs_fatal("mismatch between rolled sum %#x and check %#x",
- job->weak_sig, verify);
- }
- }
- if (rs_search_for_block(job->weak_sig, inptr + search_pos, this_len,
- job->signature, &job->stats, &match_where)) {
- /* So, we got a match. Cool. However, there may be
- * leading unmatched data that we need to flush. Thus we
- * set our statefn to be rs_delta_s_deferred_advance so that
- * we can skip bytes and write out the copy command later. */
-
- rs_trace("matched %.0f bytes at %.0f!",
- (double) this_len, (double) match_where);
- job->basis_pos = match_where;
- job->basis_len = this_len;
- job->statefn = rs_delta_s_deferred_advance;
- job->have_weak_sig = 0;
- break;
- } else {
- /* advance by one; roll out the byte we just moved over. */
- unsigned char a = inptr[search_pos];
- unsigned shift = a + RS_CHAR_OFFSET;
-
- s1 -= shift;
- s2 -= this_len * shift;
- job->weak_sig = (s1 & 0xffff) | (s2 << 16);
+/**
+ * find a match at scoop_pos, returning the match_pos and match_len.
+ * Note that this will calculate weak_sum if required. It will also
+ * determine the match_len.
+ *
+ * Note that this routine could be modified to do xdelta style matches that
+ * would extend matches past block boundaries by matching backwards and
+ * forwards beyond the block boundaries. Extending backwards would require
+ * decrementing scoop_pos as appropriate.
+ */
+inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len) {
+ /* calculate the weak_sum if we don't have one */
+ if (job->weak_sum.count == 0) {
+ /* set match_len to min(block_len, scoop_avail - scoop_pos) */
+ *match_len=job->scoop_avail - job->scoop_pos;
+ if (*match_len > job->block_len) {
+ *match_len = job->block_len;
}
+ /* Update the weak_sum */
+ RollsumUpdate(&job->weak_sum,job->scoop_next+job->scoop_pos,*match_len);
+ rs_trace("calculate weak sum from scratch length %d",(int)job->weak_sum.count);
+ } else {
+ /* set the match_len to the weak_sum count */
+ *match_len=job->weak_sum.count;
}
-
- if (search_pos > 0) {
- /* We may or may not have found a block, but we know we found
- * some literal data at the start of the buffer. Therefore,
- * we have to flush that out before we can continue on and
- * emit the copy command or keep searching. */
-
- /* FIXME: At the moment, if you call with very short buffers,
- * then you will get a series of very short LITERAL commands.
- * Perhaps this is what you deserve, or perhaps we should try
- * to get more readahead and avoid that. */
-
- /* There's some literal data at the start of this window which
- * we know is not in any block. */
- rs_trace("got %d bytes of literal data", search_pos);
- rs_emit_literal_cmd(job, search_pos);
- rs_tube_copy(job, search_pos);
- }
-
- return RS_RUNNING;
+ return rs_search_for_block(RollsumDigest(&job->weak_sum),
+ job->scoop_next+job->scoop_pos,
+ *match_len,
+ job->signature,
+ &job->stats,
+ match_pos);
}
+
/**
- * advance the scoop pointer to skip a matched block.
- *
- * We can't do this greedily within rs_delta_scan since rs_tube_copy is lazy.
- * Instead we use this intermediate state to advance the scoop.
- */
-static rs_result
-rs_delta_s_deferred_advance(rs_job_t *job)
+ * Append a match at match_pos of length match_len to the delta, extending
+ * a previous match if possible, or flushing any previous miss/match. */
+inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
{
- if (!job->basis_len) {
- rs_log(RS_LOG_ERR, "somehow got zero basis_len");
- return RS_INTERNAL_ERROR;
+ rs_result result=RS_DONE;
+
+ /* if last was a match that can be extended, extend it */
+ if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
+ job->basis_len+=match_len;
+ } else {
+ /* else appendflush the last value */
+ result=rs_appendflush(job);
+ /* make this the new match value */
+ job->basis_pos=match_pos;
+ job->basis_len=match_len;
}
-
- rs_scoop_advance(job,job->basis_len);
- job->statefn=rs_delta_s_scan;
-
- return RS_RUNNING;
+ /* increment scoop_pos to point at next unscanned data */
+ job->scoop_pos+=match_len;
+ /* we can only process from the scoop if output is not blocked */
+ if (result==RS_DONE) {
+ /* process the match data off the scoop*/
+ result=rs_processmatch(job);
+ }
+ return result;
}
+
/**
- * Do RLL coding of output.
- *
- * When a matched block is found we are in this state. We try to accumulate
- * adjacent blocks for RLL encoding of the output. If a non-adjacent block is
- * matched, we emit a copy command for the accumulated blocks and start a
- * new RLL sequence. If a block can't be matched we need to rescan.
- */
-static rs_result
-rs_delta_match(rs_job_t *job, rs_long_t avail_len, void *p)
+ * Append a miss of length miss_len to the delta, extending a previous miss
+ * if possible, or flushing any previous match.
+ *
+ * This also breaks misses up into segments (a flush is triggered once
+ * scoop_pos reaches rs_outbuflen) to avoid accumulating too much in memory. */
+inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
{
- rs_long_t match_where;
- int search_pos;
- unsigned char *inptr = (unsigned char *) p;
- int ending= job->stream->eof_in;
-
- if (!job->basis_len) {
- rs_log(RS_LOG_ERR, "somehow got zero basis_len");
- return RS_INTERNAL_ERROR;
+ rs_result result=RS_DONE;
+
+ /* if last was a match, or scoop_pos has reached rs_outbuflen, appendflush it */
+ if (job->basis_len || (job->scoop_pos >= rs_outbuflen)) {
+ result=rs_appendflush(job);
}
+ /* increment scoop_pos */
+ job->scoop_pos+=miss_len;
+ return result;
+}
- /* So, we have avail_len bytes of data, and we previously matched
- * one or more blocks. We now look for adjacent matches to roll into the
- * the current match. If we hit a block that has no match, we need to
- * go back rs_delta_scan and rescan. */
-
- for (search_pos = 0; search_pos <= avail_len; search_pos+=job->block_len) {
- size_t this_len = job->block_len;
-
- if (search_pos + this_len > avail_len) {
- /* We only allow short blocks at the end of stream*/
- if (!ending) {
- rs_trace("waiting for more input");
- return RS_BLOCKED;
- }
- this_len = avail_len - search_pos;
- rs_trace("block reduced to %d", this_len);
- }
-
- rs_trace("calculate weak sum from scratch");
- job->weak_sig = rs_calc_weak_sum(inptr + search_pos, this_len);
- job->have_weak_sig = -1;
-
- if (rs_search_for_block(job->weak_sig, inptr + search_pos, this_len,
- job->signature, &job->stats, &match_where)) {
- /* So, we got a match. Cool. Now try to roll it into the previous
- * match. If we can't we start a new rll sequence. */
- rs_trace("matched %.0f bytes at %.0f!",
- (double) this_len, (double) match_where);
- /* At this point we have matched this block so skip it*/
- /* We do this now since we might return in the IF block*/
- rs_scoop_advance(job,this_len);
-
- if (match_where == (job->basis_pos + job->basis_len)) {
- job->basis_len += this_len;
- rs_trace("adjacent match: accumulated %.0f bytes at %.0f",
- (double)job->basis_len,(double)job->basis_pos);
- } else {
- rs_trace("new match, flushing %.0f bytes at %.0f",
- (double)job->basis_pos,(double)job->basis_len);
- rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
- job->basis_pos = match_where;
- job->basis_len = this_len;
- /* Give the tube a chance to catchup */
- return RS_RUNNING;
- }
- } else {
- /* Copy blocks that we acummulated, there should be at least one */
- rs_trace("no match, copying %.0f bytes at %.0f",
- (double)job->basis_len,(double)job->basis_pos);
- rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
- /* Unmatched data...we need to rescan*/
- job->basis_len=0;
- return RS_RUNNING;
- }
- }
-
- if (ending) {
- /* The job ended with a matching block..we must copy everything*/
+/**
+ * Flush any accumulating hit or miss, appending it to the delta.
+ */
+inline rs_result rs_appendflush(rs_job_t *job)
+{
+ /* if last is a match, emit it and reset last by resetting basis_len */
+ if (job->basis_len) {
+ rs_trace("matched %.0f bytes at %.0f!",
+ (double) job->basis_len, (double) job->basis_pos);
rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
job->basis_len=0;
+ return rs_processmatch(job);
+ /* else if last is a miss, emit and process it*/
+ } else if (job->scoop_pos) {
+ rs_trace("got %d bytes of literal data", job->scoop_pos);
+ rs_emit_literal_cmd(job, job->scoop_pos);
+ return rs_processmiss(job);
}
+ /* otherwise, nothing to flush so we are done */
+ return RS_DONE;
+}
- return RS_RUNNING;
+
+/**
+ * The scoop contains match data at scoop_next of length scoop_pos. This
+ * function processes that match data, returning RS_DONE if it completes,
+ * or RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset
+ * to still point at the next unscanned data.
+ *
+ * This function currently just removes data from the scoop and adjusts
+ * scoop_pos appropriately. In the future this could be used for something
+ * like context compressing of miss data. Note that it also calls
+ * rs_tube_catchup to output any pending output. */
+inline rs_result rs_processmatch(rs_job_t *job)
+{
+ job->scoop_avail-=job->scoop_pos;
+ job->scoop_next+=job->scoop_pos;
+ job->scoop_pos=0;
+ return rs_tube_catchup(job);
+}
+
+/**
+ * The scoop contains miss data at scoop_next of length scoop_pos. This
+ * function processes that miss data, returning RS_DONE if it completes, or
+ * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
+ * still point at the next unscanned data.
+ *
+ * This function uses rs_tube_copy to queue copying from the scoop into
+ * output, and uses rs_tube_catchup to do the copying. This automatically
+ * removes data from the scoop, but this can block. While rs_tube_catchup
+ * is blocked, scoop_pos does not point at legit data, so scanning can also
+ * not proceed.
+ *
+ * In the future this could do compression of miss data before outputting
+ * it. */
+inline rs_result rs_processmiss(rs_job_t *job)
+{
+ rs_tube_copy(job, job->scoop_pos);
+ job->scoop_pos=0;
+ return rs_tube_catchup(job);
}
@@ -445,7 +445,9 @@ rs_job_t *rs_delta_begin(rs_signature_t *sig)
job = rs_job_new("delta", rs_delta_s_header);
job->signature = sig;
-
+
+ RollsumInit(&job->weak_sum);
+
if ((job->block_len = sig->block_len) < 0) {
rs_log(RS_LOG_ERR, "unreasonable block_len %d in signature",
job->block_len);