summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_lockable_files.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_lockable_files.py')
-rw-r--r--bzrlib/tests/test_lockable_files.py351
1 files changed, 351 insertions, 0 deletions
diff --git a/bzrlib/tests/test_lockable_files.py b/bzrlib/tests/test_lockable_files.py
new file mode 100644
index 0000000..e8b2345
--- /dev/null
+++ b/bzrlib/tests/test_lockable_files.py
@@ -0,0 +1,351 @@
+# Copyright (C) 2005-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import bzrlib
+from bzrlib import (
+ errors,
+ lockdir,
+ osutils,
+ transport,
+ )
+from bzrlib.lockable_files import LockableFiles, TransportLock
+from bzrlib.tests import (
+ TestCaseInTempDir,
+ TestNotApplicable,
+ )
+from bzrlib.tests.test_smart import TestCaseWithSmartMedium
+from bzrlib.tests.test_transactions import DummyWeave
+from bzrlib.transactions import (PassThroughTransaction,
+ ReadOnlyTransaction,
+ WriteTransaction,
+ )
+
+
+# these tests are applied in each parameterized suite for LockableFiles
+#
+# they use an old style of parameterization, but we want to remove this class
+# so won't modernize them now. - mbp 20080430
+class _TestLockableFiles_mixin(object):
+
+ def test_transactions(self):
+ self.assertIs(self.lockable.get_transaction().__class__,
+ PassThroughTransaction)
+ self.lockable.lock_read()
+ try:
+ self.assertIs(self.lockable.get_transaction().__class__,
+ ReadOnlyTransaction)
+ finally:
+ self.lockable.unlock()
+ self.assertIs(self.lockable.get_transaction().__class__,
+ PassThroughTransaction)
+ self.lockable.lock_write()
+ self.assertIs(self.lockable.get_transaction().__class__,
+ WriteTransaction)
+ # check that finish is called:
+ vf = DummyWeave('a')
+ self.lockable.get_transaction().register_dirty(vf)
+ self.lockable.unlock()
+ self.assertTrue(vf.finished)
+
+ def test__escape(self):
+ self.assertEqual('%25', self.lockable._escape('%'))
+
+ def test__escape_empty(self):
+ self.assertEqual('', self.lockable._escape(''))
+
+ def test_break_lock(self):
+ # some locks are not breakable
+ self.lockable.lock_write()
+ try:
+ self.assertRaises(AssertionError, self.lockable.break_lock)
+ except NotImplementedError:
+ # this lock cannot be broken
+ self.lockable.unlock()
+ raise TestNotApplicable("%r is not breakable" % (self.lockable,))
+ l2 = self.get_lockable()
+ orig_factory = bzrlib.ui.ui_factory
+ # silent ui - no need for stdout
+ bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
+ try:
+ l2.break_lock()
+ finally:
+ bzrlib.ui.ui_factory = orig_factory
+ try:
+ l2.lock_write()
+ l2.unlock()
+ finally:
+ self.assertRaises(errors.LockBroken, self.lockable.unlock)
+ self.assertFalse(self.lockable.is_locked())
+
+ def test_lock_write_returns_None_refuses_token(self):
+ token = self.lockable.lock_write()
+ self.addCleanup(self.lockable.unlock)
+ if token is not None:
+ # This test does not apply, because this lockable supports
+ # tokens.
+ raise TestNotApplicable("%r uses tokens" % (self.lockable,))
+ self.assertRaises(errors.TokenLockingNotSupported,
+ self.lockable.lock_write, token='token')
+
+ def test_lock_write_returns_token_when_given_token(self):
+ token = self.lockable.lock_write()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ new_lockable = self.get_lockable()
+ token_from_new_lockable = new_lockable.lock_write(token=token)
+ self.addCleanup(new_lockable.unlock)
+ self.assertEqual(token, token_from_new_lockable)
+
+ def test_lock_write_raises_on_token_mismatch(self):
+ token = self.lockable.lock_write()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ different_token = token + 'xxx'
+ # Re-using the same lockable instance with a different token will
+ # raise TokenMismatch.
+ self.assertRaises(errors.TokenMismatch,
+ self.lockable.lock_write, token=different_token)
+ # A separate instance for the same lockable will also raise
+ # TokenMismatch.
+ # This detects the case where a caller claims to have a lock (via
+ # the token) for an external resource, but doesn't (the token is
+ # different). Clients need a separate lock object to make sure the
+ # external resource is probed, whereas the existing lock object
+ # might cache.
+ new_lockable = self.get_lockable()
+ self.assertRaises(errors.TokenMismatch,
+ new_lockable.lock_write, token=different_token)
+
+ def test_lock_write_with_matching_token(self):
+ # If the token matches, so no exception is raised by lock_write.
+ token = self.lockable.lock_write()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ # The same instance will accept a second lock_write if the specified
+ # token matches.
+ self.lockable.lock_write(token=token)
+ self.lockable.unlock()
+ # Calling lock_write on a new instance for the same lockable will
+ # also succeed.
+ new_lockable = self.get_lockable()
+ new_lockable.lock_write(token=token)
+ new_lockable.unlock()
+
+ def test_unlock_after_lock_write_with_token(self):
+ # If lock_write did not physically acquire the lock (because it was
+ # passed a token), then unlock should not physically release it.
+ token = self.lockable.lock_write()
+ self.addCleanup(self.lockable.unlock)
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ new_lockable = self.get_lockable()
+ new_lockable.lock_write(token=token)
+ new_lockable.unlock()
+ self.assertTrue(self.lockable.get_physical_lock_status())
+
+ def test_lock_write_with_token_fails_when_unlocked(self):
+ # Lock and unlock to get a superficially valid token. This mimics a
+ # likely programming error, where a caller accidentally tries to lock
+ # with a token that is no longer valid (because the original lock was
+ # released).
+ token = self.lockable.lock_write()
+ self.lockable.unlock()
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+
+ self.assertRaises(errors.TokenMismatch,
+ self.lockable.lock_write, token=token)
+
+ def test_lock_write_reenter_with_token(self):
+ token = self.lockable.lock_write()
+ try:
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ # Relock with a token.
+ token_from_reentry = self.lockable.lock_write(token=token)
+ try:
+ self.assertEqual(token, token_from_reentry)
+ finally:
+ self.lockable.unlock()
+ finally:
+ self.lockable.unlock()
+ # The lock should be unlocked on disk. Verify that with a new lock
+ # instance.
+ new_lockable = self.get_lockable()
+ # Calling lock_write now should work, rather than raise LockContention.
+ new_lockable.lock_write()
+ new_lockable.unlock()
+
+ def test_second_lock_write_returns_same_token(self):
+ first_token = self.lockable.lock_write()
+ try:
+ if first_token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ # Relock the already locked lockable. It should return the same
+ # token.
+ second_token = self.lockable.lock_write()
+ try:
+ self.assertEqual(first_token, second_token)
+ finally:
+ self.lockable.unlock()
+ finally:
+ self.lockable.unlock()
+
+ def test_leave_in_place(self):
+ token = self.lockable.lock_write()
+ try:
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ self.lockable.leave_in_place()
+ finally:
+ self.lockable.unlock()
+ # At this point, the lock is still in place on disk
+ self.assertRaises(errors.LockContention, self.lockable.lock_write)
+ # But should be relockable with a token.
+ self.lockable.lock_write(token=token)
+ self.lockable.unlock()
+ # Cleanup: we should still be able to get the lock, but we restore the
+ # behavior to clearing the lock when unlocking.
+ self.lockable.lock_write(token=token)
+ self.lockable.dont_leave_in_place()
+ self.lockable.unlock()
+
+ def test_dont_leave_in_place(self):
+ token = self.lockable.lock_write()
+ try:
+ if token is None:
+ # This test does not apply, because this lockable refuses
+ # tokens.
+ return
+ self.lockable.leave_in_place()
+ finally:
+ self.lockable.unlock()
+ # At this point, the lock is still in place on disk.
+ # Acquire the existing lock with the token, and ask that it is removed
+ # when this object unlocks, and unlock to trigger that removal.
+ new_lockable = self.get_lockable()
+ new_lockable.lock_write(token=token)
+ new_lockable.dont_leave_in_place()
+ new_lockable.unlock()
+ # At this point, the lock is no longer on disk, so we can lock it.
+ third_lockable = self.get_lockable()
+ third_lockable.lock_write()
+ third_lockable.unlock()
+
+
+# This method of adapting tests to parameters is different to
+# the TestProviderAdapters used elsewhere, but seems simpler for this
+# case.
+class TestLockableFiles_TransportLock(TestCaseInTempDir,
+ _TestLockableFiles_mixin):
+
+ def setUp(self):
+ TestCaseInTempDir.setUp(self)
+ t = transport.get_transport_from_path('.')
+ t.mkdir('.bzr')
+ self.sub_transport = t.clone('.bzr')
+ self.lockable = self.get_lockable()
+ self.lockable.create_lock()
+
+ def stop_server(self):
+ super(TestLockableFiles_TransportLock, self).stop_server()
+ # free the subtransport so that we do not get a 5 second
+ # timeout due to the SFTP connection cache.
+ try:
+ del self.sub_transport
+ except AttributeError:
+ pass
+
+ def get_lockable(self):
+ return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
+
+
+class TestLockableFiles_LockDir(TestCaseInTempDir,
+ _TestLockableFiles_mixin):
+ """LockableFile tests run with LockDir underneath"""
+
+ def setUp(self):
+ TestCaseInTempDir.setUp(self)
+ self.transport = transport.get_transport_from_path('.')
+ self.lockable = self.get_lockable()
+ # the lock creation here sets mode - test_permissions on branch
+ # tests that implicitly, but it might be a good idea to factor
+ # out the mode checking logic and have it applied to loackable files
+ # directly. RBC 20060418
+ self.lockable.create_lock()
+
+ def get_lockable(self):
+ return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
+
+ def test_lock_created(self):
+ self.assertTrue(self.transport.has('my-lock'))
+ self.lockable.lock_write()
+ self.assertTrue(self.transport.has('my-lock/held/info'))
+ self.lockable.unlock()
+ self.assertFalse(self.transport.has('my-lock/held/info'))
+ self.assertTrue(self.transport.has('my-lock'))
+
+ def test__file_modes(self):
+ self.transport.mkdir('readonly')
+ osutils.make_readonly('readonly')
+ lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
+ lockdir.LockDir)
+ # The directory mode should be read-write-execute for the current user
+ self.assertEqual(00700, lockable._dir_mode & 00700)
+ # Files should be read-write for the current user
+ self.assertEqual(00600, lockable._file_mode & 00700)
+
+
+class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
+ _TestLockableFiles_mixin):
+ """LockableFile tests run with RemoteLockDir on a branch."""
+
+ def setUp(self):
+ TestCaseWithSmartMedium.setUp(self)
+ # can only get a RemoteLockDir with some RemoteObject...
+ # use a branch as thats what we want. These mixin tests test the end
+ # to end behaviour, so stubbing out the backend and simulating would
+ # defeat the purpose. We test the protocol implementation separately
+ # in test_remote and test_smart as usual.
+ b = self.make_branch('foo')
+ self.addCleanup(b.bzrdir.transport.disconnect)
+ self.transport = transport.get_transport_from_path('.')
+ self.lockable = self.get_lockable()
+
+ def get_lockable(self):
+ # getting a new lockable involves opening a new instance of the branch
+ branch = bzrlib.branch.Branch.open(self.get_url('foo'))
+ self.addCleanup(branch.bzrdir.transport.disconnect)
+ return branch.control_files