summaryrefslogtreecommitdiff
path: root/Lib/test/test_smtplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_smtplib.py')
-rw-r--r--Lib/test/test_smtplib.py343
1 files changed, 310 insertions, 33 deletions
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 95a9dbe912..8e36241428 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,5 +1,7 @@
import asyncore
import email.mime.text
+from email.message import EmailMessage
+from email.base64mime import body_encode as encode_base64
import email.utils
import socket
import smtpd
@@ -10,6 +12,7 @@ import sys
import time
import select
import errno
+import textwrap
import unittest
from test import support, mock_socket
@@ -30,7 +33,7 @@ if sys.platform == 'darwin':
def server(evt, buf, serv):
- serv.listen(5)
+ serv.listen()
evt.set()
try:
conn, addr = serv.accept()
@@ -123,6 +126,27 @@ class GeneralTests(unittest.TestCase):
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.close()
+ def test_debuglevel(self):
+ mock_socket.reply_with(b"220 Hello world")
+ smtp = smtplib.SMTP()
+ smtp.set_debuglevel(1)
+ with support.captured_stderr() as stderr:
+ smtp.connect(HOST, self.port)
+ smtp.close()
+ expected = re.compile(r"^connect:", re.MULTILINE)
+ self.assertRegex(stderr.getvalue(), expected)
+
+ def test_debuglevel_2(self):
+ mock_socket.reply_with(b"220 Hello world")
+ smtp = smtplib.SMTP()
+ smtp.set_debuglevel(2)
+ with support.captured_stderr() as stderr:
+ smtp.connect(HOST, self.port)
+ smtp.close()
+ expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
+ re.MULTILINE)
+ self.assertRegex(stderr.getvalue(), expected)
+
# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
@@ -184,7 +208,8 @@ class DebuggingServerTests(unittest.TestCase):
self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
smtpd.DEBUGSTREAM = io.StringIO()
# Pick a random unused port by passing 0 for the port number
- self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
+ self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
+ decode_data=True)
# Keep a note of what port was assigned
self.port = self.serv.socket.getsockname()[1]
serv_args = (self.serv, self.serv_evt, self.client_evt)
@@ -604,7 +629,8 @@ sim_auth_credentials = {
'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
}
-sim_auth_login_password = 'C29TZXBHC3N3B3JK'
+sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
+sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
'list-2':['Ms.B@xn--fo-fka.com',],
@@ -658,18 +684,16 @@ class SimSMTPChannel(smtpd.SMTPChannel):
self.push('550 No access for you!')
def smtp_AUTH(self, arg):
- if arg.strip().lower()=='cram-md5':
+ mech = arg.strip().lower()
+ if mech=='cram-md5':
self.push('334 {}'.format(sim_cram_md5_challenge))
- return
- mech, auth = arg.split()
- mech = mech.lower()
- if mech not in sim_auth_credentials:
+ elif mech not in sim_auth_credentials:
self.push('504 auth type unimplemented')
return
- if mech == 'plain' and auth==sim_auth_credentials['plain']:
- self.push('235 plain auth ok')
- elif mech=='login' and auth==sim_auth_credentials['login']:
- self.push('334 Password:')
+ elif mech=='plain':
+ self.push('334 ')
+ elif mech=='login':
+ self.push('334 ')
else:
self.push('550 No access for you!')
@@ -719,7 +743,8 @@ class SimSMTPServer(smtpd.SMTPServer):
def handle_accepted(self, conn, addr):
self._SMTPchannel = self.channel_class(
- self._extra_features, self, conn, addr)
+ self._extra_features, self, conn, addr,
+ decode_data=self._decode_data)
def process_message(self, peer, mailfrom, rcpttos, data):
pass
@@ -742,7 +767,7 @@ class SMTPSimTests(unittest.TestCase):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
# Pick a random unused port by passing 0 for the port number
- self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
+ self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
# Keep a note of what port was assigned
self.port = self.serv.socket.getsockname()[1]
serv_args = (self.serv, self.serv_evt, self.client_evt)
@@ -790,11 +815,11 @@ class SMTPSimTests(unittest.TestCase):
def testVRFY(self):
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- for email, name in sim_users.items():
+ for addr_spec, name in sim_users.items():
expected_known = (250, bytes('%s %s' %
- (name, smtplib.quoteaddr(email)),
+ (name, smtplib.quoteaddr(addr_spec)),
"ascii"))
- self.assertEqual(smtp.vrfy(email), expected_known)
+ self.assertEqual(smtp.vrfy(addr_spec), expected_known)
u = 'nobody@nowhere.com'
expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
@@ -816,28 +841,28 @@ class SMTPSimTests(unittest.TestCase):
self.assertEqual(smtp.expn(u), expected_unknown)
smtp.quit()
- def testAUTH_PLAIN(self):
- self.serv.add_feature("AUTH PLAIN")
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
- expected_auth_ok = (235, b'plain auth ok')
- self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)
- smtp.close()
-
- # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
- # require a synchronous read to obtain the credentials...so instead smtpd
+ # SimSMTPChannel doesn't fully support AUTH because it requires a
+ # synchronous read to obtain the credentials...so instead smtpd
# sees the credential sent by smtplib's login method as an unknown command,
# which results in smtplib raising an auth error. Fortunately the error
# message contains the encoded credential, so we can partially check that it
# was generated correctly (partially, because the 'word' is uppercased in
# the error message).
+ def testAUTH_PLAIN(self):
+ self.serv.add_feature("AUTH PLAIN")
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
+ try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
+ except smtplib.SMTPAuthenticationError as err:
+ self.assertIn(sim_auth_plain, str(err))
+ smtp.close()
+
def testAUTH_LOGIN(self):
self.serv.add_feature("AUTH LOGIN")
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
try: smtp.login(sim_auth[0], sim_auth[1])
except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_password, str(err))
+ self.assertIn(sim_auth_login_user, str(err))
smtp.close()
def testAUTH_CRAM_MD5(self):
@@ -855,7 +880,23 @@ class SMTPSimTests(unittest.TestCase):
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
try: smtp.login(sim_auth[0], sim_auth[1])
except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_password, str(err))
+ self.assertIn(sim_auth_login_user, str(err))
+ smtp.close()
+
+ def test_auth_function(self):
+ smtp = smtplib.SMTP(HOST, self.port,
+ local_hostname='localhost', timeout=15)
+ self.serv.add_feature("AUTH CRAM-MD5")
+ smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+ supported = {'CRAM-MD5': smtp.auth_cram_md5,
+ 'PLAIN': smtp.auth_plain,
+ 'LOGIN': smtp.auth_login,
+ }
+ for mechanism, method in supported.items():
+ try: smtp.auth(mechanism, method, initial_response_ok=False)
+ except smtplib.SMTPAuthenticationError as err:
+ self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
+ str(err))
smtp.close()
def test_quit_resets_greeting(self):
@@ -938,13 +979,249 @@ class SMTPSimTests(unittest.TestCase):
self.assertIsNone(smtp.sock)
self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
+ def test_smtputf8_NotSupportedError_if_no_server_support(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertTrue(smtp.does_esmtp)
+ self.assertFalse(smtp.has_extn('smtputf8'))
+ self.assertRaises(
+ smtplib.SMTPNotSupportedError,
+ smtp.sendmail,
+ 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
+ self.assertRaises(
+ smtplib.SMTPNotSupportedError,
+ smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
+
+ def test_send_unicode_without_SMTPUTF8(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
+ self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
+
+
+class SimSMTPUTF8Server(SimSMTPServer):
+
+ def __init__(self, *args, **kw):
+ # The base SMTP server turns these on automatically, but our test
+ # server is set up to munge the EHLO response, so we need to provide
+ # them as well. And yes, the call is to SMTPServer not SimSMTPServer.
+ self._extra_features = ['SMTPUTF8', '8BITMIME']
+ smtpd.SMTPServer.__init__(self, *args, **kw)
+
+ def handle_accepted(self, conn, addr):
+ self._SMTPchannel = self.channel_class(
+ self._extra_features, self, conn, addr,
+ decode_data=self._decode_data,
+ enable_SMTPUTF8=self.enable_SMTPUTF8,
+ )
+
+ def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
+ rcpt_options=None):
+ self.last_peer = peer
+ self.last_mailfrom = mailfrom
+ self.last_rcpttos = rcpttos
+ self.last_message = data
+ self.last_mail_options = mail_options
+ self.last_rcpt_options = rcpt_options
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SMTPUTF8SimTests(unittest.TestCase):
+
+ maxDiff = None
+
+ def setUp(self):
+ self.real_getfqdn = socket.getfqdn
+ socket.getfqdn = mock_socket.getfqdn
+ self.serv_evt = threading.Event()
+ self.client_evt = threading.Event()
+ # Pick a random unused port by passing 0 for the port number
+ self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
+ decode_data=False,
+ enable_SMTPUTF8=True)
+ # Keep a note of what port was assigned
+ self.port = self.serv.socket.getsockname()[1]
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
+ self.thread = threading.Thread(target=debugging_server, args=serv_args)
+ self.thread.start()
+
+ # wait until server thread has assigned a port number
+ self.serv_evt.wait()
+ self.serv_evt.clear()
+
+ def tearDown(self):
+ socket.getfqdn = self.real_getfqdn
+ # indicate that the client is finished
+ self.client_evt.set()
+ # wait for the server thread to terminate
+ self.serv_evt.wait()
+ self.thread.join()
+
+ def test_test_server_supports_extensions(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertTrue(smtp.does_esmtp)
+ self.assertTrue(smtp.has_extn('smtputf8'))
+
+ def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
+ m = '¡a test message containing unicode!'.encode('utf-8')
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.sendmail('Jőhn', 'Sálly', m,
+ mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
+ self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
+ self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
+ self.assertEqual(self.serv.last_message, m)
+ self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
+ self.assertIn('SMTPUTF8', self.serv.last_mail_options)
+ self.assertEqual(self.serv.last_rcpt_options, [])
+
+ def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
+ m = '¡a test message containing unicode!'.encode('utf-8')
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertEqual(
+ smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
+ (250, b'OK'))
+ self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
+ self.assertEqual(smtp.data(m), (250, b'OK'))
+ self.assertEqual(self.serv.last_mailfrom, 'Jő')
+ self.assertEqual(self.serv.last_rcpttos, ['János'])
+ self.assertEqual(self.serv.last_message, m)
+ self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
+ self.assertIn('SMTPUTF8', self.serv.last_mail_options)
+ self.assertEqual(self.serv.last_rcpt_options, [])
+
+ def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
+ msg = EmailMessage()
+ msg['From'] = "Páolo <főo@bar.com>"
+ msg['To'] = 'Dinsdale'
+ msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
+ # XXX I don't know why I need two \n's here, but this is an existing
+ # bug (if it is one) and not a problem with the new functionality.
+ msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
+ # XXX smtpd converts received /r/n to /n, so we can't easily test that
+ # we are successfully sending /r/n :(.
+ expected = textwrap.dedent("""\
+ From: Páolo <főo@bar.com>
+ To: Dinsdale
+ Subject: Nudge nudge, wink, wink \u1F609
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+ MIME-Version: 1.0
+
+ oh là là, know what I mean, know what I mean?
+ """)
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ self.assertEqual(smtp.send_message(msg), {})
+ self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
+ self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
+ self.assertEqual(self.serv.last_message.decode(), expected)
+ self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
+ self.assertIn('SMTPUTF8', self.serv.last_mail_options)
+ self.assertEqual(self.serv.last_rcpt_options, [])
+
+ def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
+ msg = EmailMessage()
+ msg['From'] = "Páolo <főo@bar.com>"
+ msg['To'] = 'Dinsdale'
+ msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ self.assertRaises(smtplib.SMTPNotSupportedError,
+ smtp.send_message(msg))
+
+
+EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
+
+class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
+ def smtp_AUTH(self, arg):
+ # RFC 4954's AUTH command allows for an optional initial-response.
+ # Not all AUTH methods support this; some require a challenge. AUTH
+ # PLAIN does those, so test that here. See issue #15014.
+ args = arg.split()
+ if args[0].lower() == 'plain':
+ if len(args) == 2:
+ # AUTH PLAIN <initial-response> with the response base 64
+ # encoded. Hard code the expected response for the test.
+ if args[1] == EXPECTED_RESPONSE:
+ self.push('235 Ok')
+ return
+ self.push('571 Bad authentication')
+
+class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
+ channel_class = SimSMTPAUTHInitialResponseChannel
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
+ def setUp(self):
+ self.real_getfqdn = socket.getfqdn
+ socket.getfqdn = mock_socket.getfqdn
+ self.serv_evt = threading.Event()
+ self.client_evt = threading.Event()
+ # Pick a random unused port by passing 0 for the port number
+ self.serv = SimSMTPAUTHInitialResponseServer(
+ (HOST, 0), ('nowhere', -1), decode_data=True)
+ # Keep a note of what port was assigned
+ self.port = self.serv.socket.getsockname()[1]
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
+ self.thread = threading.Thread(target=debugging_server, args=serv_args)
+ self.thread.start()
+
+ # wait until server thread has assigned a port number
+ self.serv_evt.wait()
+ self.serv_evt.clear()
+
+ def tearDown(self):
+ socket.getfqdn = self.real_getfqdn
+ # indicate that the client is finished
+ self.client_evt.set()
+ # wait for the server thread to terminate
+ self.serv_evt.wait()
+ self.thread.join()
+
+ def testAUTH_PLAIN_initial_response_login(self):
+ self.serv.add_feature('AUTH PLAIN')
+ smtp = smtplib.SMTP(HOST, self.port,
+ local_hostname='localhost', timeout=15)
+ smtp.login('psu', 'doesnotexist')
+ smtp.close()
+
+ def testAUTH_PLAIN_initial_response_auth(self):
+ self.serv.add_feature('AUTH PLAIN')
+ smtp = smtplib.SMTP(HOST, self.port,
+ local_hostname='localhost', timeout=15)
+ smtp.user = 'psu'
+ smtp.password = 'doesnotexist'
+ code, response = smtp.auth('plain', smtp.auth_plain)
+ smtp.close()
+ self.assertEqual(code, 235)
+
@support.reap_threads
def test_main(verbose=None):
- support.run_unittest(GeneralTests, DebuggingServerTests,
- NonConnectingTests,
- BadHELOServerTests, SMTPSimTests,
- TooLongLineTests)
+ support.run_unittest(
+ BadHELOServerTests,
+ DebuggingServerTests,
+ GeneralTests,
+ NonConnectingTests,
+ SMTPAUTHInitialResponseSimTests,
+ SMTPSimTests,
+ TooLongLineTests,
+ )
+
if __name__ == '__main__':
test_main()