diff options
Diffstat (limited to 'source4/torture/drs/python/ridalloc_exop.py')
-rw-r--r-- | source4/torture/drs/python/ridalloc_exop.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/source4/torture/drs/python/ridalloc_exop.py b/source4/torture/drs/python/ridalloc_exop.py index 7869967cf1d..8ce4098bc06 100644 --- a/source4/torture/drs/python/ridalloc_exop.py +++ b/source4/torture/drs/python/ridalloc_exop.py @@ -43,6 +43,7 @@ from samba.auth import system_session, admin_session from samba.dbchecker import dbcheck from samba.ndr import ndr_pack from samba.dcerpc import security +from samba import drs_utils, dsdb class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): @@ -673,3 +674,137 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): finally: self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7") shutil.rmtree(targetdir, ignore_errors=True) + + def test_replicate_against_deleted_objects_transaction(self): + """Not related to RID allocation, but uses the infrastructure here. + Do a join, create a link between two objects remotely, but + remove the target locally. Show that we need to set a magic + opaque if there is an outer transaction. + + """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + test_user4 = "ridalloctestuser4" + test_group = "ridalloctestgroup1" + + self.ldb_dc1.newuser(test_user4, "P@ssword!") + + self.addCleanup(self.ldb_dc1.deleteuser, test_user4) + + self.ldb_dc1.newgroup(test_group) + self.addCleanup(self.ldb_dc1.deletegroup, test_group) + + targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST8") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, + session_info=system_session(lp), lp=lp) + + destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID()) + + repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]', + lp, + self.get_credentials(), + new_ldb, + destination_dsa_guid) + + source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id) + + # Add the link on the remote DC + self.ldb_dc1.add_remove_group_members(test_group, [test_user4]) + + # Starting a transaction overrides, currently the logic + # inside repl.replicatate to retry with GET_TGT which in + # turn tells the repl_meta_data module that the most up to + # date info is already available + new_ldb.transaction_start() + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + # Delete the user locally, before applying the links. + # This simulates getting the delete in the replciation + # stream. + new_ldb.deleteuser(test_user4) + + # This fails as the user has been deleted locally but a remote link is sent + self.assertRaises(ldb.LdbError, new_ldb.transaction_commit) + + new_ldb.transaction_start() + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + # Delete the user locally (the previous transaction + # doesn't apply), before applying the links. This + # simulates getting the delete in the replciation stream. + new_ldb.deleteuser(test_user4) + + new_ldb.set_opaque_integer(dsdb.DSDB_FULL_JOIN_REPLICATION_COMPLETED_OPAQUE_NAME, + 1) + + # This should now work + try: + new_ldb.transaction_commit() + except ldb.LdbError as e: + self.fail(f"Failed to replicate despite setting opaque with {e.args[1]}") + + finally: + self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST8") + shutil.rmtree(targetdir, ignore_errors=True) + + def test_replicate_against_deleted_objects_normal(self): + """Not related to RID allocation, but uses the infrastructure here. + Do a join, create a link between two objects remotely, but + remove the target locally. . + + """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + test_user5 = "ridalloctestuser5" + test_group2 = "ridalloctestgroup2" + + self.ldb_dc1.newuser(test_user5, "P@ssword!") + self.addCleanup(self.ldb_dc1.deleteuser, test_user5) + + self.ldb_dc1.newgroup(test_group2) + self.addCleanup(self.ldb_dc1.deletegroup, test_group2) + + targetdir = self._test_join(self.dnsname_dc1, "RIDALLOCTEST9") + try: + # Connect to the database + ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb") + lp = self.get_loadparm() + + new_ldb = SamDB(ldb_url, + session_info=system_session(lp), lp=lp) + + destination_dsa_guid = misc.GUID(new_ldb.get_ntds_GUID()) + + repl = drs_utils.drs_Replicate(f'ncacn_ip_tcp:{self.dnsname_dc1}[seal]', + lp, + self.get_credentials(), + new_ldb, + destination_dsa_guid) + + source_dsa_invocation_id = misc.GUID(self.ldb_dc1.invocation_id) + + # Add the link on the remote DC + self.ldb_dc1.add_remove_group_members(test_group2, [test_user5]) + + # Delete the user locally + new_ldb.deleteuser(test_user5) + + # Confirm replication copes with a link to a locally deleted user + repl.replicate(self.ldb_dc1.domain_dn(), + source_dsa_invocation_id, + destination_dsa_guid) + + finally: + self._test_force_demote(self.dnsname_dc1, "RIDALLOCTEST9") + shutil.rmtree(targetdir, ignore_errors=True) |