summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Sutton <josephsutton@catalyst.net.nz>2022-05-24 19:20:28 +1200
committerJule Anger <janger@samba.org>2022-07-24 11:42:01 +0200
commit13fe7e013eccca2c86258084f4443ddb7abaf089 (patch)
tree4a4c63ecde41488413bd30cafdb55fa85d7e6312
parentae7dd875cd4362ed4346716db493164c421b889f (diff)
downloadsamba-13fe7e013eccca2c86258084f4443ddb7abaf089.tar.gz
CVE-2022-2031 tests/krb5: Add methods to send and receive generic messages
This allows us to send and receive kpasswd messages, while avoiding the existing logic for encoding and decoding other Kerberos message types. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074 Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz> Reviewed-by: Andreas Schneider <asn@samba.org>
-rw-r--r--python/samba/tests/krb5/raw_testcase.py44
1 files changed, 27 insertions, 17 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py
index 421143781ae..2aed5530455 100644
--- a/python/samba/tests/krb5/raw_testcase.py
+++ b/python/samba/tests/krb5/raw_testcase.py
@@ -920,24 +920,28 @@ class RawKerberosTest(TestCaseInTempDir):
return blob
def send_pdu(self, req, asn1_print=None, hexdump=None):
+ k5_pdu = self.der_encode(
+ req, native_decode=False, asn1_print=asn1_print, hexdump=False)
+ self.send_msg(k5_pdu, hexdump=hexdump)
+
+ def send_msg(self, msg, hexdump=None):
+ header = struct.pack('>I', len(msg))
+ req_pdu = header
+ req_pdu += msg
+ self.hex_dump("send_msg", header, hexdump=hexdump)
+ self.hex_dump("send_msg", msg, hexdump=hexdump)
+
try:
- k5_pdu = self.der_encode(
- req, native_decode=False, asn1_print=asn1_print, hexdump=False)
- header = struct.pack('>I', len(k5_pdu))
- req_pdu = header
- req_pdu += k5_pdu
- self.hex_dump("send_pdu", header, hexdump=hexdump)
- self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
while True:
sent = self.s.send(req_pdu, 0)
if sent == len(req_pdu):
- break
+ return
req_pdu = req_pdu[sent:]
except socket.error as e:
- self._disconnect("send_pdu: %s" % e)
+ self._disconnect("send_msg: %s" % e)
raise
except IOError as e:
- self._disconnect("send_pdu: %s" % e)
+ self._disconnect("send_msg: %s" % e)
raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
@@ -963,16 +967,14 @@ class RawKerberosTest(TestCaseInTempDir):
return rep_pdu
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
- rep_pdu = None
- rep = None
raw_pdu = self.recv_raw(
num_recv=4, hexdump=hexdump, timeout=timeout)
if raw_pdu is None:
- return (None, None)
+ return None
header = struct.unpack(">I", raw_pdu[0:4])
k5_len = header[0]
if k5_len == 0:
- return (None, "")
+ return ""
missing = k5_len
rep_pdu = b''
while missing > 0:
@@ -981,6 +983,14 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertGreaterEqual(len(raw_pdu), 1)
rep_pdu += raw_pdu
missing = k5_len - len(rep_pdu)
+ return rep_pdu
+
+ def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
+ rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
+ hexdump=hexdump,
+ timeout=timeout)
+ if not rep_pdu:
+ return None, rep_pdu
k5_raw = self.der_decode(
rep_pdu,
asn1Spec=None,
@@ -1002,9 +1012,9 @@ class RawKerberosTest(TestCaseInTempDir):
return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
- (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
- hexdump=hexdump,
- timeout=timeout)
+ (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
+ hexdump=hexdump,
+ timeout=timeout)
return rep
def assertIsConnected(self):