summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEli Collins <elic@assurancetechnologies.com>2016-11-07 21:50:51 -0500
committerEli Collins <elic@assurancetechnologies.com>2016-11-07 21:50:51 -0500
commitea291cc3d8209ed298b1d73776684cef12fc55e6 (patch)
tree6981a4db27d5fe54afe2164a8250dd0f130687e2
parentc9f0d96ffb0c86013cfb5920fd8184ea40baf1ef (diff)
downloadpasslib-ea291cc3d8209ed298b1d73776684cef12fc55e6.tar.gz
totp: stripped out the 'stateful' methods (TOTP.advance, TOTP.consume, and TOTP.last_counter).
switching to purely stateless object, will update docs so that apps are instructed to persist verify()'s last_counter value independantly.
-rw-r--r--docs/lib/passlib.totp-tutorial.rst81
-rw-r--r--docs/lib/passlib.totp.rst11
-rw-r--r--passlib/tests/test_totp.py216
-rw-r--r--passlib/totp.py180
4 files changed, 31 insertions, 457 deletions
diff --git a/docs/lib/passlib.totp-tutorial.rst b/docs/lib/passlib.totp-tutorial.rst
index 90868e3..fff2696 100644
--- a/docs/lib/passlib.totp-tutorial.rst
+++ b/docs/lib/passlib.totp-tutorial.rst
@@ -16,18 +16,14 @@ using the widely supported TOTP specification.
This module is designed to support a variety of use cases, including:
- * Low-level methods for creating OTP keys.
+ * Creating & transferring TOTP keys to client devices.
- * Low-level methods for generating & verifying tokens.
+ * Generating & verifying tokens.
- * Methods for transferring OTP keys from server to client via provisioning URIs.
+ * Securely storing TOTP keys.
- * High level methods for encrypting & storing OTP keys and state.
-
- * High level methods for generating & verifying tokens with persistent state.
-
-This walkthrough starts with the simpler lowlevel cases, and builds up
-to the more complex highlevel ones.
+This walkthrough starts with the simpler cases, and builds up
+to the more complex ones.
.. seealso:: The :mod:`passlib.totp` api reference,
which lists all details of all the classes and methods mentioned here.
@@ -278,73 +274,6 @@ Use this to provide the last counter value that was authenticated:
For more details, see the :meth:`TOTP.verify` method.
-.. _totp-stateful-usage:
-
-Stateful Verification
-=====================
-TOTP has a number of edge cases that require applications to store some persistent state
-about a TOTP token. This includes things such as the last-used counter value
-(to prevent :ref:`token re-use <totp-reuse-warning>`), and verification history
-(for client clock drift estimation).
-
-To make this easier, passlib offers a higher level stateful API,
-which layers itself on top of the stateless api documented in `Basic Usage`_.
-For the server-side, this is oriented around the :meth:`TOTP.consume` method,
-which acts as a thin (stateful) wrapper around the :meth:`TOTP.verify` method.
-
-The :attr:`TOTP.last_counter` attribute can be used to track value of the last token which successfully
-verified. The :meth:`TOTP.consume` method uses this to detect and prevent
-re-use of the last verified token, and (upon successful verification)
-updates this attribute to indicate the provided token has been "consumed".
-
-For instance, the following is a typical successful verification of a token::
-
- >>> # load otp object from json-encoded value in database
- >>> from passlib import totp
- >>> data = '{"key":"GVDOQ7NP6XPJWE4CWCLFFSXZH6DTAZWM","type":"totp","v":1,"last_counter":49157801}'
- >>> otp = totp.from_json(data)
-
- >>> # observe that last counter set to when user last successfully used a token
- >>> otp.last_counter
- 49157801
-
- >>> # user provides valid token
- >>> otp.consume('359275')
-
- >>> # observe that last counter has been updated by consume()
- >>> otp.last_counter
- 49157961
-
- >>> # applications should re-persist to storage to prevent re-use of this token
- >>> data = otp.to_json()
-
-Now that the :attr:`!last_counter` value has been updated,
-if an attacker then comes along and attempts to re-use this token within :meth:`!verify`'s time window::
-
- >>> # load otp object from json-encoded value in database
- >>> from passlib import totp
- >>> data = '{"key":"GVDOQ7NP6XPJWE4CWCLFFSXZH6DTAZWM","type":"totp","v":1,"last_counter":49157961}'
- >>> otp = totp.from_json(data)
-
- >>> # observe that last counter set to when user last successfully used a token
- >>> otp.last_counter
- 49157961
-
- >>> # attacker tries to reuse token
- >>> otp.consume('359275')
- ...
- UsedTokenError: Token has already been used, please wait for another.
-
-.. seealso::
-
- The :meth:`TOTP.consume` and :meth:`TOTP.to_json` methods,
- the :attr:`TOTP.last_counter` attribute, and the :func:`~totp.from_json` constructor.
-
-..
- *Sidenote:* If implementing a TOTP client, the :meth:`TOTP.advance` method offers similar state
- tracking on the client side (though this is really only offered as a parallel
- to :meth:`HOTP.advance`, where client-side statefulness is needed).
-
.. _totp-context-usage:
OTPContext Usage
diff --git a/docs/lib/passlib.totp.rst b/docs/lib/passlib.totp.rst
index 6916779..d3cb4db 100644
--- a/docs/lib/passlib.totp.rst
+++ b/docs/lib/passlib.totp.rst
@@ -38,7 +38,6 @@ TOTP – Client-Side Token Generation
TOTP – Server-Side Token Verification
-------------------------------------
.. automethod:: TOTP.verify
-.. automethod:: TOTP.consume
.. todo::
@@ -103,16 +102,6 @@ Most of this information will be serialized by :meth:`~TOTP.to_uri` and :meth:`~
.. autoattribute:: TOTP.alg
.. autoattribute:: TOTP.period
-TOTP – Internal State Attributes
---------------------------------
-The following attributes are used to track the internal state of this generator,
-and will be included in the output of :meth:`~TOTP.to_json`:
-
-.. autoattribute:: TOTP.last_counter
-
-(Note: All internal state attributes can be initialized via constructor options,
-but this is mainly an internal / testing detail).
-
Support Classes
---------------
.. autoclass:: TotpToken()
diff --git a/passlib/tests/test_totp.py b/passlib/tests/test_totp.py
index b31356e..bf2a3e5 100644
--- a/passlib/tests/test_totp.py
+++ b/passlib/tests/test_totp.py
@@ -241,6 +241,8 @@ class OTPContextTest(TestCase):
# TODO: test from_uri(), from_json()
+ # TODO: test .changed when deserializing from outtdated tag / encryption parameters
+
#=============================================================================
# encrypt_key() & decrypt_key() helpers
#=============================================================================
@@ -558,10 +560,6 @@ class _BaseOTPTest(TestCase):
self.assertEqual(OTP(KEY1, issuer="foo.com").issuer, "foo.com")
self.assertRaises(ValueError, OTP, KEY1, issuer="foo.com:bar")
- # NOTE: 'changed' is internal parameter,
- # tested via .advance(), .consume(),
- # and to_json() / from_json()
-
#=============================================================================
# internal helpers
#=============================================================================
@@ -804,9 +802,6 @@ class TotpTest(_BaseOTPTest):
# require returns non-negative value
self.assertRaisesRegex(AssertionError, msg_re, self.randotp, now=lambda : -1)
- # NOTE: 'last_counter' is an internal parameter,
- # tested by from_json() / to_json().
-
#=============================================================================
# internal helpers
#=============================================================================
@@ -916,87 +911,17 @@ class TotpTest(_BaseOTPTest):
# reject invalid time
self.assertRaises(ValueError, otp.generate, -1)
- def test_generate_w_reference_vectors(self, for_advance=False):
+ def test_generate_w_reference_vectors(self):
"""generate() -- reference vectors"""
for otp, time, token, expires, prefix in self.iter_test_vectors():
# should output correct token for specified time
- if for_advance:
- otp.now = lambda: time
- result = otp.advance()
- else:
- result = otp.generate(time)
+ result = otp.generate(time)
self.assertEqual(result.token, token, msg=prefix)
self.assertEqual(result.counter, time // otp.period, msg=prefix)
if expires:
self.assertEqual(result.expire_time, expires)
#=============================================================================
- # advance()
- #=============================================================================
-
- def test_advance(self):
- """advance()"""
- from passlib.totp import TOTP
- from passlib.exc import PasslibSecurityWarning
-
- # init random key & time
- time = randtime()
- otp = self.randotp()
- period = otp.period
- counter = otp._time_to_counter(time)
- start = counter * period
- self.assertFalse(otp.changed)
- otp.now = lambda: time # fix generator's time for duration of test
-
- # generate token
- otp.last_counter = 0
- result = otp.advance()
- token = result.token
- self.assertEqual(result.counter, counter)
- ##self.assertEqual(result.start_time, start)
- self.assertEqual(otp.last_counter, counter)
- self.assertTrue(otp.verify(token, start))
- self.assertTrue(otp.changed)
-
- # should generate same token for next 29s
- otp.last_counter = 0
- otp.changed = False
- otp.now = lambda : start + period - 1
- self.assertEqual(otp.advance().token, token)
- self.assertEqual(otp.last_counter, counter)
- self.assertTrue(otp.changed)
-
- # and new one at 30s
- otp.last_counter = 0
- otp.now = lambda : start + period
- token2 = otp.advance().token
- self.assertNotEqual(token2, token)
- self.assertEqual(otp.last_counter, counter + 1)
- self.assertTrue(otp.verify(token2, start + period))
-
- # check check we issue a warning time is earlier than last counter.
- otp.last_counter = counter + 1
- otp.now = lambda : time
- with self.assertWarningList([
- dict(message_re=".*earlier than last-used time.*", category=PasslibSecurityWarning),
- ]):
- self.assertTrue(otp.advance().token)
- self.assertEqual(otp.last_counter, counter)
-
- def test_advance_w_reuse_flag(self):
- """advance() -- reuse flag"""
- from passlib.totp import TOTP
- from passlib.exc import UsedTokenError
- otp = TOTP(new=True)
- token = otp.advance().token
- self.assertRaises(UsedTokenError, otp.advance)
- self.assertEqual(otp.advance(reuse=True).token, token)
-
- def test_advance_w_reference_vectors(self):
- """advance() -- reference vectors"""
- self.test_generate_w_reference_vectors(for_advance=True)
-
- #=============================================================================
# TotpMatch() -- verify()'s return value
#=============================================================================
@@ -1067,16 +992,12 @@ class TotpTest(_BaseOTPTest):
#=============================================================================
def assertVerifyMatches(self, expect_skipped, token, time, # *
- otp, check_consume=False, gen_time=None, **kwds):
+ otp, gen_time=None, **kwds):
"""helper to test otp.verify() output is correct"""
# NOTE: TotpMatch return type tested more throughly above ^^^
msg = "key=%r alg=%r period=%r token=%r gen_time=%r time=%r:" % \
(otp.base32_key, otp.alg, otp.period, token, gen_time, time)
- if check_consume:
- verify = self._create_consume_wrapper(otp)
- else:
- verify = otp.verify
- result = verify(token, time, **kwds)
+ result = otp.verify(token, time, **kwds)
self.assertTotpMatch(result,
time=otp.normalize_time(time),
period=otp.period,
@@ -1084,20 +1005,16 @@ class TotpTest(_BaseOTPTest):
msg=msg)
def assertVerifyRaises(self, exc_class, token, time, # *
- otp, check_consume=False, gen_time=None,
+ otp, gen_time=None,
**kwds):
"""helper to test otp.verify() throws correct error"""
# NOTE: TotpMatch return type tested more throughly above ^^^
msg = "key=%r alg=%r period=%r token=%r gen_time=%r time=%r:" % \
(otp.base32_key, otp.alg, otp.period, token, gen_time, time)
- if check_consume:
- verify = self._create_consume_wrapper(otp)
- else:
- verify = otp.verify
- return self.assertRaises(exc_class, verify, token, time,
+ return self.assertRaises(exc_class, otp.verify, token, time,
__msg__=msg, **kwds)
- def test_verify_w_window(self, for_consume=False):
+ def test_verify_w_window(self):
"""verify() -- 'time' and 'window' parameters"""
# init generator & helper
@@ -1105,7 +1022,7 @@ class TotpTest(_BaseOTPTest):
period = otp.period
time = randtime()
token = otp.generate(time).token
- common = dict(otp=otp, gen_time=time, check_consume=for_consume)
+ common = dict(otp=otp, gen_time=time)
assertMatches = partial(self.assertVerifyMatches, **common)
assertRaises = partial(self.assertVerifyRaises, **common)
@@ -1145,13 +1062,13 @@ class TotpTest(_BaseOTPTest):
# reject invalid time
assertRaises(ValueError, token, -1)
- def test_verify_w_skew(self, for_consume=False):
+ def test_verify_w_skew(self):
"""verify() -- 'skew' parameters"""
# init generator & helper
otp = self.randotp()
period = otp.period
time = randtime()
- common = dict(otp=otp, gen_time=time, check_consume=for_consume)
+ common = dict(otp=otp, gen_time=time)
assertMatches = partial(self.assertVerifyMatches, **common)
assertRaises = partial(self.assertVerifyRaises, **common)
@@ -1168,7 +1085,7 @@ class TotpTest(_BaseOTPTest):
# TODO: test skew + larger window
- def test_verify_w_reuse(self, for_consume=False):
+ def test_verify_w_reuse(self):
"""verify() -- 'reuse' and 'last_counter' parameters"""
# init generator & helper
@@ -1179,7 +1096,7 @@ class TotpTest(_BaseOTPTest):
token = tdata.token
counter = tdata.counter
expire_time = tdata.expire_time
- common = dict(otp=otp, gen_time=time, check_consume=for_consume)
+ common = dict(otp=otp, gen_time=time)
assertMatches = partial(self.assertVerifyMatches, **common)
assertRaises = partial(self.assertVerifyRaises, **common)
@@ -1217,14 +1134,11 @@ class TotpTest(_BaseOTPTest):
assertMatches(0, token, time, last_counter=counter,
window=0, reuse=True)
- def test_verify_w_token_normalization(self, for_consume=False):
+ def test_verify_w_token_normalization(self):
"""verify() -- token normalization"""
# setup test helper
otp = TOTP('otxl2f5cctbprpzx')
- if for_consume:
- verify = self._create_consume_wrapper(otp)
- else:
- verify = otp.verify
+ verify = otp.verify
time = 1412889861
# separators / spaces should be stripped (orig token '332136')
@@ -1242,19 +1156,13 @@ class TotpTest(_BaseOTPTest):
# leading zeros count towards size
self.assertRaises(exc.MalformedTokenError, verify, '0123456', time)
- def test_verify_w_reference_vectors(self, for_consume=False):
+ def test_verify_w_reference_vectors(self):
"""verify() -- reference vectors"""
for otp, time, token, expires, msg in self.iter_test_vectors():
# create wrapper
- if for_consume:
- verify = self._create_consume_wrapper(otp)
- else:
- verify = otp.verify
+ verify = otp.verify
# token should verify against time
- if for_consume:
- real_skew = -divmod(time, otp.period)[1]
- msg = "%s(with real_skew=%r):" % (msg, real_skew)
result = verify(token, time)
self.assertTrue(result)
self.assertEqual(result.counter, time // otp.period, msg=msg)
@@ -1263,94 +1171,6 @@ class TotpTest(_BaseOTPTest):
self.assertRaises(exc.InvalidTokenError, verify, token, time + 100, window=0)
#=============================================================================
- # consume()
- #=============================================================================
- def _create_consume_wrapper(self, otp):
- """
- returns a wrapper around consume()
- which makes it's signature & return match verify(),
- to helper out shared test code.
- """
- def wrapper(token, time, last_counter=0, **kwds):
- # reset internal state
- time = otp.normalize_time(time)
- otp.last_counter = last_counter
- otp.changed = False
-
- # monkeypatch current time, and run consume() w/in our sandbox
- orig = otp.now
- try:
- otp.now = lambda: time
- return otp.consume(token, **kwds)
- finally:
- otp.now = orig
-
- return wrapper
-
- def test_consume_w_window(self):
- """consume() -- 'window' parameter"""
- self.test_verify_w_window(for_consume=True)
-
- def test_consume_w_skew(self):
- """consume() -- 'skew' parameter"""
- self.test_verify_w_skew(for_consume=True)
-
- def test_consume_w_reuse(self, for_consume=False):
- """consume() -- 'reuse' parameter & 'last_counter' attribute"""
- self.test_verify_w_reuse(for_consume=True)
-
- def test_consume_w_token_normalization(self):
- """consume() -- token normalization"""
- self.test_verify_w_token_normalization(for_consume=True)
-
- # TODO: roll this into test_consume_w_reuse(), above
- def test_consume_w_last_counter(self):
- """consume() -- 'last_counter' attribute"""
- from passlib.exc import UsedTokenError
-
- # init generator
- otp = self.randotp()
- period = otp.period
-
- time = randtime()
- result = otp.generate(time)
- self.assertEqual(otp.last_counter, 0) # ensure generate() didn't touch it
- token = result.token
- counter = result.counter
- otp.now = lambda : time # fix consume() time for duration of test
-
- # verify token
- self.assertTrue(otp.consume(token))
- self.assertEqual(otp.last_counter, counter)
-
- # test reuse policies
- self.assertRaises(UsedTokenError, otp.consume, token)
- self.assertRaises(UsedTokenError, otp.consume, token, reuse=False)
- self.assertTrue(otp.consume(token, reuse=True))
- self.assertEqual(otp.last_counter, counter)
-
- # should reject older token even if within window
- otp.last_counter = counter
- old_token = otp.generate(time - period).token
- self.assertRaises(exc.InvalidTokenError, otp.consume, old_token)
- self.assertRaises(exc.InvalidTokenError, otp.consume, old_token, reuse=False)
- self.assertRaises(exc.InvalidTokenError, otp.consume, old_token, reuse=True)
- self.assertEqual(otp.last_counter, counter)
-
- # next token should advance .last_counter
- otp.last_counter = counter
- token2 = otp.generate(time + period).token
- otp.now = lambda: time + period
- self.assertTrue(otp.consume(token2))
- self.assertEqual(otp.last_counter, counter + 1)
-
- # TODO: test 'changed flag' behavior
-
- def test_consume_w_reference_vectors(self):
- """consume() -- reference vectors"""
- self.test_verify_w_reference_vectors(for_consume=True)
-
- #=============================================================================
# uri serialization
#=============================================================================
def test_from_uri(self):
diff --git a/passlib/totp.py b/passlib/totp.py
index 845bda3..9e2b3df 100644
--- a/passlib/totp.py
+++ b/passlib/totp.py
@@ -690,10 +690,6 @@ class TOTP(object):
# state attrs
#---------------------------------------------------------------------------
- #: counter value of last token generated by :meth:`advance` *(client-side)*,
- #: or validated by :meth:`consume` *(server-side)*.
- last_counter = 0
-
#: flag set if internal state is modified
changed = False
@@ -706,7 +702,7 @@ class TOTP(object):
new=False, digits=None, alg=None, size=None, period=None,
label=None, issuer=None, context=None, changed=False,
now=None, # NOTE: mainly used for unittesting
- last_counter=0, **kwds):
+ **kwds):
super(TOTP, self).__init__(**kwds)
self.changed = changed
self.context = context
@@ -789,10 +785,6 @@ class TOTP(object):
"now() function must return non-negative int/float"
self.now = now
- # init last counter value
- self._check_serial(last_counter, "last_counter")
- self.last_counter = last_counter
-
#=============================================================================
# helpers to verify value types & ranges
#=============================================================================
@@ -954,7 +946,8 @@ class TOTP(object):
def generate(self, time=None):
"""
- Low-level method to generate token for specified time.
+ Generate token for specified time
+ (uses current time if none specified).
:arg time:
Can be ``None``, :class:`!datetime`,
@@ -979,78 +972,12 @@ class TOTP(object):
>>> # when you just need the token...
>>> otp.generate(1419622739).token
'897212'
-
- .. seealso::
- This is a lowlevel method, which doesn't read or modify any state-dependant values
- (such as the :attr:`last_counter` value).
"""
time = self.normalize_time(time)
counter = self._time_to_counter(time)
token = self._generate(counter)
return TotpToken(self, token, counter)
- def advance(self, reuse=False):
- """
- High-level method to generate TOTP token for current time.
- Unlike :meth:`generate`, this method takes into account the :attr:`last_counter` value,
- and updates that attribute to match the returned token.
-
- :param reuse:
- Controls whether a token can be issued twice within the same time :attr:`period`.
-
- By default (``False``), calling this method twice within the same time :attr:`period`
- will result in a :exc:`~passlib.exc.UsedTokenError`, since once a token has gone across the wire,
- it should be considered insecure.
-
- Setting this to ``True`` will allow multiple uses of the token within the same time period.
-
- :raises ~passlib.exc.UsedTokenError:
-
- if an attempt is made to generate a token within the same time :attr:`period`
- (suppressed by ``reuse=True``).
-
- :returns:
-
- sequence of ``(token, expire_time)`` (actually a :class:`TotpToken` instance):
-
- * ``token`` -- decimal-formatted token as a (unicode) string
- * ``expire_time`` -- unix epoch time when token will expire
-
- Usage example::
-
- >>> # IMPORTANT: THE 'now' PARAMETER SHOULD NOT BE USED IN PRODUCTION.
- >>> # It's only used here to fix the totp generator's clock, so that
- >>> # this example can be reproduced regardless of the actual system time.
-
- >>> # generate new token, wrapped in a TotpToken instance...
- >>> totp = TOTP('s3jdvb7qd2r7jpxx', now=lambda : 1419622739)
- >>> totp.advance()
- <TotpToken token='897212' expire_time=1419622740>
-
- >>> # or use attr access when you just need the token ...
- >>> totp.advance().token
- '897212'
- """
- time = self.normalize_time(None)
- result = self.generate(time)
-
- if result.counter < self.last_counter:
- # NOTE: this usually means system time has jumped back since last call.
- # this will occasionally happen, so not throwing an error,
- # but definitely worth issuing a warning.
- warn("TOTP.advance(): current time (%r) earlier than last-used time (%r); "
- "did system clock change?" % (int(time), self.last_counter * self.period),
- exc.PasslibSecurityWarning, stacklevel=1)
-
- elif result.counter == self.last_counter and not reuse:
- raise UsedTokenError("Token already generated in this time period, "
- "please wait %d seconds for another." % result.remaining,
- expire_time=result.expire_time)
-
- self.last_counter = result.counter
- self.changed = True
- return result
-
def _generate(self, counter):
"""
base implementation of HOTP token generation algorithm.
@@ -1084,7 +1011,7 @@ class TOTP(object):
def verify(self, token, time=None, window=30, skew=0, last_counter=-1, reuse=False):
"""
- Low-level method to validate TOTP token against specified timestamp.
+ Validate TOTP token against specified timestamp.
Searches within a window before & after the provided time,
in order to account for transmission delay and small amounts of skew in the client's clock.
@@ -1168,11 +1095,6 @@ class TOTP(object):
Traceback:
...
InvalidTokenError: Token did not match
-
- .. seealso::
- This is a low-level method, which doesn't read or modify any state-dependant values
- (such as the :attr:`last_counter` value).
- For a version which does, see :meth:`consume`.
"""
time = self.normalize_time(time)
self._check_serial(window, "window")
@@ -1196,94 +1118,9 @@ class TOTP(object):
# can use historical .skipped values to estimate future skew.
return TotpMatch(counter, time, self.period)
- def consume(self, token, reuse=False, window=30, skew=0):
- """
- High-level method to validate TOTP token against current system time.
- Unlike :meth:`verify`, this method takes into account the :attr:`last_counter` value,
- and updates that attribute if a match is found.
-
- :arg token:
- token to validate.
- may be integer or string (whitespace and hyphens are ignored).
-
- :param bool reuse:
- Controls whether a token can be issued twice within the same time :attr:`period`.
-
- By default (``False``), attempting to verify the same token twice within the same time :attr:`period`
- will result in a :exc:`~passlib.exc.TokenReuseError`.
- Setting this to ``True`` will silently allow multiple uses of the token within the same time period.
-
- Note that enabling this exposes your application to a replay attack:
- if an attacker is able to read the token (whether physically
- as the user types it, or going across the wire), the attacker
- will be able to re-use any time over the next <period> seconds.
-
- :param int window:
- How far backward and forward in time to search for a match.
- Measured in seconds. Defaults to ``30``. Typically only useful if set
- to multiples of :attr:`period`.
-
- :raises ~passlib.exc.TokenError:
-
- If the token is malformed, fails to verify,
- or an attempt is made to re-use the current time period's token
- (this last check can be suppressed by ``reuse=True``)
-
- :returns:
- If token validated, returns the :class:`HotpMatch` instance from the underlying :meth:`verify` call.
- Raise errors on invalid/malformed tokens.
-
- May set the :attr:`changed` attribute if the internal state was updated,
- and needs to be re-persisted by the application (see :meth:`to_json`).
-
- Usage example::
-
- >>> # IMPORTANT: THE 'now' PARAMETER SHOULD NOT BE USED IN PRODUCTION.
- >>> # It's only used here to fix the totp generator's clock, so that
- >>> # this example can be reproduced regardless of the actual system time.
- >>> totp = TOTP('s3jdvb7qd2r7jpxx', now = lambda: 1419622739)
-
- >>> # wrong token
- >>> totp.consume('123456')
- Traceback:
- ...
- InvalidTokenError: Token did not match
-
- >>> # token from 30 sec ago (w/ window, will be accepted)
- >>> totp.consume('000492')
- <TotpMatch counter=47320756 time=1419622729>
-
- >>> # token from current period
- >>> totp.consume('897212')
- <TotpMatch counter=47320757 time=1419622729>
-
- >>> # token from 30 sec ago will now be rejected
- >>> totp.consume('000492')
- Traceback:
- ...
- InvalidTokenError: Token did not match
- """
- time = self.normalize_time(None)
- # NOTE: setting min_start so verify() doesn't even bother checking
- # points before the last verified counter, no matter what offset or window is set to.
- match = self.verify(token, time, window=window, skew=skew, last_counter=self.last_counter,
- reuse=reuse)
- assert match.time == time, "sanity check failed: verify().time didn't match input time"
- assert match.counter >= self.last_counter, "sanity check failed: verify() didn't honor last_counter"
- assert reuse or match.counter != self.last_counter
-
- if match.counter != self.last_counter:
- # accept new token, update internal state
- self.last_counter = match.counter
- self.changed = True
- else:
- assert reuse, "sanity check failed"
- # XXX: return the match instead?
- return match
-
def _find_match(self, token, start, end, expected=None):
"""
- helper for verify() implementations --
+ helper for verify() --
returns counter value within specified range that matches token.
:arg token:
@@ -1590,7 +1427,7 @@ class TOTP(object):
# default json format is just serialization of constructor kwds.
# XXX: just pass all this through to _from_json / constructor?
# go ahead and mark as changed (needs re-saving) if the version is too old
- assert cls._check_otp_type(type), "check legacy type parameter"
+ assert cls._check_otp_type(type) # check legacy 'type' parameter
ver = kwds.pop("v", None)
if not ver or ver < cls.min_json_version or ver > cls.json_version:
raise cls._json_error("missing/unsupported version (%r)" % (ver,))
@@ -1606,6 +1443,7 @@ class TOTP(object):
kwds.update(key=kwds.pop("enckey"), format="encrypted")
elif 'key' not in kwds:
raise cls._json_error("missing 'enckey' / 'key'")
+ kwds.pop("last_counter", None) # extract legacy counter parameter
return kwds
@classmethod
@@ -1667,8 +1505,6 @@ class TOTP(object):
state['key'] = self.base32_key
if self.period != 30:
state['period'] = self.period
- if self.last_counter:
- state['last_counter'] = self.last_counter
# NOTE: in the future, may add a "history" parameter
# containing a list of (time, skipped) pairs, encoding
# the last X successful verifications, to allow persisting
@@ -1684,7 +1520,7 @@ class TOTP(object):
#=============================================================================
class TotpToken(SequenceMixin):
"""
- Object returned by :meth:`TOTP.generate` and :meth:`TOTP.advance`.
+ Object returned by :meth:`TOTP.generate`.
It can be treated as a sequence of ``(token, expire_time)``,
or accessed via the following attributes: