summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2017-02-02 02:40:28 +0000
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2017-02-02 02:40:28 +0000
commit626e57acda52ac4fe54ca4891adfc961042c887a (patch)
tree70f96bb624e1c1dfcaf087e76b7f7828cb0564d0
parentad6506ff69ea17c11a514b964836409921f7560c (diff)
parent27652ed3b0afda8d175fa673ecd4e341b3a58c3c (diff)
downloadpsycopg2-626e57acda52ac4fe54ca4891adfc961042c887a.tar.gz
Merge branch 'fast-executemany'
-rw-r--r--NEWS6
-rw-r--r--doc/src/cursor.rst17
-rw-r--r--doc/src/extras.rst57
-rw-r--r--lib/extras.py82
-rwxr-xr-xtests/test_types_extras.py178
5 files changed, 334 insertions, 6 deletions
diff --git a/NEWS b/NEWS
index b4d11e6..a0f1810 100644
--- a/NEWS
+++ b/NEWS
@@ -27,6 +27,12 @@ New features:
- `~cursor.callproc()` now accepts a dictionary of parameters (:ticket:`#381`).
- Using Python C API decoding functions and codecs caching for faster
unicode encoding/decoding (:ticket:`#473`).
+- `~cursor.executemany()` slowness addressed by
+ `~psycopg2.extras.execute_batch()` and `~psycopg2.extras.execute_values()`
+ (:ticket:`#491`).
+
+Bug fixes:
+
- Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection`
(:ticket:`#483`).
diff --git a/doc/src/cursor.rst b/doc/src/cursor.rst
index aee6b46..4161f2a 100644
--- a/doc/src/cursor.rst
+++ b/doc/src/cursor.rst
@@ -172,33 +172,38 @@ The ``cursor`` class
.. method:: execute(operation [, parameters])
-
+
Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to
variables in the operation. Variables are specified either with
positional (``%s``) or named (:samp:`%({name})s`) placeholders. See
:ref:`query-parameters`.
-
+
The method returns `!None`. If a query was executed, the returned
values can be retrieved using |fetch*|_ methods.
.. method:: executemany(operation, seq_of_parameters)
-
+
Prepare a database operation (query or command) and then execute it
against all parameter tuples or mappings found in the sequence
`seq_of_parameters`.
-
+
The function is mostly useful for commands that update the database:
any result set returned by the query is discarded.
-
+
Parameters are bounded to the query using the same rules described in
the `~cursor.execute()` method.
+ .. warning::
+ In its current implementation this method is not faster than
+ executing `~cursor.execute()` in a loop. For better performance
+ you can use the functions described in :ref:`fast-exec`.
+
.. method:: callproc(procname [, parameters])
-
+
Call a stored database procedure with the given name. The sequence of
parameters must contain one entry for each argument that the procedure
expects. Overloaded procedures are supported. Named parameters can be
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index d33b8ee..5ef4223 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -974,6 +974,63 @@ converted into lists of strings.
future versions.
+
+.. _fast-exec:
+
+Fast execution helpers
+----------------------
+
+The current implementation of `~cursor.executemany()` is (using an extremely
+charitable understatement) not particularly performing. These functions can
+be used to speed up the repeated execution of a statement againts a set of
+parameters. By reducing the number of server roundtrips the performance can be
+`orders of magnitude better`__ than using `!executemany()`.
+
+.. __: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038
+
+
+.. autofunction:: execute_batch
+
+ .. versionadded:: 2.7
+
+.. note::
+
+ `!execute_batch()` can be also used in conjunction with PostgreSQL
+ prepared statements using |PREPARE|_, |EXECUTE|_, |DEALLOCATE|_.
+ Instead of executing::
+
+ execute_batch(cur,
+ "big and complex SQL with %s %s params",
+ params_list)
+
+ it is possible to execute something like::
+
+ cur.execute("PREPARE stmt AS big and complex SQL with $1 $2 params")
+ execute_batch(cur, "EXECUTE stmt (%s, %s)", params_list)
+ cur.execute("DEALLOCATE stmt")
+
+ which may bring further performance benefits: if the operation to perform
+ is complex, every single execution will be faster as the query plan is
+ already cached; furthermore the amount of data to send on the server will
+ be lesser (one |EXECUTE| per param set instead of the whole, likely
+ longer, statement).
+
+ .. |PREPARE| replace:: :sql:`PREPARE`
+ .. _PREPARE: https://www.postgresql.org/docs/current/static/sql-prepare.html
+
+ .. |EXECUTE| replace:: :sql:`EXECUTE`
+ .. _EXECUTE: https://www.postgresql.org/docs/current/static/sql-execute.html
+
+ .. |DEALLOCATE| replace:: :sql:`DEALLOCATE`
+ .. _DEALLOCATE: https://www.postgresql.org/docs/current/static/sql-deallocate.html
+
+
+.. autofunction:: execute_values
+
+ .. versionadded:: 2.7
+
+
+
.. index::
single: Time zones; Fractional
diff --git a/lib/extras.py b/lib/extras.py
index c1d1567..2d26402 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -1141,3 +1141,85 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
caster.array_typecaster, not globally and conn_or_curs or None)
return caster
+
+
+def _paginate(seq, page_size):
+ """Consume an iterable and return it in chunks.
+
+ Every chunk is at most `page_size`. Never return an empty chunk.
+ """
+ page = []
+ it = iter(seq)
+ while 1:
+ try:
+ for i in xrange(page_size):
+ page.append(it.next())
+ yield page
+ page = []
+ except StopIteration:
+ if page:
+ yield page
+ return
+
+
+def execute_batch(cur, sql, argslist, page_size=100):
+ """Execute groups of statements in fewer server roundtrips.
+
+ Execute *sql* several times, against all parameters set (sequences or
+ mappings) found in *argslist*.
+
+ The function is semantically similar to `~cursor.executemany()`, but has a
+ different implementation: Psycopg will join the statements into fewer
+ multi-statement commands, reducing the number of server roundtrips,
+ resulting in better performances. Every command contains at most
+ *page_size* statements.
+
+ """
+ for page in _paginate(argslist, page_size=page_size):
+ sqls = [cur.mogrify(sql, args) for args in page]
+ cur.execute(b";".join(sqls))
+
+
+def execute_values(cur, sql, argslist, template=None, page_size=100):
+ '''Execute a statement using :sql:`VALUES` with a sequence of parameters.
+
+ *sql* must contain a single ``%s`` placeholder, which will be replaced by a
+ `VALUES list`__. Every statement will contain at most *page_size* sets of
+ arguments.
+
+ .. __: https://www.postgresql.org/docs/current/static/queries-values.html
+
+ *template* is the part merged to the arguments, so it should be compatible
+ with the content of *argslist* (it should contain the right number of
+ arguments if *argslist* is a sequence of sequences, or compatible names if
+ *argslist* is a sequence of mappings). If not specified, assume the
+ arguments are sequence and use a simple positional template (i.e.
+ ``(%s, %s, ...)``).
+
+ While :sql:`INSERT` is an obvious candidate for this function it is
+ possible to use it with other statements, for example::
+
+ >>> cur.execute(
+ ... "create table test (id int primary key, v1 int, v2 int)")
+
+ >>> execute_values(cur,
+ ... "INSERT INTO test (id, v1, v2) VALUES %s",
+ ... [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
+
+ >>> execute_values(cur,
+ ... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
+ ... WHERE test.id = data.id""",
+ ... [(1, 20), (4, 50)])
+
+ >>> cur.execute("select * from test order by id")
+ >>> cur.fetchall()
+ [(1, 20, 3), (4, 50, 6), (7, 8, 9)])
+
+ '''
+ for page in _paginate(argslist, page_size=page_size):
+ if template is None:
+ template = '(%s)' % ','.join(['%s'] * len(page[0]))
+ values = b",".join(cur.mogrify(template, args) for args in page)
+ if isinstance(values, bytes):
+ values = values.decode(_ext.encodings[cur.connection.encoding])
+ cur.execute(sql % (values,))
diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py
index f28c5c2..8fe3bae 100755
--- a/tests/test_types_extras.py
+++ b/tests/test_types_extras.py
@@ -1766,6 +1766,184 @@ class RangeCasterTestCase(ConnectingTestCase):
decorate_all_tests(RangeCasterTestCase, skip_if_no_range)
+class TestFastExecute(ConnectingTestCase):
+ def setUp(self):
+ super(TestFastExecute, self).setUp()
+ cur = self.conn.cursor()
+ cur.execute("""create table testfast (
+ id serial primary key, date date, val int, data text)""")
+
+ def test_paginate(self):
+ def pag(seq):
+ return psycopg2.extras._paginate(seq, 100)
+
+ self.assertEqual(list(pag([])), [])
+ self.assertEqual(list(pag([1])), [[1]])
+ self.assertEqual(list(pag(range(99))), [list(range(99))])
+ self.assertEqual(list(pag(range(100))), [list(range(100))])
+ self.assertEqual(list(pag(range(101))), [list(range(100)), [100]])
+ self.assertEqual(
+ list(pag(range(200))), [list(range(100)), list(range(100, 200))])
+ self.assertEqual(
+ list(pag(range(1000))),
+ [list(range(i * 100, (i + 1) * 100)) for i in range(10)])
+
+ def test_execute_batch_empty(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ [])
+ cur.execute("select * from testfast order by id")
+ self.assertEqual(cur.fetchall(), [])
+
+ def test_execute_batch_one(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ iter([(1, 10)]))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(1, 10)])
+
+ def test_execute_batch_tuples(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, date, val) values (%s, %s, %s)",
+ ((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_execute_batch_many(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ ((i, i * 10) for i in range(1000)))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
+
+ def test_execute_batch_pages(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ ((i, i * 10) for i in range(25)),
+ page_size=10)
+
+ # last command was 5 statements
+ self.assertEqual(sum(c == u';' for c in cur.query.decode('ascii')), 4)
+
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
+
+ def test_execute_batch_unicode(self):
+ cur = self.conn.cursor()
+ ext.register_type(ext.UNICODE, cur)
+ snowman = u"\u2603"
+
+ # unicode in statement
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
+ [(1, 'x')])
+ cur.execute("select id, data from testfast where id = 1")
+ self.assertEqual(cur.fetchone(), (1, 'x'))
+
+ # unicode in data
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%s, %s)",
+ [(2, snowman)])
+ cur.execute("select id, data from testfast where id = 2")
+ self.assertEqual(cur.fetchone(), (2, snowman))
+
+ # unicode in both
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
+ [(3, snowman)])
+ cur.execute("select id, data from testfast where id = 3")
+ self.assertEqual(cur.fetchone(), (3, snowman))
+
+ def test_execute_values_empty(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ [])
+ cur.execute("select * from testfast order by id")
+ self.assertEqual(cur.fetchall(), [])
+
+ def test_execute_values_one(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ iter([(1, 10)]))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(1, 10)])
+
+ def test_execute_values_tuples(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, date, val) values %s",
+ ((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_execute_values_dicts(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, date, val) values %s",
+ (dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar")
+ for i in range(10)),
+ template='(%(id)s, %(date)s, %(val)s)')
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_execute_values_many(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ ((i, i * 10) for i in range(1000)))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
+
+ def test_execute_values_pages(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ ((i, i * 10) for i in range(25)),
+ page_size=10)
+
+ # last statement was 5 tuples (one parens is for the fields list)
+ self.assertEqual(sum(c == '(' for c in cur.query.decode('ascii')), 6)
+
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
+
+ def test_execute_values_unicode(self):
+ cur = self.conn.cursor()
+ ext.register_type(ext.UNICODE, cur)
+ snowman = u"\u2603"
+
+ # unicode in statement
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %%s -- %s" % snowman,
+ [(1, 'x')])
+ cur.execute("select id, data from testfast where id = 1")
+ self.assertEqual(cur.fetchone(), (1, 'x'))
+
+ # unicode in data
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %s",
+ [(2, snowman)])
+ cur.execute("select id, data from testfast where id = 2")
+ self.assertEqual(cur.fetchone(), (2, snowman))
+
+ # unicode in both
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %%s -- %s" % snowman,
+ [(3, snowman)])
+ cur.execute("select id, data from testfast where id = 3")
+ self.assertEqual(cur.fetchone(), (3, snowman))
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)