summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Sutton <josephsutton@catalyst.net.nz>2022-05-24 19:57:57 +1200
committerJule Anger <janger@samba.org>2022-07-24 11:42:01 +0200
commit668825ad56ff70715c626bc3209a6868409e4969 (patch)
tree98ed00e05eaae820728c8b474c34b17a8546ce6d
parent5c41e20fae268e04aa05e821c7f388ea090727af (diff)
downloadsamba-668825ad56ff70715c626bc3209a6868409e4969.tar.gz
CVE-2022-2031 tests/krb5: Add kpasswd_exchange() method
Now we can test the kpasswd service from Python. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074 Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz> Reviewed-by: Andreas Schneider <asn@samba.org> [jsutton@samba.org Fixed conflicts in imports]
-rw-r--r--python/samba/tests/krb5/raw_testcase.py264
1 files changed, 251 insertions, 13 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py
index 2aed5530455..57010ae73bd 100644
--- a/python/samba/tests/krb5/raw_testcase.py
+++ b/python/samba/tests/krb5/raw_testcase.py
@@ -26,6 +26,8 @@ import binascii
import itertools
import collections
+from enum import Enum
+
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
@@ -33,6 +35,8 @@ from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder
+from pyasn1.error import PyAsn1Error
+
from samba.credentials import Credentials
from samba.dcerpc import krb5pac, security
from samba.gensec import FEATURE_SEAL
@@ -50,6 +54,7 @@ from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_PREAUTH_FAILED,
KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
KERB_ERR_TYPE_EXTENDED,
+ KRB_AP_REP,
KRB_AP_REQ,
KRB_AS_REP,
KRB_AS_REQ,
@@ -59,6 +64,7 @@ from samba.tests.krb5.rfc4120_constants import (
KRB_TGS_REQ,
KU_AP_REQ_AUTH,
KU_AS_REP_ENC_PART,
+ KU_AP_REQ_ENC_PART,
KU_ENC_CHALLENGE_KDC,
KU_FAST_ENC,
KU_FAST_FINISHED,
@@ -73,6 +79,7 @@ from samba.tests.krb5.rfc4120_constants import (
KU_TGS_REQ_AUTH_DAT_SESSION,
KU_TGS_REQ_AUTH_DAT_SUBKEY,
KU_TICKET,
+ NT_PRINCIPAL,
NT_SRV_INST,
NT_WELLKNOWN,
PADATA_ENCRYPTED_CHALLENGE,
@@ -515,6 +522,10 @@ class KerberosTicketCreds:
class RawKerberosTest(TestCaseInTempDir):
"""A raw Kerberos Test case."""
+ class KpasswdMode(Enum):
+ SET = object()
+ CHANGE = object()
+
pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
krb5pac.PAC_TYPE_KDC_CHECKSUM,
krb5pac.PAC_TYPE_TICKET_CHECKSUM}
@@ -1886,6 +1897,224 @@ class RawKerberosTest(TestCaseInTempDir):
return msg
+ def get_enc_part(self, obj, key, usage):
+ self.assertElementEqual(obj, 'pvno', 5)
+
+ enc_part = obj['enc-part']
+ self.assertElementEqual(enc_part, 'etype', key.etype)
+ self.assertElementKVNO(enc_part, 'kvno', key.kvno)
+
+ enc_part = key.decrypt(usage, enc_part['cipher'])
+
+ return enc_part
+
+ def kpasswd_exchange(self,
+ ticket,
+ new_password,
+ expected_code,
+ expected_msg,
+ mode,
+ target_princ=None,
+ target_realm=None,
+ ap_options=None,
+ send_seq_number=True):
+ if mode is self.KpasswdMode.SET:
+ version = 0xff80
+ user_data = self.ChangePasswdDataMS_create(new_password,
+ target_princ,
+ target_realm)
+ elif mode is self.KpasswdMode.CHANGE:
+ self.assertIsNone(target_princ,
+ 'target_princ only valid for pw set')
+ self.assertIsNone(target_realm,
+ 'target_realm only valid for pw set')
+
+ version = 1
+ user_data = new_password.encode('utf-8')
+ else:
+ self.fail(f'invalid mode {mode}')
+
+ subkey = self.RandomKey(kcrypto.Enctype.AES256)
+
+ if ap_options is None:
+ ap_options = '0'
+ ap_options = str(krb5_asn1.APOptions(ap_options))
+
+ kdc_exchange_dict = {
+ 'tgt': ticket,
+ 'authenticator_subkey': subkey,
+ 'auth_data': None,
+ 'ap_options': ap_options,
+ }
+
+ if send_seq_number:
+ seq_number = random.randint(0, 0xfffffffe)
+ else:
+ seq_number = None
+
+ ap_req = self.generate_ap_req(kdc_exchange_dict,
+ None,
+ req_body=None,
+ armor=False,
+ usage=KU_AP_REQ_AUTH,
+ seq_number=seq_number)
+
+ self.connect(self.host, port=464)
+ self.assertIsNotNone(self.s)
+
+ family = self.s.family
+
+ if family == socket.AF_INET:
+ addr_type = 2 # IPv4
+ elif family == socket.AF_INET6:
+ addr_type = 24 # IPv6
+ else:
+ self.fail(f'unknown family {family}')
+
+ def create_address(ip):
+ return {
+ 'addr-type': addr_type,
+ 'address': socket.inet_pton(family, ip),
+ }
+
+ local_ip = self.s.getsockname()[0]
+ local_address = create_address(local_ip)
+
+ # remote_ip = self.s.getpeername()[0]
+ # remote_address = create_address(remote_ip)
+
+ # TODO: due to a bug (?), MIT Kerberos will not accept the request
+ # unless r-address is set to our _local_ address. Heimdal, on the other
+ # hand, requires the r-address is set to the remote address (as
+ # expected). To avoid problems, avoid sending r-address for now.
+ remote_address = None
+
+ msg = self.kpasswd_create(subkey,
+ user_data,
+ version,
+ seq_number,
+ ap_req,
+ local_address,
+ remote_address)
+
+ self.send_msg(msg)
+ rep_pdu = self.recv_pdu_raw()
+
+ self._disconnect('transaction done')
+
+ self.assertIsNotNone(rep_pdu)
+
+ header = rep_pdu[:6]
+ reply = rep_pdu[6:]
+
+ reply_len = (header[0] << 8) | header[1]
+ reply_version = (header[2] << 8) | header[3]
+ ap_rep_len = (header[4] << 8) | header[5]
+
+ self.assertEqual(reply_len, len(rep_pdu))
+ self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW
+ self.assertLess(ap_rep_len, reply_len)
+
+ self.assertNotEqual(0x7e, rep_pdu[1])
+ self.assertNotEqual(0x5e, rep_pdu[1])
+
+ if ap_rep_len:
+ # We received an AP-REQ and KRB-PRIV as a response. This may or may
+ # not indicate an error, depending on the status code.
+ ap_rep = reply[:ap_rep_len]
+ krb_priv = reply[ap_rep_len:]
+
+ key = ticket.session_key
+
+ ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
+ self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
+ enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
+ enc_part = self.der_decode(
+ enc_part, asn1Spec=krb5_asn1.EncAPRepPart())
+
+ self.assertElementPresent(enc_part, 'ctime')
+ self.assertElementPresent(enc_part, 'cusec')
+ # self.assertElementMissing(enc_part, 'subkey') # TODO
+ # self.assertElementPresent(enc_part, 'seq-number') # TODO
+
+ try:
+ krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
+ except PyAsn1Error:
+ self.fail()
+
+ self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
+ priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
+ priv_enc_part = self.der_decode(
+ priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())
+
+ self.assertElementMissing(priv_enc_part, 'timestamp')
+ self.assertElementMissing(priv_enc_part, 'usec')
+ # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
+ # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
+ # self.assertElementMissing(priv_enc_part, 'r-address') # TODO
+
+ result_data = priv_enc_part['user-data']
+ else:
+ # We received a KRB-ERROR as a response, indicating an error.
+ krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())
+
+ sname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL,
+ names=['kadmin', 'changepw'])
+ realm = self.get_krbtgt_creds().get_realm().upper()
+
+ self.assertElementEqual(krb_error, 'pvno', 5)
+ self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
+ self.assertElementMissing(krb_error, 'ctime')
+ self.assertElementMissing(krb_error, 'usec')
+ self.assertElementPresent(krb_error, 'stime')
+ self.assertElementPresent(krb_error, 'susec')
+
+ error_code = krb_error['error-code']
+ if isinstance(expected_code, int):
+ self.assertEqual(error_code, expected_code)
+ else:
+ self.assertIn(error_code, expected_code)
+
+ self.assertElementMissing(krb_error, 'crealm')
+ self.assertElementMissing(krb_error, 'cname')
+ self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
+ self.assertElementEqualPrincipal(krb_error, 'sname', sname)
+ self.assertElementMissing(krb_error, 'e-text')
+
+ result_data = krb_error['e-data']
+
+ status = result_data[:2]
+ message = result_data[2:]
+
+ status_code = (status[0] << 8) | status[1]
+ if isinstance(expected_code, int):
+ self.assertEqual(status_code, expected_code)
+ else:
+ self.assertIn(status_code, expected_code)
+
+ if not message:
+ self.assertEqual(0, status_code,
+ 'got an error result, but no message')
+ return
+
+ # Check the first character of the message.
+ if message[0]:
+ if isinstance(expected_msg, bytes):
+ self.assertEqual(message, expected_msg)
+ else:
+ self.assertIn(message, expected_msg)
+ else:
+ # We got AD password policy information.
+ self.assertEqual(30, len(message))
+
+ (empty_bytes,
+ min_length,
+ history_length,
+ properties,
+ expire_time,
+ min_age) = struct.unpack('>HIIIQQ', message)
+
def _generic_kdc_exchange(self,
kdc_exchange_dict, # required
cname=None, # optional
@@ -1996,7 +2225,7 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertIsNotNone(generate_fast_fn)
fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
callback_dict,
- req_body,
+ None,
armor=True)
fast_armor_type = kdc_exchange_dict['fast_armor_type']
@@ -3211,31 +3440,39 @@ class RawKerberosTest(TestCaseInTempDir):
kdc_exchange_dict,
_callback_dict,
req_body,
- armor):
+ armor,
+ usage=None,
+ seq_number=None):
+ req_body_checksum = None
+
if armor:
+ self.assertIsNone(req_body)
+
tgt = kdc_exchange_dict['armor_tgt']
authenticator_subkey = kdc_exchange_dict['armor_subkey']
-
- req_body_checksum = None
else:
tgt = kdc_exchange_dict['tgt']
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
- body_checksum_type = kdc_exchange_dict['body_checksum_type']
- req_body_blob = self.der_encode(req_body,
- asn1Spec=krb5_asn1.KDC_REQ_BODY())
+ if req_body is not None:
+ body_checksum_type = kdc_exchange_dict['body_checksum_type']
+
+ req_body_blob = self.der_encode(
+ req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
- req_body_checksum = self.Checksum_create(tgt.session_key,
- KU_TGS_REQ_AUTH_CKSUM,
- req_body_blob,
- ctype=body_checksum_type)
+ req_body_checksum = self.Checksum_create(
+ tgt.session_key,
+ KU_TGS_REQ_AUTH_CKSUM,
+ req_body_blob,
+ ctype=body_checksum_type)
auth_data = kdc_exchange_dict['auth_data']
subkey_obj = None
if authenticator_subkey is not None:
subkey_obj = authenticator_subkey.export_obj()
- seq_number = random.randint(0, 0xfffffffe)
+ if seq_number is None:
+ seq_number = random.randint(0, 0xfffffffe)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
authenticator_obj = self.Authenticator_create(
crealm=tgt.crealm,
@@ -3250,7 +3487,8 @@ class RawKerberosTest(TestCaseInTempDir):
authenticator_obj,
asn1Spec=krb5_asn1.Authenticator())
- usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
+ if usage is None:
+ usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
authenticator = self.EncryptedData_create(tgt.session_key,
usage,
authenticator_blob)