summaryrefslogtreecommitdiff
path: root/test/dialect/mysql/test_for_update.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2018-04-30 11:31:48 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2018-05-01 20:52:44 -0400
commit453cdfd75688e869637f0bbd171594e1fe7b6a39 (patch)
tree753529237e0aa2792252bf78554c81cbc0b04b05 /test/dialect/mysql/test_for_update.py
parent29c5f7a5d5dae47bef823804096cb4a7122ff8fa (diff)
downloadsqlalchemy-453cdfd75688e869637f0bbd171594e1fe7b6a39.tar.gz
Render FOR UPDATE on the inner subquery as well as the outer
The ORM now doubles the "FOR UPDATE" clause within the subquery that renders in conjunction with joined eager loading in some cases, as it has been observed that MySQL does not lock the rows from a subquery. This means the query renders with two FOR UPDATE clauses; note that on some backends such as Oracle, FOR UPDATE clauses on subqueries are silently ignored since they are unnecessary. Additionally, in the case of the "OF" clause used primarily with Postgresql, the FOR UPDATE is rendered only on the inner subquery when this is used so that the selectable can be targeted to the table within the SELECT statement. Change-Id: Ie5520d08d82bf0afd9e1bd2d43a0b2a0db0de16d Fixes: #4246
Diffstat (limited to 'test/dialect/mysql/test_for_update.py')
-rw-r--r--test/dialect/mysql/test_for_update.py161
1 files changed, 161 insertions, 0 deletions
diff --git a/test/dialect/mysql/test_for_update.py b/test/dialect/mysql/test_for_update.py
new file mode 100644
index 000000000..af467f920
--- /dev/null
+++ b/test/dialect/mysql/test_for_update.py
@@ -0,0 +1,161 @@
+"""Test MySQL FOR UPDATE behavior.
+
+See #4246
+
+"""
+import contextlib
+
+from sqlalchemy import Column, Integer, ForeignKey, update
+from sqlalchemy.orm import relationship, Session, joinedload
+from sqlalchemy import exc
+
+from sqlalchemy.testing import fixtures
+from sqlalchemy import testing
+
+
+class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
+ __backend__ = True
+ __only_on__ = 'mysql'
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = 'a'
+ id = Column(Integer, primary_key=True)
+ x = Column(Integer)
+ y = Column(Integer)
+ bs = relationship("B")
+ __table_args__ = {"mysql_engine": "InnoDB"}
+
+ class B(Base):
+ __tablename__ = 'b'
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey('a.id'))
+ x = Column(Integer)
+ y = Column(Integer)
+ __table_args__ = {"mysql_engine": "InnoDB"}
+
+ @classmethod
+ def insert_data(cls):
+ A = cls.classes.A
+ B = cls.classes.B
+
+ # all the x/y are < 10
+ s = Session()
+ s.add_all(
+ [
+ A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]),
+ A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)])
+ ]
+ )
+ s.commit()
+
+ @contextlib.contextmanager
+ def run_test(self):
+ connection = testing.db.connect()
+ connection.execute("set innodb_lock_wait_timeout=1")
+ main_trans = connection.begin()
+ try:
+ yield Session(bind=connection)
+ finally:
+ main_trans.rollback()
+ connection.close()
+
+ def _assert_a_is_locked(self, should_be_locked):
+ A = self.classes.A
+ with testing.db.begin() as alt_trans:
+ alt_trans.execute("set innodb_lock_wait_timeout=1")
+ # set x/y > 10
+ try:
+ alt_trans.execute(
+ update(A).values(x=15, y=19)
+ )
+ except (exc.InternalError, exc.OperationalError) as err:
+ assert "Lock wait timeout exceeded" in str(err)
+ assert should_be_locked
+ else:
+ assert not should_be_locked
+
+ def _assert_b_is_locked(self, should_be_locked):
+ B = self.classes.B
+ with testing.db.begin() as alt_trans:
+ alt_trans.execute("set innodb_lock_wait_timeout=1")
+ # set x/y > 10
+ try:
+ alt_trans.execute(
+ update(B).values(x=15, y=19)
+ )
+ except (exc.InternalError, exc.OperationalError) as err:
+ assert "Lock wait timeout exceeded" in str(err)
+ assert should_be_locked
+ else:
+ assert not should_be_locked
+
+ def test_basic_lock(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).with_for_update().all()
+ # test our fixture
+ self._assert_a_is_locked(True)
+
+ def test_basic_not_lock(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).all()
+ # test our fixture
+ self._assert_a_is_locked(False)
+
+ def test_joined_lock_subquery(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).options(joinedload(A.bs)).with_for_update().first()
+
+ # test for issue #4246, should be locked
+ self._assert_a_is_locked(True)
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_subquery_inner_for_update(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).with_for_update().subquery()
+ s.query(q).join(B).all()
+
+ # FOR UPDATE is inside the subquery, should be locked
+ self._assert_a_is_locked(True)
+
+ # FOR UPDATE is inside the subquery, B is not locked
+ self._assert_b_is_locked(False)
+
+ def test_joined_lock_subquery_inner_for_update_outer(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).with_for_update().subquery()
+ s.query(q).join(B).with_for_update().all()
+
+ # FOR UPDATE is inside the subquery, should be locked
+ self._assert_a_is_locked(True)
+
+ # FOR UPDATE is also outside the subquery, B is locked
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_subquery_order_for_update_outer(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).order_by(A.id).subquery()
+ s.query(q).join(B).with_for_update().all()
+ # FOR UPDATE is inside the subquery, should not be locked
+ self._assert_a_is_locked(False)
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_no_subquery(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).options(joinedload(A.bs)).with_for_update().all()
+ # no subquery, should be locked
+ self._assert_a_is_locked(True)
+ self._assert_b_is_locked(True)