diff options
author | Pavlo Shchelokovskyy <shchelokovskyy@gmail.com> | 2021-07-13 12:06:41 +0300 |
---|---|---|
committer | Pavlo Shchelokovskyy <pshchelokovskyy@mirantis.com> | 2021-07-13 09:44:06 +0000 |
commit | 3e1f150926029b6a553cbef959f370e39ce6bb5a (patch) | |
tree | 3b1dff7c91b292b61133050e2ceb930d64348dc3 | |
parent | 0e9f7367e7abf737e3fbde58350fb1034a3a36e8 (diff) | |
download | taskflow-3e1f150926029b6a553cbef959f370e39ce6bb5a.tar.gz |
Use custom JSONType columns
the JSONType from sqlalchemy_utils is quite brittle as it only does
primitive json.dumps on values. This leads to various sorts of
StorageFailure exceptions in taskflow when, for example, an unserializable
exception bubbles up to the 'failure' field of AtomDetails.
This patch sublclasses the JSONType from sqlalchemy_utils and overrides
two of its methods that do (de)serialization to work via
oslo.serialization functions. They deal with such occurencies much
better, for example, by providing 'str' as a fallback default.
Change-Id: I3b9e9498b155199a4e707006a0cf22cda0567c06
Related-Bug: #1935957
-rw-r--r-- | taskflow/persistence/backends/sqlalchemy/tables.py | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py index b9b065e..062409a 100644 --- a/taskflow/persistence/backends/sqlalchemy/tables.py +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -16,10 +16,11 @@ import collections +from oslo_serialization import jsonutils from oslo_utils import timeutils from oslo_utils import uuidutils from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum -import sqlalchemy_utils as su +from sqlalchemy_utils.types import json as json_type from taskflow.persistence import models from taskflow import states @@ -34,6 +35,24 @@ STATE_LENGTH = 255 VERSION_LENGTH = 64 +class JSONType(json_type.JSONType): + """Customized JSONType using oslo.serialization for json operations""" + + def process_bind_param(self, value, dialect): + if dialect.name == 'postgresql' and json_type.has_postgres_json: + return value + if value is not None: + value = jsonutils.dumps(value) + return value + + def process_result_value(self, value, dialect): + if dialect.name == 'postgresql': + return value + if value is not None: + value = jsonutils.loads(value) + return value + + def fetch(metadata): """Returns the master set of table objects (which is also there schema).""" logbooks = Table('logbooks', metadata, @@ -41,7 +60,7 @@ def fetch(metadata): default=timeutils.utcnow), Column('updated_at', DateTime, onupdate=timeutils.utcnow), - Column('meta', su.JSONType), + Column('meta', JSONType), Column('name', String(length=NAME_LENGTH)), Column('uuid', String(length=UUID_LENGTH), primary_key=True, nullable=False, unique=True, @@ -54,7 +73,7 @@ def fetch(metadata): Column('parent_uuid', String(length=UUID_LENGTH), ForeignKey('logbooks.uuid', ondelete='CASCADE')), - Column('meta', su.JSONType), + Column('meta', JSONType), Column('name', String(length=NAME_LENGTH)), Column('state', String(length=STATE_LENGTH)), Column('uuid', String(length=UUID_LENGTH), @@ -65,7 +84,7 @@ def fetch(metadata): default=timeutils.utcnow), Column('updated_at', DateTime, onupdate=timeutils.utcnow), - Column('meta', su.JSONType), + Column('meta', JSONType), Column('parent_uuid', String(length=UUID_LENGTH), ForeignKey('flowdetails.uuid', ondelete='CASCADE')), @@ -75,10 +94,10 @@ def fetch(metadata): Column('uuid', String(length=UUID_LENGTH), primary_key=True, nullable=False, unique=True, default=uuidutils.generate_uuid), - Column('failure', su.JSONType), - Column('results', su.JSONType), - Column('revert_results', su.JSONType), - Column('revert_failure', su.JSONType), + Column('failure', JSONType), + Column('results', JSONType), + Column('revert_results', JSONType), + Column('revert_failure', JSONType), Column('atom_type', Enum(*models.ATOM_TYPES, name='atom_types')), Column('intention', Enum(*states.INTENTIONS, |