diff options
author | Andrew Bartlett <abartlet@samba.org> | 2022-12-02 10:07:53 +1300 |
---|---|---|
committer | Jule Anger <janger@samba.org> | 2023-02-03 09:35:08 +0000 |
commit | a81be07598363ce778482eeecd429c83278dd936 (patch) | |
tree | bd4e12c9c47bfc40937a853691a899265638ac20 | |
parent | 00d1f6223f2b13c46d061c58ab944f4459c4eed0 (diff) | |
download | samba-a81be07598363ce778482eeecd429c83278dd936.tar.gz |
s4-selftest/drs Add test of expected return code for invaid DNs in GetNCChanges
BUG: https://bugzilla.samba.org/show_bug.cgi?id=10635
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
(cherry picked from commit bee45e6b29b97e0cab19a9c3cf692d9a7585a717)
-rw-r--r-- | selftest/knownfail.d/getncchanges | 5 | ||||
-rw-r--r-- | source4/torture/drs/python/drs_base.py | 4 | ||||
-rw-r--r-- | source4/torture/drs/python/getnc_exop.py | 82 |
3 files changed, 88 insertions, 3 deletions
diff --git a/selftest/knownfail.d/getncchanges b/selftest/knownfail.d/getncchanges index 5ef1bc98bef..b716ff83797 100644 --- a/selftest/knownfail.d/getncchanges +++ b/selftest/knownfail.d/getncchanges @@ -4,3 +4,8 @@ samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegri samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_chain\(promoted_dc\) samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_and_anc\(promoted_dc\) samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_multivalued_links\(promoted_dc\) +# New tests for GetNCChanges with a GUID and a bad DN, like Azure AD Cloud Sync +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidDestDSA_and_GUID +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ +^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py index c5f7682d563..d87918456f1 100644 --- a/source4/torture/drs/python/drs_base.py +++ b/source4/torture/drs/python/drs_base.py @@ -432,13 +432,15 @@ class DrsBaseTestCase(SambaToolCmdTest): def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop, replica_flags=0, max_objects=0, partial_attribute_set=None, - partial_attribute_set_ex=None, mapping_ctr=None): + partial_attribute_set_ex=None, mapping_ctr=None, nc_guid=None): req8 = drsuapi.DsGetNCChangesRequest8() req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID() req8.source_dsa_invocation_id = misc.GUID(invocation_id) req8.naming_context = drsuapi.DsReplicaObjectIdentifier() req8.naming_context.dn = str(nc_dn_str) + if nc_guid is not None: + req8.naming_context.guid = nc_guid req8.highwatermark = drsuapi.DsReplicaHighWaterMark() req8.highwatermark.tmp_highest_usn = 0 req8.highwatermark.reserved_usn = 0 diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py index 446c7821d54..8582eb17c66 100644 --- a/source4/torture/drs/python/getnc_exop.py +++ b/source4/torture/drs/python/getnc_exop.py @@ -240,6 +240,60 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): (enum, estr) = e1.args self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC) + def test_InvalidNC_DummyDN_InvalidGUID(self): + """Test full replication on a totally invalid GUID fails with the right error code""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str="DummyDN", + nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), + exop=drsuapi.DRSUAPI_EXOP_NONE) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC) + + def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self): + """Test single object replication on a totally invalid GUID fails with the right error code""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str="DummyDN", + nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), + exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN) + + def test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC(self): + """Test RID Allocation on a totally invalid GUID fails with the right error code""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str="DummyDN", + nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1) + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC) + def test_link_utdv_hwm(self): """Test verify the DRS_GET_ANC behavior.""" @@ -597,12 +651,35 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + def test_InvalidDestDSA_and_GUID(self): + """Test role transfer with invalid destination DSA guid""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str="DummyDN", + nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"), + exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + try: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + except WERRORError as e1: + (enum, estr) = e1.args + self.fail("DsGetNCChanges failed with {estr}") + self.assertEqual(level, 6, "Expected level 6 response!") + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase): def setUp(self): super(DrsReplicaPrefixMapTestCase, self).setUp() self.base_dn = self.ldb_dc1.get_default_basedn() - self.ou = "ou=pfm_exop,%s" % self.base_dn + self.ou = "ou=pfm_exop%d,%s" % (random.randint(0, 4294967295), + self.base_dn) self.ldb_dc1.add({ "dn": self.ou, "objectclass": "organizationalUnit"}) @@ -948,7 +1025,8 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase): def setUp(self): super(DrsReplicaSyncSortTestCase, self).setUp() self.base_dn = self.ldb_dc1.get_default_basedn() - self.ou = "ou=sort_exop,%s" % self.base_dn + self.ou = "ou=sort_exop%d,%s" % (random.randint(0, 4294967295), + self.base_dn) self.ldb_dc1.add({ "dn": self.ou, "objectclass": "organizationalUnit"}) |