summaryrefslogtreecommitdiff
path: root/zuul/driver/sql/sqlconnection.py
diff options
context:
space:
mode:
Diffstat (limited to 'zuul/driver/sql/sqlconnection.py')
-rw-r--r--zuul/driver/sql/sqlconnection.py45
1 files changed, 43 insertions, 2 deletions
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index 7a4aea626..26bcf184d 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -13,6 +13,7 @@
# under the License.
import logging
+import re
import time
from urllib.parse import quote_plus
@@ -27,12 +28,29 @@ import sqlalchemy.pool
from zuul.connection import BaseConnection
from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock
+
BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
BUILD_EVENTS_TABLE = 'zuul_build_event'
ARTIFACT_TABLE = 'zuul_artifact'
PROVIDES_TABLE = 'zuul_provides'
+STATEMENT_TIMEOUT_RE = re.compile(r'/\* statement_timeout=(\d+) \*/')
+
+
+# In Postgres we can set a per-transaction (which for us is
+# effectively per-query) execution timeout by executing "SET LOCAL
+# statement_timeout" before our query. There isn't a great way to do
+# this using the SQLAlchemy query API, so instead, we add a comment as
+# a hint, and here we parse that comment and execute the "SET". The
+# comment remains in the query sent to the server, but that's okay --
+# it may even help an operator in debugging.
+@sa.event.listens_for(sa.Engine, "before_cursor_execute")
+def _set_timeout(conn, cursor, stmt, params, context, executemany):
+ match = STATEMENT_TIMEOUT_RE.search(stmt)
+ if match:
+ cursor.execute("SET LOCAL statement_timeout=%s" % match.groups())
+
class DatabaseSession(object):
@@ -76,7 +94,7 @@ class DatabaseSession(object):
result=None, provides=None, final=None, held=None,
complete=None, sort_by_buildset=False, limit=50,
offset=0, idx_min=None, idx_max=None,
- exclude_result=None):
+ exclude_result=None, query_timeout=None):
build_table = self.connection.zuul_build_table
buildset_table = self.connection.zuul_buildset_table
@@ -104,6 +122,17 @@ class DatabaseSession(object):
if not (project or change or uuid):
q = q.with_hint(build_table, 'USE INDEX (PRIMARY)', 'mysql')
+ if query_timeout:
+ # For MySQL, we can add a query hint directly.
+ q = q.prefix_with(
+ f'/*+ MAX_EXECUTION_TIME({query_timeout}) */',
+ dialect='mysql')
+ # For Postgres, we add a comment that we parse in our
+ # event handler.
+ q = q.with_statement_hint(
+ f'/* statement_timeout={query_timeout} */',
+ dialect_name='postgresql')
+
q = self.listFilter(q, buildset_table.c.tenant, tenant)
q = self.listFilter(q, buildset_table.c.project, project)
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
@@ -185,7 +214,8 @@ class DatabaseSession(object):
change=None, branch=None, patchset=None, ref=None,
newrev=None, uuid=None, result=None, complete=None,
updated_max=None,
- limit=50, offset=0, idx_min=None, idx_max=None):
+ limit=50, offset=0, idx_min=None, idx_max=None,
+ query_timeout=None):
buildset_table = self.connection.zuul_buildset_table
@@ -194,6 +224,17 @@ class DatabaseSession(object):
if not (project or change or uuid):
q = q.with_hint(buildset_table, 'USE INDEX (PRIMARY)', 'mysql')
+ if query_timeout:
+ # For MySQL, we can add a query hint directly.
+ q = q.prefix_with(
+ f'/*+ MAX_EXECUTION_TIME({query_timeout}) */',
+ dialect='mysql')
+ # For Postgres, we add a comment that we parse in our
+ # event handler.
+ q = q.with_statement_hint(
+ f'/* statement_timeout={query_timeout} */',
+ dialect_name='postgresql')
+
q = self.listFilter(q, buildset_table.c.tenant, tenant)
q = self.listFilter(q, buildset_table.c.project, project)
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)