summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--taskflow/engines/action_engine/process_executor.py8
-rw-r--r--taskflow/engines/worker_based/dispatcher.py6
-rw-r--r--taskflow/engines/worker_based/proxy.py6
-rw-r--r--taskflow/engines/worker_based/server.py42
-rw-r--r--taskflow/jobs/backends/impl_redis.py6
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py21
-rw-r--r--taskflow/listeners/base.py4
-rw-r--r--taskflow/listeners/claims.py11
-rw-r--r--taskflow/listeners/timing.py17
-rw-r--r--taskflow/persistence/backends/impl_sqlalchemy.py6
-rw-r--r--taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py4
-rw-r--r--taskflow/task.py5
-rw-r--r--taskflow/tests/unit/test_listeners.py8
-rw-r--r--taskflow/tests/unit/test_task.py18
-rw-r--r--taskflow/types/notifier.py6
-rw-r--r--taskflow/utils/persistence_utils.py4
16 files changed, 89 insertions, 83 deletions
diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py
index e59852f..85d37e0 100644
--- a/taskflow/engines/action_engine/process_executor.py
+++ b/taskflow/engines/action_engine/process_executor.py
@@ -504,16 +504,16 @@ class DispatcherHandler(asyncore.dispatcher):
try:
self.reader.feed(data)
except (IOError, UnknownSender):
- LOG.warn("Invalid received message", exc_info=True)
+ LOG.warning("Invalid received message", exc_info=True)
self.handle_close()
except _DECODE_ENCODE_ERRORS:
- LOG.warn("Badly formatted message", exc_info=True)
+ LOG.warning("Badly formatted message", exc_info=True)
self.handle_close()
except (ValueError, su.ValidationError):
- LOG.warn("Failed validating message", exc_info=True)
+ LOG.warning("Failed validating message", exc_info=True)
self.handle_close()
except ChallengeIgnored:
- LOG.warn("Failed challenge sequence", exc_info=True)
+ LOG.warning("Failed challenge sequence", exc_info=True)
self.handle_close()
diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py
index 2735073..c6e5d25 100644
--- a/taskflow/engines/worker_based/dispatcher.py
+++ b/taskflow/engines/worker_based/dispatcher.py
@@ -135,9 +135,9 @@ class TypeDispatcher(object):
except excp.InvalidFormat as e:
message.reject_log_error(
logger=LOG, errors=(kombu_exc.MessageStateError,))
- LOG.warn("Message '%s' (%s) was rejected due to it being"
- " in an invalid format: %s",
- ku.DelayedPretty(message), message_type, e)
+ LOG.warning("Message '%s' (%s) was rejected due to it"
+ " being in an invalid format: %s",
+ ku.DelayedPretty(message), message_type, e)
return
message.ack_log_error(logger=LOG,
errors=(kombu_exc.MessageStateError,))
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py
index 386e377..e58c7a2 100644
--- a/taskflow/engines/worker_based/proxy.py
+++ b/taskflow/engines/worker_based/proxy.py
@@ -162,9 +162,9 @@ class Proxy(object):
# Filter out any empty keys...
routing_keys = [r_k for r_k in routing_keys if r_k]
if not routing_keys:
- LOG.warn("No routing key/s specified; unable to send '%s'"
- " to any target queue on exchange '%s'", msg,
- self._exchange_name)
+ LOG.warning("No routing key/s specified; unable to send '%s'"
+ " to any target queue on exchange '%s'", msg,
+ self._exchange_name)
return
def _publish(producer, routing_key):
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index 978ab3a..fe51831 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -142,12 +142,12 @@ class Server(object):
try:
reply_to = message.properties['reply_to']
except KeyError:
- LOG.warn("The 'reply_to' message property is missing"
- " in received notify message '%s'",
- ku.DelayedPretty(message), exc_info=True)
+ LOG.warning("The 'reply_to' message property is missing"
+ " in received notify message '%s'",
+ ku.DelayedPretty(message), exc_info=True)
else:
response = pr.Notify(topic=self._topic,
- tasks=self._endpoints.keys())
+ tasks=list(self._endpoints.keys()))
try:
self._proxy.publish(response, routing_key=reply_to)
except Exception:
@@ -177,8 +177,9 @@ class Server(object):
work = pr.Request.from_dict(request, task_uuid=task_uuid)
except ValueError:
with misc.capture_failure() as failure:
- LOG.warn("Failed to parse request contents from message '%s'",
- ku.DelayedPretty(message), exc_info=True)
+ LOG.warning("Failed to parse request contents"
+ " from message '%s'",
+ ku.DelayedPretty(message), exc_info=True)
reply_callback(result=pr.failure_to_dict(failure))
return
@@ -187,10 +188,10 @@ class Server(object):
endpoint = self._endpoints[work.task_cls]
except KeyError:
with misc.capture_failure() as failure:
- LOG.warn("The '%s' task endpoint does not exist, unable"
- " to continue processing request message '%s'",
- work.task_cls, ku.DelayedPretty(message),
- exc_info=True)
+ LOG.warning("The '%s' task endpoint does not exist, unable"
+ " to continue processing request message '%s'",
+ work.task_cls, ku.DelayedPretty(message),
+ exc_info=True)
reply_callback(result=pr.failure_to_dict(failure))
return
else:
@@ -198,10 +199,10 @@ class Server(object):
handler = getattr(endpoint, work.action)
except AttributeError:
with misc.capture_failure() as failure:
- LOG.warn("The '%s' handler does not exist on task endpoint"
- " '%s', unable to continue processing request"
- " message '%s'", work.action, endpoint,
- ku.DelayedPretty(message), exc_info=True)
+ LOG.warning("The '%s' handler does not exist on task"
+ " endpoint '%s', unable to continue processing"
+ " request message '%s'", work.action, endpoint,
+ ku.DelayedPretty(message), exc_info=True)
reply_callback(result=pr.failure_to_dict(failure))
return
else:
@@ -209,9 +210,10 @@ class Server(object):
task = endpoint.generate(name=work.task_name)
except Exception:
with misc.capture_failure() as failure:
- LOG.warn("The '%s' task '%s' generation for request"
- " message '%s' failed", endpoint, work.action,
- ku.DelayedPretty(message), exc_info=True)
+ LOG.warning("The '%s' task '%s' generation for request"
+ " message '%s' failed", endpoint,
+ work.action, ku.DelayedPretty(message),
+ exc_info=True)
reply_callback(result=pr.failure_to_dict(failure))
return
else:
@@ -237,9 +239,9 @@ class Server(object):
result = handler(task, **work.arguments)
except Exception:
with misc.capture_failure() as failure:
- LOG.warn("The '%s' endpoint '%s' execution for request"
- " message '%s' failed", endpoint, work.action,
- ku.DelayedPretty(message), exc_info=True)
+ LOG.warning("The '%s' endpoint '%s' execution for request"
+ " message '%s' failed", endpoint, work.action,
+ ku.DelayedPretty(message), exc_info=True)
reply_callback(result=pr.failure_to_dict(failure))
else:
# And be done with it!
diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py
index cd141e2..a0ebe00 100644
--- a/taskflow/jobs/backends/impl_redis.py
+++ b/taskflow/jobs/backends/impl_redis.py
@@ -817,9 +817,9 @@ return cmsgpack.pack(result)
job_details = job_data.get('details', {})
except (ValueError, TypeError, KeyError):
with excutils.save_and_reraise_exception():
- LOG.warn("Incorrectly formatted job data found at"
- " key: %s[%s]", self.listings_key,
- raw_job_key, exc_info=True)
+ LOG.warning("Incorrectly formatted job data found at"
+ " key: %s[%s]", self.listings_key,
+ raw_job_key, exc_info=True)
else:
postings.append(RedisJob(self, job_name, job_sequence_id,
raw_job_key, uuid=job_uuid,
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 3c83d8a..158e235 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -425,23 +425,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
job_name = job_data['name']
except (ValueError, TypeError, KeyError):
with excutils.save_and_reraise_exception(reraise=not quiet):
- LOG.warn("Incorrectly formatted job data found at path: %s",
- path, exc_info=True)
+ LOG.warning("Incorrectly formatted job data found at path: %s",
+ path, exc_info=True)
except self._client.handler.timeout_exception:
with excutils.save_and_reraise_exception(reraise=not quiet):
- LOG.warn("Operation timed out fetching job data from path: %s",
- path, exc_info=True)
+ LOG.warning("Operation timed out fetching job data from"
+ " from path: %s",
+ path, exc_info=True)
except k_exceptions.SessionExpiredError:
with excutils.save_and_reraise_exception(reraise=not quiet):
- LOG.warn("Session expired fetching job data from path: %s",
- path, exc_info=True)
+ LOG.warning("Session expired fetching job data from path: %s",
+ path, exc_info=True)
except k_exceptions.NoNodeError:
LOG.debug("No job node found at path: %s, it must have"
" disappeared or was removed", path)
except k_exceptions.KazooException:
with excutils.save_and_reraise_exception(reraise=not quiet):
- LOG.warn("Internal error fetching job data from path: %s",
- path, exc_info=True)
+ LOG.warning("Internal error fetching job data from path: %s",
+ path, exc_info=True)
else:
with self._job_cond:
# Now we can officially check if someone already placed this
@@ -747,9 +748,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._last_states.appendleft(state)
if state == k_states.KazooState.LOST:
self._connected = False
- LOG.warn("Connection to zookeeper has been lost")
+ LOG.warning("Connection to zookeeper has been lost")
elif state == k_states.KazooState.SUSPENDED:
- LOG.warn("Connection to zookeeper has been suspended")
+ LOG.warning("Connection to zookeeper has been suspended")
self._suspended = True
else:
# Must be CONNECTED then (as there are only 3 enums)
diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py
index e6ebbff..4d2edf5 100644
--- a/taskflow/listeners/base.py
+++ b/taskflow/listeners/base.py
@@ -159,8 +159,8 @@ class Listener(object):
self.deregister()
except Exception:
# Don't let deregistering throw exceptions
- LOG.warn("Failed deregistering listeners from engine %s",
- self._engine, exc_info=True)
+ LOG.warning("Failed deregistering listeners from engine %s",
+ self._engine, exc_info=True)
@six.add_metaclass(abc.ABCMeta)
diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py
index 82b89cf..ae8aabf 100644
--- a/taskflow/listeners/claims.py
+++ b/taskflow/listeners/claims.py
@@ -70,9 +70,9 @@ class CheckingClaimListener(base.Listener):
try:
engine.suspend()
except exceptions.TaskFlowException as e:
- LOG.warn("Failed suspending engine '%s', (previously owned by"
- " '%s'):%s%s", engine, self._owner, os.linesep,
- e.pformat())
+ LOG.warning("Failed suspending engine '%s', (previously owned by"
+ " '%s'):%s%s", engine, self._owner, os.linesep,
+ e.pformat())
def _flow_receiver(self, state, details):
self._claim_checker(state, details)
@@ -97,6 +97,7 @@ class CheckingClaimListener(base.Listener):
LOG.debug("Job '%s' is still claimed (actively owned by '%s')",
self._job, self._owner)
else:
- LOG.warn("Job '%s' has lost its claim (previously owned by '%s')",
- self._job, self._owner)
+ LOG.warning("Job '%s' has lost its claim"
+ " (previously owned by '%s')",
+ self._job, self._owner)
self._on_job_loss(self._engine, state, details)
diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py
index 7dba798..d8a7312 100644
--- a/taskflow/listeners/timing.py
+++ b/taskflow/listeners/timing.py
@@ -63,8 +63,9 @@ class DurationListener(base.Listener):
for item_type, timers in six.iteritems(self._timers):
leftover_timers = len(timers)
if leftover_timers:
- LOG.warn("%s %s(s) did not enter %s states", leftover_timers,
- item_type, FINISHED_STATES)
+ LOG.warning("%s %s(s) did not enter %s states",
+ leftover_timers,
+ item_type, FINISHED_STATES)
timers.clear()
def _record_ending(self, timer, item_type, item_name, state):
@@ -80,8 +81,8 @@ class DurationListener(base.Listener):
else:
storage.update_atom_metadata(item_name, meta_update)
except exc.StorageFailure:
- LOG.warn("Failure to store duration update %s for %s %s",
- meta_update, item_type, item_name, exc_info=True)
+ LOG.warning("Failure to store duration update %s for %s %s",
+ meta_update, item_type, item_name, exc_info=True)
def _task_receiver(self, state, details):
task_name = details['task_name']
@@ -151,8 +152,8 @@ class EventTimeListener(base.Listener):
# Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_atom_metadata(atom_name, meta_update)
except exc.StorageFailure:
- LOG.warn("Failure to store timestamp %s for atom %s",
- meta_update, atom_name, exc_info=True)
+ LOG.warning("Failure to store timestamp %s for atom %s",
+ meta_update, atom_name, exc_info=True)
def _flow_receiver(self, state, details):
meta_update = {'%s-timestamp' % state: time.time()}
@@ -160,8 +161,8 @@ class EventTimeListener(base.Listener):
# Don't let storage failures throw exceptions in a listener method.
self._engine.storage.update_flow_metadata(meta_update)
except exc.StorageFailure:
- LOG.warn("Failure to store timestamp %s for flow %s",
- meta_update, details['flow_name'], exc_info=True)
+ LOG.warning("Failure to store timestamp %s for flow %s",
+ meta_update, details['flow_name'], exc_info=True)
def _task_receiver(self, state, details):
self._record_atom_event(state, details['task_name'])
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 70dbc16..19b3b20 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -176,10 +176,10 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if _in_any(six.text_type(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS):
- LOG.warn('Got mysql server has gone away', exc_info=True)
+ LOG.warning('Got mysql server has gone away', exc_info=True)
raise sa_exc.DisconnectionError("Database server went away")
elif _in_any(six.text_type(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS):
- LOG.warn('Got postgres server has gone away', exc_info=True)
+ LOG.warning('Got postgres server has gone away', exc_info=True)
raise sa_exc.DisconnectionError("Database server went away")
else:
raise
@@ -362,7 +362,7 @@ class Connection(base.Connection):
"""Performs basic **connection** validation of a sqlalchemy engine."""
def _retry_on_exception(exc):
- LOG.warn("Engine connection (validate) failed due to '%s'", exc)
+ LOG.warning("Engine connection (validate) failed due to '%s'", exc)
if isinstance(exc, sa_exc.OperationalError) and \
_is_db_connection_error(six.text_type(exc.args[0])):
# We may be able to fix this by retrying...
diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py
index c428beb..be2cf97 100644
--- a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py
+++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py
@@ -132,12 +132,12 @@ def upgrade():
for fkey_descriptor in _get_foreign_keys():
op.create_foreign_key(**fkey_descriptor)
except NotImplementedError as e:
- LOG.warn("Foreign keys are not supported: %s", e)
+ LOG.warning("Foreign keys are not supported: %s", e)
try:
for index_descriptor in _get_indexes():
op.create_index(**index_descriptor)
except NotImplementedError as e:
- LOG.warn("Indexes are not supported: %s", e)
+ LOG.warning("Indexes are not supported: %s", e)
def downgrade():
diff --git a/taskflow/task.py b/taskflow/task.py
index ba4fb32..3ff282d 100644
--- a/taskflow/task.py
+++ b/taskflow/task.py
@@ -103,8 +103,9 @@ class Task(atom.Atom):
:param progress: task progress float value between 0.0 and 1.0
"""
def on_clamped():
- LOG.warn("Progress value must be greater or equal to 0.0 or less"
- " than or equal to 1.0 instead of being '%s'", progress)
+ LOG.warning("Progress value must be greater or equal to 0.0 or"
+ " less than or equal to 1.0 instead of being '%s'",
+ progress)
cleaned_progress = misc.clamp(progress, 0.0, 1.0,
on_clamped=on_clamped)
self._notifier.notify(EVENT_UPDATE_PROGRESS,
diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py
index 3d0f5f5..3e2f880 100644
--- a/taskflow/tests/unit/test_listeners.py
+++ b/taskflow/tests/unit/test_listeners.py
@@ -242,8 +242,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
self.assertIn('duration', fd.meta)
self.assertGreaterEqual(0.1, fd.meta['duration'])
- @mock.patch.object(timing.LOG, 'warn')
- def test_record_ending_exception(self, mocked_warn):
+ @mock.patch.object(timing.LOG, 'warning')
+ def test_record_ending_exception(self, mocked_warning):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
@@ -255,8 +255,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin):
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with duration_listener:
e.run()
- mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'task',
- 'test-1', exc_info=True)
+ mocked_warning.assert_called_once_with(mock.ANY, mock.ANY, 'task',
+ 'test-1', exc_info=True)
class TestEventTimeListener(test.TestCase, EngineMakerMixin):
diff --git a/taskflow/tests/unit/test_task.py b/taskflow/tests/unit/test_task.py
index b85514b..f008ab5 100644
--- a/taskflow/tests/unit/test_task.py
+++ b/taskflow/tests/unit/test_task.py
@@ -263,8 +263,8 @@ class TaskTest(test.TestCase):
a_task.execute(values)
self.assertEqual(values, result)
- @mock.patch.object(task.LOG, 'warn')
- def test_update_progress_lower_bound(self, mocked_warn):
+ @mock.patch.object(task.LOG, 'warning')
+ def test_update_progress_lower_bound(self, mocked_warning):
result = []
def progress_callback(event_type, details):
@@ -274,10 +274,10 @@ class TaskTest(test.TestCase):
a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback)
a_task.execute([-1.0, -0.5, 0.0])
self.assertEqual([0.0, 0.0, 0.0], result)
- self.assertEqual(2, mocked_warn.call_count)
+ self.assertEqual(2, mocked_warning.call_count)
- @mock.patch.object(task.LOG, 'warn')
- def test_update_progress_upper_bound(self, mocked_warn):
+ @mock.patch.object(task.LOG, 'warning')
+ def test_update_progress_upper_bound(self, mocked_warning):
result = []
def progress_callback(event_type, details):
@@ -287,10 +287,10 @@ class TaskTest(test.TestCase):
a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback)
a_task.execute([1.0, 1.5, 2.0])
self.assertEqual([1.0, 1.0, 1.0], result)
- self.assertEqual(2, mocked_warn.call_count)
+ self.assertEqual(2, mocked_warning.call_count)
- @mock.patch.object(notifier.LOG, 'warn')
- def test_update_progress_handler_failure(self, mocked_warn):
+ @mock.patch.object(notifier.LOG, 'warning')
+ def test_update_progress_handler_failure(self, mocked_warning):
def progress_callback(*args, **kwargs):
raise Exception('Woot!')
@@ -298,7 +298,7 @@ class TaskTest(test.TestCase):
a_task = ProgressTask()
a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback)
a_task.execute([0.5])
- self.assertEqual(1, mocked_warn.call_count)
+ self.assertEqual(1, mocked_warning.call_count)
def test_register_handler_is_none(self):
a_task = MyTask()
diff --git a/taskflow/types/notifier.py b/taskflow/types/notifier.py
index c91cea1..7c6af37 100644
--- a/taskflow/types/notifier.py
+++ b/taskflow/types/notifier.py
@@ -212,9 +212,9 @@ class Notifier(object):
try:
listener(event_type, details.copy())
except Exception:
- LOG.warn("Failure calling listener %s to notify about event"
- " %s, details: %s", listener, event_type,
- details, exc_info=True)
+ LOG.warning("Failure calling listener %s to notify about event"
+ " %s, details: %s", listener, event_type,
+ details, exc_info=True)
def register(self, event_type, callback,
args=None, kwargs=None, details_filter=None):
diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py
index acdf4c1..1117ebb 100644
--- a/taskflow/utils/persistence_utils.py
+++ b/taskflow/utils/persistence_utils.py
@@ -78,7 +78,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None):
flow_id = uuidutils.generate_uuid()
flow_name = getattr(flow, 'name', None)
if flow_name is None:
- LOG.warn("No name provided for flow %s (id %s)", flow, flow_id)
+ LOG.warning("No name provided for flow %s (id %s)", flow, flow_id)
flow_name = flow_id
flow_detail = models.FlowDetail(name=flow_name, uuid=flow_id)
@@ -88,7 +88,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None):
flow_detail.meta.update(meta)
if backend is not None and book is None:
- LOG.warn("No logbook provided for flow %s, creating one.", flow)
+ LOG.warning("No logbook provided for flow %s, creating one.", flow)
book = temporary_log_book(backend)
if book is not None: