summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Bartlett <abartlet@samba.org>2022-12-02 10:07:53 +1300
committerJule Anger <janger@samba.org>2023-02-03 09:35:08 +0000
commita81be07598363ce778482eeecd429c83278dd936 (patch)
treebd4e12c9c47bfc40937a853691a899265638ac20
parent00d1f6223f2b13c46d061c58ab944f4459c4eed0 (diff)
downloadsamba-a81be07598363ce778482eeecd429c83278dd936.tar.gz
s4-selftest/drs Add test of expected return code for invaid DNs in GetNCChanges
BUG: https://bugzilla.samba.org/show_bug.cgi?id=10635 Signed-off-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Stefan Metzmacher <metze@samba.org> (cherry picked from commit bee45e6b29b97e0cab19a9c3cf692d9a7585a717)
-rw-r--r--selftest/knownfail.d/getncchanges5
-rw-r--r--source4/torture/drs/python/drs_base.py4
-rw-r--r--source4/torture/drs/python/getnc_exop.py82
3 files changed, 88 insertions, 3 deletions
diff --git a/selftest/knownfail.d/getncchanges b/selftest/knownfail.d/getncchanges
index 5ef1bc98bef..b716ff83797 100644
--- a/selftest/knownfail.d/getncchanges
+++ b/selftest/knownfail.d/getncchanges
@@ -4,3 +4,8 @@ samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegri
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_chain\(promoted_dc\)
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_and_anc\(promoted_dc\)
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_multivalued_links\(promoted_dc\)
+# New tests for GetNCChanges with a GUID and a bad DN, like Azure AD Cloud Sync
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidDestDSA_and_GUID
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC
diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index c5f7682d563..d87918456f1 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -432,13 +432,15 @@ class DrsBaseTestCase(SambaToolCmdTest):
def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
replica_flags=0, max_objects=0, partial_attribute_set=None,
- partial_attribute_set_ex=None, mapping_ctr=None):
+ partial_attribute_set_ex=None, mapping_ctr=None, nc_guid=None):
req8 = drsuapi.DsGetNCChangesRequest8()
req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.source_dsa_invocation_id = misc.GUID(invocation_id)
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
req8.naming_context.dn = str(nc_dn_str)
+ if nc_guid is not None:
+ req8.naming_context.guid = nc_guid
req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
req8.highwatermark.tmp_highest_usn = 0
req8.highwatermark.reserved_usn = 0
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index 446c7821d54..8582eb17c66 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -240,6 +240,60 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+ def test_InvalidNC_DummyDN_InvalidGUID(self):
+ """Test full replication on a totally invalid GUID fails with the right error code"""
+ fsmo_dn = self.ldb_dc1.get_schema_basedn()
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_NONE)
+
+ (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+
+ def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
+ """Test single object replication on a totally invalid GUID fails with the right error code"""
+ fsmo_dn = self.ldb_dc1.get_schema_basedn()
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
+
+ (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN)
+
+ def test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC(self):
+ """Test RID Allocation on a totally invalid GUID fails with the right error code"""
+ fsmo_dn = self.ldb_dc1.get_schema_basedn()
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+ (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+
def test_link_utdv_hwm(self):
"""Test verify the DRS_GET_ANC behavior."""
@@ -597,12 +651,35 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+ def test_InvalidDestDSA_and_GUID(self):
+ """Test role transfer with invalid destination DSA guid"""
+ fsmo_dn = self.ldb_dc1.get_schema_basedn()
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
+
+ (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.fail("DsGetNCChanges failed with {estr}")
+ self.assertEqual(level, 6, "Expected level 6 response!")
+ self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+ self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+ self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaPrefixMapTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
- self.ou = "ou=pfm_exop,%s" % self.base_dn
+ self.ou = "ou=pfm_exop%d,%s" % (random.randint(0, 4294967295),
+ self.base_dn)
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})
@@ -948,7 +1025,8 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaSyncSortTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
- self.ou = "ou=sort_exop,%s" % self.base_dn
+ self.ou = "ou=sort_exop%d,%s" % (random.randint(0, 4294967295),
+ self.base_dn)
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})