librsync  2.0.2
delta.c
Go to the documentation of this file.
1 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  *
3  * librsync -- library for network deltas
4  *
5  * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6  * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation; either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22 
23  /*=
24  | Let's climb to the TOP of that
25  | MOUNTAIN and think about STRIP
26  | MINING!!
27  */
28 
29 /** \file delta.c -- Generate in streaming mode an rsync delta given a set of
30  * signatures, and a new file.
31  *
32  * The size of blocks for signature generation is determined by the block size
33  * in the incoming signature.
34  *
35  * To calculate a signature, we need to be able to see at least one block of
36  * the new file at a time. Once we have that, we calculate its weak signature,
37  * and see if there is any block in the signature hash table that has the same
38  * weak sum. If there is one, then we also compute the strong sum of the new
39  * block, and cross check that. If they're the same, then we can assume we have
40  * a match.
41  *
42  * The final block of the file has to be handled a little differently, because
43  * it may be a short match. Short blocks in the signature don't include their
44  * length -- we just allow for the final short block of the file to match any
45  * block in the signature, and if they have the same checksum we assume they
46  * must have the same length. Therefore, when we emit a COPY command, we have
47  * to send it with a length that is the same as the block matched, and not the
48  * block length from the signature.
49  *
50  * Profiling results as of v1.26, 2001-03-18:
51  *
52  * If everything matches, then we spend almost all our time in rs_mdfour64 and
53  * rs_weak_sum, which is unavoidable and therefore a good profile.
54  *
55  * If nothing matches, it is not so good.
56  *
57  * 2002-06-26: Donovan Baarda
58  *
59  * The following is based entirely on pysync. It is much cleaner than the
60  * previous incarnation of this code. It is slightly complicated because in
61  * this case the output can block, so the main delta loop needs to stop when
62  * this happens.
63  *
64  * In pysync a 'last' attribute is used to hold the last miss or match for
65  * extending if possible. In this code, basis_len and scoop_pos are used
66  * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0
67  * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie,
68  * nothing).
69  *
70  * Pysync is also slightly different in that a 'flush' method is available to
71  * force output of accumulated data. This 'flush' is used to finalise delta
72  * calculation. In librsync input is terminated with an eof flag on the input
73  * stream. I have structured this code similar to pysync with a separate flush
74  * function that is used when eof is reached. This allows for a flush style API
75  * if one is ever needed. Note that flush in pysync can be used for more than
76  * just terminating delta calculation, so a flush based API can in some ways be
77  * more flexible...
78  *
79  * The input data is first scanned, then processed. Scanning identifies input
80  * data as misses or matches, and emits the instruction stream. Processing the
81  * data consumes it off the input scoop and outputs the processed miss data
82  * into the tube.
83  *
84  * The scoop contains all data yet to be processed. The scoop_pos is an index
85  * into the scoop that indicates the point scanned to. As data is scanned,
86  * scoop_pos is incremented. As data is processed, it is removed from the scoop
87  * and scoop_pos adjusted. Everything gets complicated because the tube can
88  * block. When the tube is blocked, no data can be processed. */
89 
90 #include "config.h"
91 
92 #include <assert.h>
93 #include <stdlib.h>
94 #include <stdio.h>
95 
96 #include "librsync.h"
97 #include "emit.h"
98 #include "stream.h"
99 #include "util.h"
100 #include "sumset.h"
101 #include "job.h"
102 #include "trace.h"
103 #include "rollsum.h"
104 
105 /* used by rdiff, but now redundant */
106 int rs_roll_paranoia = 0;
107 
108 static rs_result rs_delta_s_scan(rs_job_t *job);
109 static rs_result rs_delta_s_flush(rs_job_t *job);
110 static rs_result rs_delta_s_end(rs_job_t *job);
111 static inline void rs_getinput(rs_job_t *job);
112 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
113  size_t *match_len);
114 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
115  size_t match_len);
116 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
117 static inline rs_result rs_appendflush(rs_job_t *job);
118 static inline rs_result rs_processmatch(rs_job_t *job);
119 static inline rs_result rs_processmiss(rs_job_t *job);
120 
121 /** Get a block of data if possible, and see if it matches.
122  *
123  * On each call, we try to process all of the input data available on the scoop
124  * and input buffer. */
126 {
127  const size_t block_len = job->signature->block_len;
128  rs_long_t match_pos;
129  size_t match_len;
130  rs_result result;
131  Rollsum test;
132 
133  rs_job_check(job);
134  /* read the input into the scoop */
135  rs_getinput(job);
136  /* output any pending output from the tube */
137  result = rs_tube_catchup(job);
138  /* while output is not blocked and there is a block of data */
139  while ((result == RS_DONE)
140  && ((job->scoop_pos + block_len) < job->scoop_avail)) {
141  /* check if this block matches */
142  if (rs_findmatch(job, &match_pos, &match_len)) {
143  /* append the match and reset the weak_sum */
144  result = rs_appendmatch(job, match_pos, match_len);
145  RollsumInit(&job->weak_sum);
146  } else {
147  /* rotate the weak_sum and append the miss byte */
148  RollsumRotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
149  job->scoop_next[job->scoop_pos + block_len]);
150  result = rs_appendmiss(job, 1);
151  if (rs_roll_paranoia) {
152  RollsumInit(&test);
153  RollsumUpdate(&test, job->scoop_next + job->scoop_pos,
154  block_len);
155  if (RollsumDigest(&test) != RollsumDigest(&job->weak_sum)) {
156  rs_fatal("mismatch between rolled sum " FMT_WEAKSUM
157  " and check " FMT_WEAKSUM "",
158  RollsumDigest(&job->weak_sum),
159  RollsumDigest(&test));
160  }
161 
162  }
163  }
164  }
165  /* if we completed OK */
166  if (result == RS_DONE) {
167  /* if we reached eof, we can flush the last fragment */
168  if (job->stream->eof_in) {
169  job->statefn = rs_delta_s_flush;
170  return RS_RUNNING;
171  } else {
172  /* we are blocked waiting for more data */
173  return RS_BLOCKED;
174  }
175  }
176  return result;
177 }
178 
179 static rs_result rs_delta_s_flush(rs_job_t *job)
180 {
181  rs_long_t match_pos;
182  size_t match_len;
183  rs_result result;
184 
185  rs_job_check(job);
186  /* read the input into the scoop */
187  rs_getinput(job);
188  /* output any pending output */
189  result = rs_tube_catchup(job);
190  /* while output is not blocked and there is any remaining data */
191  while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
192  /* check if this block matches */
193  if (rs_findmatch(job, &match_pos, &match_len)) {
194  /* append the match and reset the weak_sum */
195  result = rs_appendmatch(job, match_pos, match_len);
196  RollsumInit(&job->weak_sum);
197  } else {
198  /* rollout from weak_sum and append the miss byte */
199  RollsumRollout(&job->weak_sum, job->scoop_next[job->scoop_pos]);
200  rs_trace("block reduced to " FMT_SIZE "", job->weak_sum.count);
201  result = rs_appendmiss(job, 1);
202  }
203  }
204  /* if we are not blocked, flush and set end statefn. */
205  if (result == RS_DONE) {
206  result = rs_appendflush(job);
207  job->statefn = rs_delta_s_end;
208  }
209  if (result == RS_DONE) {
210  return RS_RUNNING;
211  }
212  return result;
213 }
214 
215 static rs_result rs_delta_s_end(rs_job_t *job)
216 {
217  rs_emit_end_cmd(job);
218  return RS_DONE;
219 }
220 
221 static inline void rs_getinput(rs_job_t *job)
222 {
223  size_t len;
224 
225  len = rs_scoop_total_avail(job);
226  if (job->scoop_avail < len) {
227  rs_scoop_input(job, len);
228  }
229 }
230 
231 /** find a match at scoop_pos, returning the match_pos and match_len.
232  *
233  * Note that this will calculate weak_sum if required. It will also determine
234  * the match_len.
235  *
236  * This routine could be modified to do xdelta style matches that would extend
237  * matches past block boundaries by matching backwards and forwards beyond the
238  * block boundaries. Extending backwards would require decrementing scoop_pos
239  * as appropriate. */
240 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
241  size_t *match_len)
242 {
243  const size_t block_len = job->signature->block_len;
244 
245  /* calculate the weak_sum if we don't have one */
246  if (job->weak_sum.count == 0) {
247  /* set match_len to min(block_len, scan_avail) */
248  *match_len = job->scoop_avail - job->scoop_pos;
249  if (*match_len > block_len) {
250  *match_len = block_len;
251  }
252  /* Update the weak_sum */
253  RollsumUpdate(&job->weak_sum, job->scoop_next + job->scoop_pos,
254  *match_len);
255  rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
256  job->weak_sum.count);
257  } else {
258  /* set the match_len to the weak_sum count */
259  *match_len = job->weak_sum.count;
260  }
261  *match_pos =
262  rs_signature_find_match(job->signature, RollsumDigest(&job->weak_sum),
263  job->scoop_next + job->scoop_pos, *match_len);
264  return *match_pos != -1;
265 }
266 
267 /** Append a match at match_pos of length match_len to the delta, extending a
268  * previous match if possible, or flushing any previous miss/match. */
269 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
270  size_t match_len)
271 {
272  rs_result result = RS_DONE;
273 
274  /* if last was a match that can be extended, extend it */
275  if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
276  job->basis_len += match_len;
277  } else {
278  /* else appendflush the last value */
279  result = rs_appendflush(job);
280  /* make this the new match value */
281  job->basis_pos = match_pos;
282  job->basis_len = match_len;
283  }
284  /* increment scoop_pos to point at next unscanned data */
285  job->scoop_pos += match_len;
286  /* we can only process from the scoop if output is not blocked */
287  if (result == RS_DONE) {
288  /* process the match data off the scoop */
289  result = rs_processmatch(job);
290  }
291  return result;
292 }
293 
294 /** Append a miss of length miss_len to the delta, extending a previous miss
295  * if possible, or flushing any previous match.
296  *
297  * This also breaks misses up into 32KB segments to avoid accumulating too much
298  * in memory. */
299 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
300 {
301  const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */
302  rs_result result = RS_DONE;
303 
304  /* If last was a match, or max_miss misses, appendflush it. */
305  if (job->basis_len || (job->scoop_pos >= max_miss)) {
306  result = rs_appendflush(job);
307  }
308  /* increment scoop_pos */
309  job->scoop_pos += miss_len;
310  return result;
311 }
312 
313 /** Flush any accumulating hit or miss, appending it to the delta. */
314 static inline rs_result rs_appendflush(rs_job_t *job)
315 {
316  /* if last is a match, emit it and reset last by resetting basis_len */
317  if (job->basis_len) {
318  rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len,
319  job->basis_pos);
320  rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
321  job->basis_len = 0;
322  return rs_processmatch(job);
323  /* else if last is a miss, emit and process it */
324  } else if (job->scoop_pos) {
325  rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos);
326  rs_emit_literal_cmd(job, job->scoop_pos);
327  return rs_processmiss(job);
328  }
329  /* otherwise, nothing to flush so we are done */
330  return RS_DONE;
331 }
332 
333 /** Process matching data in the scoop.
334  *
335  * The scoop contains match data at scoop_next of length scoop_pos. This
336  * function processes that match data, returning RS_DONE if it completes, or
337  * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
338  * still point at the next unscanned data.
339  *
340  * This function currently just removes data from the scoop and adjusts
341  * scoop_pos appropriately. In the future this could be used for something like
342  * context compressing of miss data. Note that it also calls rs_tube_catchup to
343  * output any pending output. */
344 static inline rs_result rs_processmatch(rs_job_t *job)
345 {
346  job->scoop_avail -= job->scoop_pos;
347  job->scoop_next += job->scoop_pos;
348  job->scoop_pos = 0;
349  return rs_tube_catchup(job);
350 }
351 
352 /** Process miss data in the scoop.
353  *
354  * The scoop contains miss data at scoop_next of length scoop_pos. This
355  * function processes that miss data, returning RS_DONE if it completes, or
356  * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
357  * still point at the next unscanned data.
358  *
359  * This function uses rs_tube_copy to queue copying from the scoop into output.
360  * and uses rs_tube_catchup to do the copying. This automaticly removes data
361  * from the scoop, but this can block. While rs_tube_catchup is blocked,
362  * scoop_pos does not point at legit data, so scanning can also not proceed.
363  *
364  * In the future this could do compression of miss data before outputing it. */
365 static inline rs_result rs_processmiss(rs_job_t *job)
366 {
367  rs_tube_copy(job, job->scoop_pos);
368  job->scoop_pos = 0;
369  return rs_tube_catchup(job);
370 }
371 
372 /** State function that does a slack delta containing only literal data to
373  * recreate the input. */
375 {
376  rs_buffers_t *const stream = job->stream;
377  size_t avail = stream->avail_in;
378 
379  if (avail) {
380  rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
381  rs_emit_literal_cmd(job, avail);
382  rs_tube_copy(job, avail);
383  return RS_RUNNING;
384  } else if (rs_job_input_is_ending(job)) {
385  job->statefn = rs_delta_s_end;
386  return RS_RUNNING;
387  }
388  return RS_BLOCKED;
389 }
390 
391 /** State function for writing out the header of the encoding job. */
393 {
395  if (job->signature) {
396  job->statefn = rs_delta_s_scan;
397  } else {
398  rs_trace("no signature provided for delta, using slack deltas");
399  job->statefn = rs_delta_s_slack;
400  }
401  return RS_RUNNING;
402 }
403 
405 {
406  rs_job_t *job;
407 
408  job = rs_job_new("delta", rs_delta_s_header);
409  /* Caller can pass NULL sig or empty sig for "slack deltas". */
410  if (sig && sig->count > 0) {
411  rs_signature_check(sig);
412  /* Caller must have called rs_build_hash_table() by now. */
413  assert(sig->hashtable);
414  job->signature = sig;
415  RollsumInit(&job->weak_sum);
416  }
417  return job;
418 }
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:44
Description of input and output buffers.
Definition: librsync.h:300
logging functions.
int count
Total number of blocks.
Definition: sumset.h:41
int block_len
The block length.
Definition: sumset.h:39
void rs_emit_delta_header(rs_job_t *job)
Write the magic for the start of a delta.
Definition: emit.c:43
static int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len)
find a match at scoop_pos, returning the match_pos and match_len.
Definition: delta.c:240
size_t rs_scoop_total_avail(rs_job_t *job)
Return the total number of bytes available including the scoop and input buffer.
Definition: scoop.c:223
rs_signature_t * signature
Pointer to the signature that's being used by the operation.
Definition: job.h:51
void rs_scoop_input(rs_job_t *job, size_t len)
Try to accept data from the input buffer to get LEN bytes in the scoop.
Definition: scoop.c:69
static rs_result rs_delta_s_header(rs_job_t *job)
State function for writing out the header of the encoding job.
Definition: delta.c:392
size_t avail_in
Number of bytes available at next_in.
Definition: librsync.h:314
static rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
Append a miss of length miss_len to the delta, extending a previous miss if possible, or flushing any previous match.
Definition: delta.c:299
void rs_emit_end_cmd(rs_job_t *job)
Write an END command.
Definition: emit.c:120
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:92
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:35
Public header for librsync.
static rs_result rs_delta_s_scan(rs_job_t *job)
Get a block of data if possible, and see if it matches.
Definition: delta.c:125
Signature of a whole file.
Definition: sumset.h:37
static rs_result rs_delta_s_slack(rs_job_t *job)
State function that does a slack delta containing only literal data to recreate the input...
Definition: delta.c:374
static rs_result rs_processmiss(rs_job_t *job)
Process miss data in the scoop.
Definition: delta.c:365
rs_job_t * rs_delta_begin(rs_signature_t *sig)
Prepare to compute a streaming delta.
Definition: delta.c:404
Rollsum weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:63
void rs_tube_copy(rs_job_t *job, int len)
Queue up a request to copy through len bytes from the input to the output of the stream.
Definition: tube.c:197
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:161
How to emit commands to the client.
int rs_tube_catchup(rs_job_t *job)
Put whatever will fit from the tube into the output of the stream.
Definition: tube.c:155
static rs_result rs_appendflush(rs_job_t *job)
Flush any accumulating hit or miss, appending it to the delta.
Definition: delta.c:314
Blocked waiting for more data.
Definition: librsync.h:163
void rs_emit_copy_cmd(rs_job_t *job, rs_long_t where, rs_long_t len)
Write a COPY command for given offset and length.
Definition: emit.c:77
The job is still running, and not yet finished or blocked.
Definition: librsync.h:164
int eof_in
True if there is no more data after this.
Definition: librsync.h:317
void rs_emit_literal_cmd(rs_job_t *job, int len)
Write a LITERAL command.
Definition: emit.c:50
Completed successfully.
Definition: librsync.h:162
static rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
Append a match at match_pos of length match_len to the delta, extending a previous match if possible...
Definition: delta.c:269
static rs_result rs_processmatch(rs_job_t *job)
Process matching data in the scoop.
Definition: delta.c:344
of this structure are private.
Definition: job.h:26