summaryrefslogtreecommitdiff
path: root/rdiff-backup/rdiff_backup/rorpiter.py
blob: 875ab1edaaf726ab44ad093441a26d2de2dc249a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
# Copyright 2002 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA

"""Operations on Iterators of Read Only Remote Paths

The main structure will be an iterator that yields RORPaths.
Every RORPath has a "raw" form that makes it more amenable to
being turned into a file.  The raw form of the iterator yields
each RORPath in the form of the tuple (index, data_dictionary,
files), where files is the number of files attached (usually 1 or
0).  After that, if a file is attached, it yields that file.

"""

from __future__ import generators
import tempfile, UserList, types, librsync, Globals, Rdiff, \
	   Hardlink, robust, log, static, rpath, iterfile, TempFile


class RORPIterException(Exception): pass

def ToRaw(rorp_iter):
	"""Convert a rorp iterator to raw form"""
	for rorp in rorp_iter:
		if rorp.file:
			yield (rorp.index, rorp.data, 1)
			yield rorp.file
		else: yield (rorp.index, rorp.data, 0)

def FromRaw(raw_iter):
	"""Convert raw rorp iter back to standard form"""
	for index, data, num_files in raw_iter:
		rorp = rpath.RORPath(index, data)
		if num_files:
			assert num_files == 1, "Only one file accepted right now"
			rorp.setfile(getnext(raw_iter))
		yield rorp

def ToFile(rorp_iter):
	"""Return file version of iterator"""
	return iterfile.FileWrappingIter(ToRaw(rorp_iter))

def FromFile(fileobj):
	"""Recover rorp iterator from file interface"""
	return FromRaw(iterfile.IterWrappingFile(fileobj))

def IterateRPaths(base_rp):
	"""Return an iterator yielding RPaths with given base rp"""
	yield base_rp
	if base_rp.isdir():
		dirlisting = base_rp.listdir()
		dirlisting.sort()
		for filename in dirlisting:
			for rp in IterateRPaths(base_rp.append(filename)):
				yield rp

def Signatures(rp_iter):
	"""Yield signatures of rpaths in given rp_iter"""
	def error_handler(exc, rp):
		log.Log("Error generating signature for %s" % rp.path)
		return None

	for rp in rp_iter:
		if rp.isplaceholder(): yield rp
		else:
			rorp = rp.getRORPath()
			if rp.isreg():
				if rp.isflaglinked(): rorp.flaglinked()
				else:
					fp = robust.check_common_error(
						error_handler, Rdiff.get_signature, (rp,))
					if fp: rorp.setfile(fp)
					else: continue
			yield rorp

def GetSignatureIter(base_rp):
	"""Return a signature iterator recurring over the base_rp"""
	return Signatures(IterateRPaths(base_rp))

def CollateIterators(*rorp_iters):
	"""Collate RORPath iterators by index

	So it takes two or more iterators of rorps and returns an
	iterator yielding tuples like (rorp1, rorp2) with the same
	index.  If one or the other lacks that index, it will be None

	"""
	# overflow[i] means that iter[i] has been exhausted
	# rorps[i] is None means that it is time to replenish it.
	iter_num = len(rorp_iters)
	if iter_num == 2:
		return Collate2Iters(rorp_iters[0], rorp_iters[1])
	overflow = [None] * iter_num
	rorps = overflow[:]

	def setrorps(overflow, rorps):
		"""Set the overflow and rorps list"""
		for i in range(iter_num):
			if not overflow[i] and rorps[i] is None:
				try: rorps[i] = rorp_iters[i].next()
				except StopIteration:
					overflow[i] = 1
					rorps[i] = None

	def getleastindex(rorps):
		"""Return the first index in rorps, assuming rorps isn't empty"""
		return min(map(lambda rorp: rorp.index,
					   filter(lambda x: x, rorps)))

	def yield_tuples(iter_num, overflow, rorps):
		while 1:
			setrorps(overflow, rorps)
			if not None in overflow: break

			index = getleastindex(rorps)
			yieldval = []
			for i in range(iter_num):
				if rorps[i] and rorps[i].index == index:
					yieldval.append(rorps[i])
					rorps[i] = None
				else: yieldval.append(None)
			yield IndexedTuple(index, yieldval)
	return yield_tuples(iter_num, overflow, rorps)

def Collate2Iters(riter1, riter2):
	"""Special case of CollateIterators with 2 arguments

	This does the same thing but is faster because it doesn't have
	to consider the >2 iterator case.  Profiler says speed is
	important here.

	"""
	relem1, relem2 = None, None
	while 1:
		if not relem1:
			try: relem1 = riter1.next()
			except StopIteration:
				if relem2: yield IndexedTuple(index2, (None, relem2))
				for relem2 in riter2:
					yield IndexedTuple(relem2.index, (None, relem2))
				break
			index1 = relem1.index
		if not relem2:
			try: relem2 = riter2.next()
			except StopIteration:
				if relem1: yield IndexedTuple(index1, (relem1, None))
				for relem1 in riter1:
					yield IndexedTuple(relem1.index, (relem1, None))
				break
			index2 = relem2.index

		if index1 < index2:
			yield IndexedTuple(index1, (relem1, None))
			relem1 = None
		elif index1 == index2:
			yield IndexedTuple(index1, (relem1, relem2))
			relem1, relem2 = None, None
		else: # index2 is less
			yield IndexedTuple(index2, (None, relem2))
			relem2 = None

def getnext(iter):
	"""Return the next element of an iterator, raising error if none"""
	try: next = iter.next()
	except StopIteration: raise RORPIterException("Unexpected end to iter")
	return next

def GetDiffIter(sig_iter, new_iter):
	"""Return delta iterator from sig_iter to new_iter

	The accompanying file for each will be a delta as produced by
	rdiff, unless the destination file does not exist, in which
	case it will be the file in its entirety.

	sig_iter may be composed of rorps, but new_iter should have
	full RPaths.

	"""
	collated_iter = CollateIterators(sig_iter, new_iter)
	for rorp, rp in collated_iter: yield diffonce(rorp, rp)

def diffonce(sig_rorp, new_rp):
	"""Return one diff rorp, based from signature rorp and orig rp"""
	if sig_rorp and Globals.preserve_hardlinks and sig_rorp.isflaglinked():
		if new_rp: diff_rorp = new_rp.getRORPath()
		else: diff_rorp = rpath.RORPath(sig_rorp.index)
		diff_rorp.flaglinked()
		return diff_rorp
	elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
		diff_rorp = new_rp.getRORPath()
		#fp = sig_rorp.open("rb")
		#print "---------------------", fp
		#tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig")
		#tmp_sig_rp.delete()
		#tmp_sig_rp.write_from_fileobj(fp)
		#diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"),
		#											 new_rp))
		diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
													 new_rp))
		diff_rorp.set_attached_filetype('diff')
		return diff_rorp
	else:
		# Just send over originial if diff isn't appropriate
		if sig_rorp: sig_rorp.close_if_necessary()
		if not new_rp: return rpath.RORPath(sig_rorp.index)
		elif new_rp.isreg():
			diff_rorp = new_rp.getRORPath(1)
			diff_rorp.set_attached_filetype('snapshot')
			return diff_rorp
		else: return new_rp.getRORPath()

def PatchIter(base_rp, diff_iter):
	"""Patch the appropriate rps in basis_iter using diff_iter"""
	basis_iter = IterateRPaths(base_rp)
	collated_iter = CollateIterators(basis_iter, diff_iter)
	for basisrp, diff_rorp in collated_iter:
		patchonce_action(base_rp, basisrp, diff_rorp).execute()

def patchonce_action(base_rp, basisrp, diff_rorp):
	"""Return action patching basisrp using diff_rorp"""
	assert diff_rorp, "Missing diff index %s" % basisrp.index
	if not diff_rorp.lstat():
		return robust.Action(None, lambda init_val: basisrp.delete(), None)

	if Globals.preserve_hardlinks and diff_rorp.isflaglinked():
		if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
		tf = TempFile.new(basisrp)
		def init(): Hardlink.link_rp(diff_rorp, tf, basisrp)
		return robust.make_tf_robustaction(init, tf, basisrp)
	elif basisrp and basisrp.isreg() and diff_rorp.isreg():
		if diff_rorp.get_attached_filetype() != 'diff':
			raise rpath.RPathException("File %s appears to have changed during"
							" processing, skipping" % (basisrp.path,))
		return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
	else: # Diff contains whole file, just copy it over
		if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
		return robust.copy_with_attribs_action(diff_rorp, basisrp)


class IndexedTuple(UserList.UserList):
	"""Like a tuple, but has .index

	This is used by CollateIterator above, and can be passed to the
	IterTreeReducer.

	"""
	def __init__(self, index, sequence):
		self.index = index
		self.data = tuple(sequence)

	def __len__(self): return len(self.data)

	def __getitem__(self, key):
		"""This only works for numerical keys (easier this way)"""
		return self.data[key]

	def __lt__(self, other): return self.__cmp__(other) == -1
	def __le__(self, other): return self.__cmp__(other) != 1
	def __ne__(self, other): return not self.__eq__(other)
	def __gt__(self, other): return self.__cmp__(other) == 1
	def __ge__(self, other): return self.__cmp__(other) != -1
	
	def __cmp__(self, other):
		assert isinstance(other, IndexedTuple)
		if self.index < other.index: return -1
		elif self.index == other.index: return 0
		else: return 1

	def __eq__(self, other):
		if isinstance(other, IndexedTuple):
			return self.index == other.index and self.data == other.data
		elif type(other) is types.TupleType:
			return self.data == other
		else: return None

	def __str__(self):
		return  "(%s).%s" % (", ".join(map(str, self.data)), self.index)


class DirHandler:
	"""Handle directories when entering and exiting in mirror

	The problem is that we may need to write to a directory that may
	have only read and exec permissions.  Also, when leaving a
	directory tree, we may have modified the directory and thus
	changed the mod and access times.  These need to be updated when
	leaving.

	"""
	def __init__(self, rootrp):
		"""DirHandler initializer - call with root rpath of mirror dir"""
		self.rootrp = rootrp
		assert rootrp.index == ()
		self.cur_dir_index = None # Current directory we have descended into
		self.last_index = None # last index processed

		# This dictionary maps indicies to (rpath, (atime, mtime),
		# perms) triples.  Either or both of the time pair and perms
		# can be None, which means not to update the times or the
		# perms when leaving.  We don't have to update the perms if we
		# didn't have to change them in the first place.  If a
		# directory is explicitly given, then we don't have to update
		# anything because it will be done by the normal process.
		self.index_dict = {}

	def process_old_directories(self, new_dir_index):
		"""Update times/permissions for directories we are leaving

		Returns greatest index of the current index that has been seen
		before (i.e. no need to process up to then as new dir).

		"""
		if self.cur_dir_index is None: return -1 # no previous directory

		i = len(self.cur_dir_index)
		while 1:
			if new_dir_index[:i] == self.cur_dir_index[:i]:
				return i
			self.process_old_dir(self.cur_dir_index[:i])
			i-=1

	def process_old_dir(self, dir_index):
		"""Process outstanding changes for given dir index"""
		rpath, times, perms = self.index_dict[dir_index]
		if times: apply(rpath.settime, times)
		if perms: rpath.chmod(perms)

	def init_new_dirs(self, rpath, new_dir_index, common_dir_index):
		"""Initialize any new directories

		Record the time, and change permissions if no write access.
		Use rpath if it is given to access permissions and times.

		"""
		for i in range(common_dir_index, len(new_dir_index)):
			process_index = new_dir_index[:i]
			if rpath.index == process_index:
				self.index_dict[process_index] = (None, None, None)
			else:
				new_rpath = self.rootrp.new_index(process_index)
				if new_rpath.hasfullperms(): perms = None
				else: perms = new_rpath.getperms()
				times = (new_rpath.getatime(), new_rpath.getmtime())
				self.index_dict[process_index] = new_rpath, times, perms

	def __call__(self, rpath):
		"""Given rpath, process containing directories"""
		if rpath.isdir(): new_dir_index = rpath.index
		elif not rpath.index: return # no directory contains root
		else: new_dir_index = rpath.index[:-1]
		
		common_dir_index = self.process_old_directories(new_dir_index)
		self.init_new_dirs(rpath, new_dir_index, common_dir_index)
		self.cur_dir_index = new_dir_index

	def Finish(self):
		"""Process any remaining directories"""
		indicies = self.index_dict.keys()
		indicies.sort()
		assert len(indicies) >= 1, indicies
		indicies.reverse()
		map(self.process_old_dir, indicies)


def FillInIter(rpiter, rootrp):
	"""Given ordered rpiter and rootrp, fill in missing indicies with rpaths

	For instance, suppose rpiter contains rpaths with indicies (),
	(1,2), (2,5).  Then return iter with rpaths (), (1,), (1,2), (2,),
	(2,5).  This is used when we need to process directories before or
	after processing a file in that directory.

	"""
	# Handle first element as special case
	first_rp = rpiter.next() # StopIteration gets passed upwards
	cur_index = first_rp.index
	for i in range(len(cur_index)):
		yield rootrp.new_index(cur_index[:i])
	yield first_rp
	del first_rp
	old_index = cur_index

	# Now do the others (1,2,3) (1,4,5)
	for rp in rpiter:
		cur_index = rp.index
		if not cur_index[:-1] == old_index[:-1]: # Handle special case quickly
			for i in range(1, len(cur_index)): # i==0 case already handled
				if cur_index[:i] != old_index[:i]:
					yield rootrp.new_index(cur_index[:i])
		yield rp
		old_index = cur_index


class IterTreeReducer:
	"""Tree style reducer object for iterator

	The indicies of a RORPIter form a tree type structure.  This class
	can be used on each element of an iter in sequence and the result
	will be as if the corresponding tree was reduced.  This tries to
	bridge the gap between the tree nature of directories, and the
	iterator nature of the connection between hosts and the temporal
	order in which the files are processed.

	"""
	def __init__(self, branch_class, branch_args):
		"""ITR initializer"""
		self.branch_class = branch_class
		self.branch_args = branch_args
		self.index = None
		self.root_branch = branch_class(*branch_args)
		self.branches = [self.root_branch]

	def finish_branches(self, index):
		"""Run Finish() on all branches index has passed

		When we pass out of a branch, delete it and process it with
		the parent.  The innermost branches will be the last in the
		list.  Return None if we are out of the entire tree, and 1
		otherwise.

		"""
		branches = self.branches
		while 1:
			to_be_finished = branches[-1]
			base_index = to_be_finished.base_index
			if base_index != index[:len(base_index)]:
				# out of the tree, finish with to_be_finished
				to_be_finished.call_end_proc()
				del branches[-1]
				if not branches: return None
				branches[-1].branch_process(to_be_finished)
			else: return 1

	def add_branch(self, index):
		"""Return branch of type self.branch_class, add to branch list"""
		branch = self.branch_class(*self.branch_args)
		branch.base_index = index
		self.branches.append(branch)
		return branch

	def process_w_branch(self, branch, args):
		"""Run start_process on latest branch"""
		robust.check_common_error(branch.on_error,
								  branch.start_process, args)
		if not branch.caught_exception: branch.start_successful = 1

	def Finish(self):
		"""Call at end of sequence to tie everything up"""
		while 1:
			to_be_finished = self.branches.pop()
			to_be_finished.call_end_proc()
			if not self.branches: break
			self.branches[-1].branch_process(to_be_finished)

	def __call__(self, *args):
		"""Process args, where args[0] is current position in iterator

		Returns true if args successfully processed, false if index is
		not in the current tree and thus the final result is
		available.

		Also note below we set self.index after doing the necessary
		start processing, in case there is a crash in the middle.

		"""
		index = args[0]
		if self.index is None:
			self.root_branch.base_index = index
			self.process_w_branch(self.root_branch, args)
			self.index = index
			return 1

		if index <= self.index:
			log.Log("Warning: oldindex %s >= newindex %s" %
					(self.index, index), 2)
			return 1

		if self.finish_branches(index) is None:
			return None # We are no longer in the main tree
		last_branch = self.branches[-1]
		if last_branch.start_successful:
			if last_branch.can_fast_process(*args):
				last_branch.fast_process(*args)
			else:
				branch = self.add_branch(index)
				self.process_w_branch(branch, args)
		else: last_branch.log_prev_error(index)

		self.index = index
		return 1


class ITRBranch:
	"""Helper class for IterTreeReducer below

	There are five stub functions below: start_process, end_process,
	branch_process, can_fast_process, and fast_process.  A class that
	subclasses this one will probably fill in these functions to do
	more.

	It is important that this class be pickable, so keep that in mind
	when subclassing (this is used to resume failed sessions).

	"""
	base_index = index = None
	finished = None
	caught_exception = start_successful = None

	def call_end_proc(self):
		"""Runs the end_process on self, checking for errors"""
		if self.finished or not self.start_successful:
			self.caught_exception = 1
		if self.caught_exception: self.log_prev_error(self.base_index)
		else: robust.check_common_error(self.on_error, self.end_process)
		self.finished = 1

	def start_process(self, *args):
		"""Do some initial processing (stub)"""
		pass

	def end_process(self):
		"""Do any final processing before leaving branch (stub)"""
		pass

	def branch_process(self, branch):
		"""Process a branch right after it is finished (stub)"""
		assert branch.finished
		pass

	def can_fast_process(self, *args):
		"""True if object can be processed without new branch (stub)"""
		return None

	def fast_process(self, *args):
		"""Process args without new child branch (stub)"""
		pass

	def on_error(self, exc, *args):
		"""This is run on any exception in start/end-process"""
		self.caught_exception = 1
		if args and args[0] and isinstance(args[0], tuple):
			filename = os.path.join(*args[0])
		elif self.index: filename = os.path.join(*self.index)
		else: filename = "."
		log.Log("Error '%s' processing %s" % (exc, filename), 2)

	def log_prev_error(self, index):
		"""Call function if no pending exception"""
		log.Log("Skipping %s because of previous error" %
			(os.path.join(*index),), 2)


class DestructiveSteppingFinalizer(ITRBranch):
		"""Finalizer that can work on an iterator of dsrpaths

		The reason we have to use an IterTreeReducer is that some files
		should be updated immediately, but for directories we sometimes
		need to update all the files in the directory before finally
		coming back to it.

		"""
		dsrpath = None
		def start_process(self, index, dsrpath):
			self.dsrpath = dsrpath

		def end_process(self):
			if self.dsrpath: self.dsrpath.write_changes()

		def can_fast_process(self, index, dsrpath):
			return not self.dsrpath.isdir()

		def fast_process(self, index, dsrpath):
			if self.dsrpath: self.dsrpath.write_changes()