diff options
author | Andrew Bartlett <abartlet@samba.org> | 2022-12-02 15:30:05 +1300 |
---|---|---|
committer | Jule Anger <janger@samba.org> | 2023-02-03 09:35:08 +0000 |
commit | fcc25f6baf8be5b0c9171a7f4e3ac87f66532c67 (patch) | |
tree | f482a91e07817378d3a4586d12619252780c9b06 | |
parent | b0bbea3fdcdbca0629cbb2f12b5378b8ba7ac423 (diff) | |
download | samba-fcc25f6baf8be5b0c9171a7f4e3ac87f66532c67.tar.gz |
s4-selftest/drs: Confirm GetNCChanges REPL_OBJ works with a DummyDN and real GUID
BUG: https://bugzilla.samba.org/show_bug.cgi?id=10635
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
(cherry picked from commit 70faccae6d595056174af8d63b3437c9fe3805aa)
-rw-r--r-- | selftest/knownfail.d/getncchanges | 3 | ||||
-rw-r--r-- | source4/torture/drs/python/getnc_exop.py | 80 |
2 files changed, 77 insertions, 6 deletions
diff --git a/selftest/knownfail.d/getncchanges b/selftest/knownfail.d/getncchanges index b716ff83797..cb6c8c630e3 100644 --- a/selftest/knownfail.d/getncchanges +++ b/selftest/knownfail.d/getncchanges @@ -9,3 +9,6 @@ samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegri ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidDestDSA_and_GUID_RID_ALLOC +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_DummyDN_valid_GUID_REPL_OBJ +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_DummyDN_valid_GUID_REPL_SECRET diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py index 885156cc6e2..9c16ecca323 100644 --- a/source4/torture/drs/python/getnc_exop.py +++ b/source4/torture/drs/python/getnc_exop.py @@ -241,8 +241,8 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): (enum, estr) = e1.args self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC) - def test_InvalidNC_DummyDN_InvalidGUID(self): - """Test full replication on a totally invalid GUID fails with the right error code""" + def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self): + """Test single object replication on a totally invalid GUID fails with the right error code""" fsmo_dn = self.ldb_dc1.get_schema_basedn() (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) @@ -250,16 +250,16 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): invocation_id=fsmo_owner["invocation_id"], nc_dn_str="DummyDN", nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), - exop=drsuapi.DRSUAPI_EXOP_NONE) + exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ) (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1) try: (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) except WERRORError as e1: (enum, estr) = e1.args - self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC) + self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN) - def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self): + def test_InvalidNC_DummyDN_InvalidGUID_REPL_SECRET(self): """Test single object replication on a totally invalid GUID fails with the right error code""" fsmo_dn = self.ldb_dc1.get_schema_basedn() (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) @@ -295,6 +295,52 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): (enum, estr) = e1.args self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC) + def test_DummyDN_valid_GUID_REPL_OBJ(self): + dc_guid_1 = self.ldb_dc1.get_invocation_id() + drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1) + + res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE, + attrs=["objectGUID"]) + + guid = misc.GUID(res[0]["objectGUID"][0]) + + req8 = self._exop_req8(dest_dsa=None, + invocation_id=dc_guid_1, + nc_dn_str="DummyDN", + nc_guid=guid, + exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ) + + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.fail(f"Failed to call GetNCChanges with EXOP_REPL_OBJ and a GUID: {estr}") + + self.assertEqual(ctr.first_object.object.identifier.guid, guid) + + def test_DummyDN_valid_GUID_REPL_SECRET(self): + dc_guid_1 = self.ldb_dc1.get_invocation_id() + drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1) + + res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE, + attrs=["objectGUID"]) + + guid = misc.GUID(res[0]["objectGUID"][0]) + + req8 = self._exop_req8(dest_dsa=None, + invocation_id=dc_guid_1, + nc_dn_str="DummyDN", + nc_guid=guid, + exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET) + + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + + # We expect to get as far as failing on the missing dest_dsa + self.assertEqual(enum, werror.WERR_DS_DRA_DB_ERROR) + def test_link_utdv_hwm(self): """Test verify the DRS_GET_ANC behavior.""" @@ -668,7 +714,29 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) except WERRORError as e1: (enum, estr) = e1.args - self.fail("DsGetNCChanges failed with {estr}") + self.fail(f"DsGetNCChanges failed with {estr}") + self.assertEqual(level, 6, "Expected level 6 response!") + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + + def test_InvalidDestDSA_and_GUID_RID_ALLOC(self): + """Test role transfer with invalid destination DSA guid""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str="DummyDN", + nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.fail(f"DsGetNCChanges failed with {estr}") self.assertEqual(level, 6, "Expected level 6 response!") self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) |