summaryrefslogtreecommitdiff
path: root/source4/torture/drs/python/ridalloc_exop.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/torture/drs/python/ridalloc_exop.py')
-rw-r--r--source4/torture/drs/python/ridalloc_exop.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/source4/torture/drs/python/ridalloc_exop.py b/source4/torture/drs/python/ridalloc_exop.py
index 7869967cf1d..8ce4098bc06 100644
--- a/source4/torture/drs/python/ridalloc_exop.py
+++ b/source4/torture/drs/python/ridalloc_exop.py
@@ -43,6 +43,7 @@ from samba.auth import system_session, admin_session
from samba.dbchecker import dbcheck
from samba.ndr import ndr_pack
from samba.dcerpc import security
+from samba import drs_utils, dsdb
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
@@ -673,3 +674,137 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
finally:
self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
shutil.rmtree(targetdir, ignore_errors=True)
+
+ def test_replicate_against_deleted_objects_transaction(self):
+ """Not related to RID allocation, but uses the infrastructure here.
+ Do a join, create a link between two objects remotely, but
+ remove the target locally. Show that we need to set a magic
+ opaque if there is an outer transaction.
+
+ """
+ fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ test_user4 = "ridalloctestuser4"
+ test_group = "ridalloctestgroup1"
+
+ self.ldb_dc1.newuser(test_user4, "P@ssword!")
+
+ self.addCleanup(self.ldb_dc1.deleteuser, test_user4)
+
+ self.ldb_dc1.newgroup(test_group)
+ self.addCleanup(self.ldb_dc1.deletegroup, test_group)
+
+ targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST8")
+ try:
+ # Connect to the database
+ ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+ lp = self.get_loadparm()
+
+ new_ldb = SamDB(ldb_url,
+ session_info=system_session(lp), lp=lp)
+
+ destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID())
+
+ repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]',
+ lp,
+ self.get_credentials(),
+ new_ldb,
+ destination_dsa_guid)
+
+ source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id)
+
+ # Add the link on the remote DC
+ self.ldb_dc1.add_remove_group_members(test_group, [test_user4])
+
+ # Starting a transaction overrides, currently the logic
+ # inside repl.replicatate to retry with GET_TGT which in
+ # turn tells the repl_meta_data module that the most up to
+ # date info is already available
+ new_ldb.transaction_start()
+ repl.replicate(self.ldb_dc1.domain_dn(),
+ source_dsa_invocation_id,
+ destination_dsa_guid)
+
+ # Delete the user locally, before applying the links.
+ # This simulates getting the delete in the replciation
+ # stream.
+ new_ldb.deleteuser(test_user4)
+
+ # This fails as the user has been deleted locally but a remote link is sent
+ self.assertRaises(ldb.LdbError, new_ldb.transaction_commit)
+
+ new_ldb.transaction_start()
+ repl.replicate(self.ldb_dc1.domain_dn(),
+ source_dsa_invocation_id,
+ destination_dsa_guid)
+
+ # Delete the user locally (the previous transaction
+ # doesn't apply), before applying the links. This
+ # simulates getting the delete in the replciation stream.
+ new_ldb.deleteuser(test_user4)
+
+ new_ldb.set_opaque_integer(dsdb.DSDB_FULL_JOIN_REPLICATION_COMPLETED_OPAQUE_NAME,
+ 1)
+
+ # This should now work
+ try:
+ new_ldb.transaction_commit()
+ except ldb.LdbError as e:
+ self.fail(f"Failed to replicate despite setting opaque with {e.args[1]}")
+
+ finally:
+ self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST8")
+ shutil.rmtree(targetdir, ignore_errors=True)
+
+ def test_replicate_against_deleted_objects_normal(self):
+ """Not related to RID allocation, but uses the infrastructure here.
+ Do a join, create a link between two objects remotely, but
+ remove the target locally. .
+
+ """
+ fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ test_user5 = "ridalloctestuser5"
+ test_group2 = "ridalloctestgroup2"
+
+ self.ldb_dc1.newuser(test_user5, "P@ssword!")
+ self.addCleanup(self.ldb_dc1.deleteuser, test_user5)
+
+ self.ldb_dc1.newgroup(test_group2)
+ self.addCleanup(self.ldb_dc1.deletegroup, test_group2)
+
+ targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST9")
+ try:
+ # Connect to the database
+ ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+ lp = self.get_loadparm()
+
+ new_ldb = SamDB(ldb_url,
+ session_info=system_session(lp), lp=lp)
+
+ destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID())
+
+ repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]',
+ lp,
+ self.get_credentials(),
+ new_ldb,
+ destination_dsa_guid)
+
+ source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id)
+
+ # Add the link on the remote DC
+ self.ldb_dc1.add_remove_group_members(test_group2, [test_user5])
+
+ # Delete the user locally
+ new_ldb.deleteuser(test_user5)
+
+ # Confirm replication copes with a link to a locally deleted user
+ repl.replicate(self.ldb_dc1.domain_dn(),
+ source_dsa_invocation_id,
+ destination_dsa_guid)
+
+ finally:
+ self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST9")
+ shutil.rmtree(targetdir, ignore_errors=True)