path: root/rdiff-backup/rdiff_backup/robust.py
blob: 67f32bec5877690dcaac47fe078529c9d485c9d5
# Copyright 2002 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA

"""Prevent mirror from being corrupted; handle errors

Ideally, no matter how an instance of rdiff-backup gets aborted, no
information should be lost.  The target directory should be left in a
coherent state, and later instances of rdiff-backup should clean
things up so there is no sign that anything ever got aborted or
failed.

Thus, files should be updated in as atomic a way as possible.  Each file
should be updated (and the corresponding diff files written) or not,
and it should be clear which happened.  In general, I don't think this
is possible, since the creation of the diff files and the changing of
updated files cannot be guaranteed to happen together.  It is possible,
I think, to record various information to files which would allow a
later process to figure out what the last operation was, but this
would add several file operations to the processing of each file, and
I don't think it would be a good tradeoff.

The compromise reached here is that diff files should be created just
before the mirror files are updated, and each file update should be
done with a rename operation on a file in the same directory.
Furthermore, every once in a while, rdiff-backup will record which
file it just finished processing.  If any fatal errors are caught, it
will also record the last processed file.  Future instances may not
know exactly when the previous instance was aborted, but they will be
able to narrow down the possibilities.

"""

import os, time
from log import Log
import Time, librsync, errno, signal, cPickle, C, \
	   Hardlink, TempFile, static, rpath, Globals


class Action:
	"""Represents a file operation to be accomplished later"""
	def __init__(self, init_thunk, final_func, error_handler):
		"""Action initializer

		init_thunk should not make any irreversible changes, but
		should prepare for the writing of the important data.
		final_func should be as short as possible and do the real
		work.  error_handler is run if there is an error in
		init_thunk or final_func.  Errors in init_thunk should be
		undone by error_handler, leaving things as if nothing had
		been run in the first place.

		init_thunk takes no arguments.

		final_func takes the return value of init_thunk as its
		argument, and its return value is returned by execute().

		error_handler takes three arguments: the exception, a value
		which is true exactly when init_thunk ran correctly, and a
		value which will be the return value of init_thunk if it ran
		correctly.

		"""
		self.init_thunk = init_thunk or self.default_init_thunk
		self.final_func = final_func or self.default_final_func
		self.error_handler = error_handler or self.default_error_handler

	def execute(self):
		"""Actually run the operation"""
		ran_init_thunk = None
		try:
			init_val = self.init_thunk()
			ran_init_thunk = 1
			return self.final_func(init_val)
		except Exception, exc: # Catch all errors
			Log.exception()
			TracebackArchive.add()
			if ran_init_thunk: self.error_handler(exc, 1, init_val)
			else: self.error_handler(exc, None, None)
			raise # bare raise preserves the original traceback

	def default_init_thunk(self): return None
	def default_final_func(self, init_val): return init_val
	def default_error_handler(self, exc, ran_init, init_val): pass


null_action = Action(None, None, None)
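
# An illustrative sketch only (prepare, commit, and undo are hypothetical
# functions): prepare reversibly in init, do the real work in final, and
# roll back on error.
#
#   def init(): return prepare()
#   def final(val): return commit(val)
#   def error(exc, ran_init, init_val):
#       if ran_init: undo(init_val)
#   Action(init, final, error).execute()
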
def chain(*robust_action_list):
	"""Return chain tying together a number of robust actions

	The whole chain will be aborted if some error occurs in the
	initialization stage of any of the component actions.

	"""
	ras_with_started_inits, init_return_vals = [], []
	def init():
		for ra in robust_action_list:
			ras_with_started_inits.append(ra)
			init_return_vals.append(ra.init_thunk())
		return init_return_vals
	def final(init_return_vals):
		final_vals = []
		for ra, init_val in zip(robust_action_list, init_return_vals):
			final_vals.append(ra.final_func(init_val))
		return final_vals
	def error(exc, ran_init, init_val):
		for ra, init_val in zip(ras_with_started_inits, init_return_vals):
			ra.error_handler(exc, 1, init_val)
		for ra in ras_with_started_inits[len(init_return_vals):]:
			ra.error_handler(exc, None, None)
	return Action(init, final, error)
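
# Sketch (hypothetical rpaths src and dest): tie two robust actions
# together so either both commit or the started ones get error-handled.
#
#   chain(copy_action(src, dest), copy_attribs_action(src, dest)).execute()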

def chain_nested(*robust_action_list):
	"""Like chain but final actions performed in reverse order"""
	ras_with_started_inits, init_vals = [], []
	def init():
		for ra in robust_action_list:
			ras_with_started_inits.append(ra)
			init_vals.append(ra.init_thunk())
		return init_vals
	def final(init_vals):
		ras_and_inits = zip(robust_action_list, init_vals)
		ras_and_inits.reverse()
		final_vals = []
		for ra, init_val in ras_and_inits:
			final_vals.append(ra.final_func(init_val))
		return final_vals
	def error(exc, ran_init, init_val):
		for ra, init_val in zip(ras_with_started_inits, init_vals):
			ra.error_handler(exc, 1, init_val)
		for ra in ras_with_started_inits[len(init_vals):]:
			ra.error_handler(exc, None, None)
	return Action(init, final, error)

def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
	"""Shortcut Action creator when only tempfiles involved

	Often the robust action will just consist of some initial
	stage, renaming tempfiles in the final stage, and deleting
	them if there is an error.  This function makes it easier to
	create Actions of that type.

	"""
	if isinstance(tempfiles, TempFile.TempFile): tempfiles = (tempfiles,)
	if isinstance(final_renames, rpath.RPath): final_renames = (final_renames,)
	if final_renames is None: final_renames = [None] * len(tempfiles)
	assert len(tempfiles) == len(final_renames)

	def final(init_val): # rename tempfiles to final positions
		for tempfile, destination in zip(tempfiles, final_renames):
			if destination:
				if destination.isdir(): # Cannot rename over directory
					destination.delete()
				tempfile.rename(destination)
		return init_val
	def error(exc, ran_init, init_val):
		for tf in tempfiles: tf.delete()
	return Action(init_thunk, final, error)
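
# Sketch (hypothetical target rp): fill a tempfile during init; the final
# stage renames it over rp, and the error handler deletes it.
#
#   tf = TempFile.new(rp)
#   def init(): tf.touch()
#   make_tf_robustaction(init, tf, rp).execute()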

def copy_action(rorpin, rpout):
	"""Return robust action copying rorpin to rpout

	The source can be a rorp or an rpath.  Does not recurse.  If
	both source and output are directories, just exit (the output
	directory is not overwritten).

	"""
	tfl = [None] # Need some mutable state to hold tf value
	def init(): 
		if not (rorpin.isdir() and rpout.isdir()): # skip if both are dirs
			tfl[0] = tf = TempFile.new(rpout)
			if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"))
			else: rpath.copy(rorpin, tf)
			return tf
		else: return None
	def final(tf):
		if tf and tf.lstat():
			if rpout.isdir(): rpout.delete()
			tf.rename(rpout)
		return rpout
	def error(exc, ran_init, init_val):
		if tfl[0]: tfl[0].delete()
	return Action(init, final, error)
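
# Sketch: copy_action(src_rorp, dest_rp).execute() never exposes a
# half-written dest_rp, since the data lands in a tempfile first.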

def copy_with_attribs_action(rorpin, rpout, compress = None):
	"""Like copy_action but also copy attributes"""
	tfl = [None] # Need some mutable state for error handler
	def init(): 
		if not (rorpin.isdir() and rpout.isdir()): # skip if both are dirs
			tfl[0] = tf = TempFile.new(rpout)
			if rorpin.isreg():
				tf.write_from_fileobj(rorpin.open("rb"), compress)
			else: rpath.copy(rorpin, tf)
			if tf.lstat(): # Some files, like sockets, won't be created
				rpath.copy_attribs(rorpin, tf)
			return tf
		else: return None
	def final(tf):
		if rorpin.isdir() and rpout.isdir():
			rpath.copy_attribs(rorpin, rpout)
		elif tf and tf.lstat():
			if rpout.isdir(): rpout.delete() # can't rename over dir
			tf.rename(rpout)
		return rpout
	def error(exc, ran_init, init_val):
		if tfl[0]: tfl[0].delete()
	return Action(init, final, error)

def copy_attribs_action(rorpin, rpout):
	"""Return action which just copies attributes

	Copying attributes is already pretty atomic, so just run the
	normal sequence.

	"""
	def final(init_val):
		rpath.copy_attribs(rorpin, rpout)
		return rpout
	return Action(None, final, None)

def symlink_action(rp, linktext):
	"""Return symlink action by moving one file over another"""
	tf = TempFile.new(rp) # rp renamed from rpath to avoid shadowing module
	def init(): tf.symlink(linktext)
	return make_tf_robustaction(init, tf, rp)

def destructive_write_action(rp, s):
	"""Return action writing string s to rpath rp in robust way

	This will overwrite any data currently in rp.

	"""
	tf = TempFile.new(rp)
	def init():
		fp = tf.open("wb")
		fp.write(s)
		fp.close()
		tf.setdata()
	return make_tf_robustaction(init, tf, rp)
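
# Sketch (hypothetical rp): atomically replace whatever rp holds.
#
#   destructive_write_action(rp, "new contents\n").execute()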

def check_common_error(error_handler, function, args = []):
	"""Apply function to args, if error, run error_handler on exception

	This uses the catch_error predicate below to catch only
	certain exceptions which seem innocent enough.

	"""
	try: return function(*args)
	except Exception, exc:
		TracebackArchive.add([function] + list(args))
		if catch_error(exc):
			Log.exception()
			conn = Globals.backup_writer
			if conn is not None: # increment error count
				ITRB_exists = conn.Globals.is_not_None('ITRB')
				if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors')
			if error_handler: return error_handler(exc, *args)
			else: return
		Log.exception(1, 2)
		raise
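
# Sketch (hypothetical rp): absorb an "innocent" failure the way listrp
# below does; the handler receives the exception plus the original args.
#
#   def handler(exc): return None
#   result = check_common_error(handler, rp.listdir)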

def catch_error(exc):
	"""Return true if exception exc should be caught"""

	for exception_class in (rpath.SkipFileException, rpath.RPathException,
							librsync.librsyncError, C.UnknownFileTypeError):
		if isinstance(exc, exception_class): return 1
	if (isinstance(exc, EnvironmentError) and
		errno.errorcode[exc[0]] in ('EPERM', 'ENOENT', 'EACCES', 'EBUSY',
									'EEXIST', 'ENOTDIR', 'ENAMETOOLONG',
									'EINTR', 'ENOTEMPTY', 'EIO', 'ETXTBSY',
									'ESRCH', 'EINVAL')):
		return 1
	return 0

def listrp(rp):
	"""Like rp.listdir() but return [] if error, and sort results"""
	def error_handler(exc):
		Log("Error listing directory %s" % rp.path, 2)
		return []
	dir_listing = check_common_error(error_handler, rp.listdir)
	dir_listing.sort()
	return dir_listing

def signal_handler(signum, frame):
	"""This is called when signal signum is caught"""
	raise SignalException(signum)

def install_signal_handlers():
	"""Install signal handlers on current connection"""
	for signum in [signal.SIGQUIT, signal.SIGHUP, signal.SIGTERM]:
		signal.signal(signum, signal_handler)


class SignalException(Exception):
	"""SignalException(signum) means signal signum has been received"""
	pass


class TracebackArchive:
	"""Save last 10 caught exceptions, so they can be printed if fatal"""
	_traceback_strings = []
	def add(cls, extra_args = []):
		"""Add most recent exception to archived list

		If extra_args are present, convert them to strings and add
		them as extra information to the same traceback archive.
		"""
		cls._traceback_strings.append(Log.exception_to_string(extra_args))
		if len(cls._traceback_strings) > 10:
			cls._traceback_strings = cls._traceback_strings[-10:] # keep newest

	def log(cls):
		"""Print all exception information to log file"""
		if cls._traceback_strings:
			Log("------------ Old traceback info -----------\n%s\n"
				"-------------------------------------------" %
				("\n".join(cls._traceback_strings),), 3)

static.MakeClass(TracebackArchive)
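
# Sketch (hypothetical risky_operation): archive exceptions as they are
# caught, then dump them all if the run turns out to be fatal.
#
#   try: risky_operation()
#   except Exception: TracebackArchive.add()
#   ...
#   TracebackArchive.log()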


class SaveState:
	"""Save state in the middle of backups for resuming later"""
	_last_file_sym = None # RPath of sym pointing to last file processed
	_last_file_definitive_rp = None # Touch this if last file is really last
	_last_checkpoint_time = 0 # time in seconds of last checkpoint
	_checkpoint_rp = None # RPath of checkpoint data pickle
	
	def init_filenames(cls):
		"""Set rpaths of markers.  Assume rbdir already set."""
		if not Globals.isbackup_writer:
			return Globals.backup_writer.SaveState.init_filenames()

		assert Globals.local_connection is Globals.rbdir.conn, \
			   (Globals.rbdir.conn, Globals.backup_writer)

		cls._last_file_sym = Globals.rbdir.append(
			"last-file-incremented.%s.data" % Time.curtimestr)
		cls._checkpoint_rp = Globals.rbdir.append(
			"checkpoint-data.%s.data" % Time.curtimestr)
		cls._last_file_definitive_rp = Globals.rbdir.append(
			"last-file-definitive.%s.data" % Time.curtimestr)

	def touch_last_file(cls):
		"""Touch last file marker, indicating backup has begun"""
		if not cls._last_file_sym.lstat(): cls._last_file_sym.touch()

	def touch_last_file_definitive(cls):
		"""Create last-file-definitive marker

		When a backup gets aborted, there may be time to record the
		last file successfully processed; if so, this marker should
		be touched.  Sometimes when the abort is hard, a last file
		may be indicated even though further files have been
		processed since, in which case this marker shouldn't be
		touched.

		"""
		cls._last_file_definitive_rp.touch()

	def record_last_file_action(cls, last_file_rorp):
		"""Action recording last file to be processed as symlink in rbdir

		If last_file_rorp is None, no file is known to have been
		processed.

		"""
		if last_file_rorp:
			symtext = os.path.join(
				*(('increments',) + last_file_rorp.index))
			return symlink_action(cls._last_file_sym, symtext)
		else: return Action(None, lambda init_val: cls.touch_last_file(), None)

	def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None):
		"""Save states of tree reducer and finalizer during inc backup

		If override is true, checkpoint even if one isn't due.

		"""
		if not override and not cls.checkpoint_needed(): return
		assert cls._checkpoint_rp, "_checkpoint_rp not set yet"

		cls._last_checkpoint_time = time.time()
		Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
		state_string = cPickle.dumps((ITR, finalizer))
		chain(destructive_write_action(cls._checkpoint_rp, state_string),
			  cls.record_last_file_action(last_file_rorp)).execute()

	def checkpoint_needed(cls):
		"""Returns true if another checkpoint is called for"""
		return (time.time() > cls._last_checkpoint_time +
				Globals.checkpoint_interval)

	def checkpoint_remove(cls):
		"""Remove all checkpointing data after successful operation"""
		for rp in Resume.get_relevant_rps(): rp.delete()
		if Globals.preserve_hardlinks: Hardlink.remove_all_checkpoints()

static.MakeClass(SaveState)
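
# Sketch of a checkpoint cycle during an incremental backup; ITR and
# finalizer stand for the caller's picklable state objects.
#
#   SaveState.init_filenames()
#   SaveState.touch_last_file()
#   SaveState.checkpoint(ITR, finalizer, last_file_rorp)  # every so often
#   SaveState.checkpoint_remove()                         # on success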


class ResumeException(Exception):
	"""Indicates some error has been encountered while trying to resume"""
	pass

class Resume:
	"""Check for old aborted backups and resume if necessary"""
	_session_info_list = None # List of ResumeSessionInfo's, sorted by time
	def FindTime(cls, index, later_than = 0):
		"""For a given index, find the appropriate time to use for inc

		If it is clear which time to use (because it is determined
		by definitive records, or there are no aborted backups,
		etc.), then just return the appropriate time.  Otherwise, if
		an aborted backup was last checkpointed before the index,
		assume that it didn't get there, and go for the older time.
		If an inc file is already present, the function will be
		rerun with a later time specified.

		"""
		assert Globals.isbackup_writer
		if Time.prevtime > later_than: return Time.prevtime # usual case

		for si in cls.get_sis_covering_index(index):
			if si.time > later_than: return si.time
		raise rpath.SkipFileException("Index %s already covered, skipping" %
									  str(index))

	def get_sis_covering_index(cls, index):
		"""Return sorted list of SessionInfos which may cover index

		An aborted backup may be relevant unless its last index is
		lower than the given index and we are sure it didn't go
		further.

		"""
		return filter(lambda session_info:
					  not ((session_info.last_index is None or
							session_info.last_index < index) and
						   session_info.last_definitive), 
				cls._session_info_list)

	def SetSessionInfo(cls):
		"""Read data directory and initialize _session_info"""
		assert Globals.isbackup_writer
		silist = []
		rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps())
		times = rp_quad_dict.keys()
		times.sort()
		for sess_time in times: # avoid shadowing the time module
			quad = rp_quad_dict[sess_time]
			try: silist.append(cls.quad_to_si(sess_time, quad))
			except ResumeException:
				Log("Bad resume information found, skipping", 2)
		cls._session_info_list = silist

	def get_relevant_rps(cls):
		"""Return list of relevant rpaths in rbdata directory"""
		relevant_bases = ['last-file-incremented', 'last-file-mirrored',
						  'checkpoint-data', 'last-file-definitive']
		rps = map(Globals.rbdir.append, Globals.rbdir.listdir())
		return filter(lambda rp: rp.isincfile()
					  and rp.getincbase_str() in relevant_bases, rps)

	def group_rps_by_time(cls, rplist):
		"""Take list of rps return time dict {time: quadlist}

		Times in seconds are the keys; values are quadruples of the
		form [last-file-incremented rp, last-file-mirrored rp,
		checkpoint-data rp, last-file-definitive flag].

		"""
		result = {}
		for rp in rplist:
			sess_time = Time.stringtotime(rp.getinctime())
			if result.has_key(sess_time): quadlist = result[sess_time]
			else: quadlist = [None, None, None, None]
			base_string = rp.getincbase_str()
			if base_string == 'last-file-incremented': quadlist[0] = rp
			elif base_string == 'last-file-mirrored': quadlist[1] = rp
			elif base_string == 'last-file-definitive': quadlist[3] = 1
			else:
				assert base_string == 'checkpoint-data'
				quadlist[2] = rp
			result[sess_time] = quadlist
		return result

	def quad_to_si(cls, time, quad):
		"""Take time, quadlist, return associated ResumeSessionInfo"""
		increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad
		if increment_sym and mirror_sym:
			raise ResumeException("both mirror and inc sym shouldn't exist")
		ITR, finalizer = None, None
		if increment_sym:
			mirror = None
			last_index = cls.sym_to_index(increment_sym)
			if checkpoint_rp:
				ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp)
		elif mirror_sym:
			mirror = 1
			last_index = cls.sym_to_index(mirror_sym)
			if checkpoint_rp:
				finalizer = cls.unpickle_checkpoint(checkpoint_rp)
		else: raise ResumeException("Missing increment or mirror sym")
		return ResumeSessionInfo(mirror, time, last_index, last_definitive,
								 finalizer, ITR)

	def sym_to_index(cls, sym_rp):
		"""Read last file sym rp, return last file index

		If sym_rp is not a sym at all, return None, indicating that no
		file index was ever conclusively processed.
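
		For example, a sym pointing at "increments/home/ben/file"
		yields the index ('home', 'ben', 'file').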

		"""
		if not sym_rp.issym(): return None
		link_components = sym_rp.readlink().split("/")
		assert link_components[0] == 'increments'
		return tuple(link_components[1:])

	def unpickle_checkpoint(cls, checkpoint_rp):
		"""Read data from checkpoint_rp and return unpickled data

		Return value is pair (patch increment ITR, finalizer state).

		"""
		fp = checkpoint_rp.open("rb")
		data = fp.read()
		fp.close()
		try: result = cPickle.loads(data)
		except Exception, exc:
			raise ResumeException("Bad pickle at %s: %s" %
								  (checkpoint_rp.path, exc))
		return result

	def ResumeCheck(cls):
		"""Return relevant ResumeSessionInfo if there's one we should resume

		Also, if an RSI to resume is found, reset the current time
		to the old resume time.

		"""
		cls.SetSessionInfo()
		if not cls._session_info_list:
			if Globals.resume == 1: 
				Log.FatalError("User specified resume, but no data on "
							   "previous backup found.")
			else: return None
		else:
			si = cls._session_info_list[-1]
			if (Globals.resume == 1 or
				(time.time() <= (si.time + Globals.resume_window) and
				 not Globals.resume == 0)):
				Log("Resuming aborted backup dated %s" %
					Time.timetopretty(si.time), 2)
				Time.setcurtime(si.time)
				if Globals.preserve_hardlinks:
					if (not si.last_definitive or not
						Hardlink.retrieve_checkpoint(Globals.rbdir, si.time)):
						Log("Hardlink information not successfully "
							"recovered.", 2)
				return si
			else:
				Log("Last backup dated %s was aborted, but we aren't "
					"resuming it." % Time.timetopretty(si.time), 2)
				return None
		assert None # unreachable: both branches above return

static.MakeClass(Resume)
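
# Sketch: at startup the backup writer can check for an aborted session.
#
#   si = Resume.ResumeCheck()
#   if si:  # restore si.ITR / si.finalizer, continue past si.last_index
#       ...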


class ResumeSessionInfo:
	"""Hold information about a previously aborted session"""
	def __init__(self, mirror, time, last_index,
				 last_definitive, finalizer = None, ITR = None):
		"""Class initializer

		mirror - true if backup was a mirror, false if increment
		time - starting time in seconds of backup
		last_index - last confirmed index processed by backup, or None
		last_definitive - true if we know last_index is really last
		finalizer - the dsrp finalizer, if available
		ITR - for an increment, the ITR reducer (assume mirror if absent)

		"""
		self.time = time
		self.mirror = mirror
		self.last_index = last_index
		self.last_definitive = last_definitive
		self.ITR, self.finalizer = ITR, finalizer