diff options
-rw-r--r-- | taskflow/engines/action_engine/process_executor.py | 8 | ||||
-rw-r--r-- | taskflow/engines/worker_based/dispatcher.py | 6 | ||||
-rw-r--r-- | taskflow/engines/worker_based/proxy.py | 6 | ||||
-rw-r--r-- | taskflow/engines/worker_based/server.py | 42 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_redis.py | 6 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 21 | ||||
-rw-r--r-- | taskflow/listeners/base.py | 4 | ||||
-rw-r--r-- | taskflow/listeners/claims.py | 11 | ||||
-rw-r--r-- | taskflow/listeners/timing.py | 17 | ||||
-rw-r--r-- | taskflow/persistence/backends/impl_sqlalchemy.py | 6 | ||||
-rw-r--r-- | taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py | 4 | ||||
-rw-r--r-- | taskflow/task.py | 5 | ||||
-rw-r--r-- | taskflow/tests/unit/test_listeners.py | 8 | ||||
-rw-r--r-- | taskflow/tests/unit/test_task.py | 18 | ||||
-rw-r--r-- | taskflow/types/notifier.py | 6 | ||||
-rw-r--r-- | taskflow/utils/persistence_utils.py | 4 |
16 files changed, 89 insertions, 83 deletions
diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py index e59852f..85d37e0 100644 --- a/taskflow/engines/action_engine/process_executor.py +++ b/taskflow/engines/action_engine/process_executor.py @@ -504,16 +504,16 @@ class DispatcherHandler(asyncore.dispatcher): try: self.reader.feed(data) except (IOError, UnknownSender): - LOG.warn("Invalid received message", exc_info=True) + LOG.warning("Invalid received message", exc_info=True) self.handle_close() except _DECODE_ENCODE_ERRORS: - LOG.warn("Badly formatted message", exc_info=True) + LOG.warning("Badly formatted message", exc_info=True) self.handle_close() except (ValueError, su.ValidationError): - LOG.warn("Failed validating message", exc_info=True) + LOG.warning("Failed validating message", exc_info=True) self.handle_close() except ChallengeIgnored: - LOG.warn("Failed challenge sequence", exc_info=True) + LOG.warning("Failed challenge sequence", exc_info=True) self.handle_close() diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 2735073..c6e5d25 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -135,9 +135,9 @@ class TypeDispatcher(object): except excp.InvalidFormat as e: message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) - LOG.warn("Message '%s' (%s) was rejected due to it being" - " in an invalid format: %s", - ku.DelayedPretty(message), message_type, e) + LOG.warning("Message '%s' (%s) was rejected due to it" + " being in an invalid format: %s", + ku.DelayedPretty(message), message_type, e) return message.ack_log_error(logger=LOG, errors=(kombu_exc.MessageStateError,)) diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 386e377..e58c7a2 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -162,9 +162,9 @@ class Proxy(object): # Filter out any empty keys... routing_keys = [r_k for r_k in routing_keys if r_k] if not routing_keys: - LOG.warn("No routing key/s specified; unable to send '%s'" - " to any target queue on exchange '%s'", msg, - self._exchange_name) + LOG.warning("No routing key/s specified; unable to send '%s'" + " to any target queue on exchange '%s'", msg, + self._exchange_name) return def _publish(producer, routing_key): diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 978ab3a..fe51831 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -142,12 +142,12 @@ class Server(object): try: reply_to = message.properties['reply_to'] except KeyError: - LOG.warn("The 'reply_to' message property is missing" - " in received notify message '%s'", - ku.DelayedPretty(message), exc_info=True) + LOG.warning("The 'reply_to' message property is missing" + " in received notify message '%s'", + ku.DelayedPretty(message), exc_info=True) else: response = pr.Notify(topic=self._topic, - tasks=self._endpoints.keys()) + tasks=list(self._endpoints.keys())) try: self._proxy.publish(response, routing_key=reply_to) except Exception: @@ -177,8 +177,9 @@ class Server(object): work = pr.Request.from_dict(request, task_uuid=task_uuid) except ValueError: with misc.capture_failure() as failure: - LOG.warn("Failed to parse request contents from message '%s'", - ku.DelayedPretty(message), exc_info=True) + LOG.warning("Failed to parse request contents" + " from message '%s'", + ku.DelayedPretty(message), exc_info=True) reply_callback(result=pr.failure_to_dict(failure)) return @@ -187,10 +188,10 @@ class Server(object): endpoint = self._endpoints[work.task_cls] except KeyError: with misc.capture_failure() as failure: - LOG.warn("The '%s' task endpoint does not exist, unable" - " to continue processing request message '%s'", - work.task_cls, ku.DelayedPretty(message), - exc_info=True) + LOG.warning("The '%s' task endpoint does not exist, unable" + " to continue processing request message '%s'", + work.task_cls, ku.DelayedPretty(message), + exc_info=True) reply_callback(result=pr.failure_to_dict(failure)) return else: @@ -198,10 +199,10 @@ class Server(object): handler = getattr(endpoint, work.action) except AttributeError: with misc.capture_failure() as failure: - LOG.warn("The '%s' handler does not exist on task endpoint" - " '%s', unable to continue processing request" - " message '%s'", work.action, endpoint, - ku.DelayedPretty(message), exc_info=True) + LOG.warning("The '%s' handler does not exist on task" + " endpoint '%s', unable to continue processing" + " request message '%s'", work.action, endpoint, + ku.DelayedPretty(message), exc_info=True) reply_callback(result=pr.failure_to_dict(failure)) return else: @@ -209,9 +210,10 @@ class Server(object): task = endpoint.generate(name=work.task_name) except Exception: with misc.capture_failure() as failure: - LOG.warn("The '%s' task '%s' generation for request" - " message '%s' failed", endpoint, work.action, - ku.DelayedPretty(message), exc_info=True) + LOG.warning("The '%s' task '%s' generation for request" + " message '%s' failed", endpoint, + work.action, ku.DelayedPretty(message), + exc_info=True) reply_callback(result=pr.failure_to_dict(failure)) return else: @@ -237,9 +239,9 @@ class Server(object): result = handler(task, **work.arguments) except Exception: with misc.capture_failure() as failure: - LOG.warn("The '%s' endpoint '%s' execution for request" - " message '%s' failed", endpoint, work.action, - ku.DelayedPretty(message), exc_info=True) + LOG.warning("The '%s' endpoint '%s' execution for request" + " message '%s' failed", endpoint, work.action, + ku.DelayedPretty(message), exc_info=True) reply_callback(result=pr.failure_to_dict(failure)) else: # And be done with it! diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index cd141e2..a0ebe00 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -817,9 +817,9 @@ return cmsgpack.pack(result) job_details = job_data.get('details', {}) except (ValueError, TypeError, KeyError): with excutils.save_and_reraise_exception(): - LOG.warn("Incorrectly formatted job data found at" - " key: %s[%s]", self.listings_key, - raw_job_key, exc_info=True) + LOG.warning("Incorrectly formatted job data found at" + " key: %s[%s]", self.listings_key, + raw_job_key, exc_info=True) else: postings.append(RedisJob(self, job_name, job_sequence_id, raw_job_key, uuid=job_uuid, diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 3c83d8a..158e235 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -425,23 +425,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): job_name = job_data['name'] except (ValueError, TypeError, KeyError): with excutils.save_and_reraise_exception(reraise=not quiet): - LOG.warn("Incorrectly formatted job data found at path: %s", - path, exc_info=True) + LOG.warning("Incorrectly formatted job data found at path: %s", + path, exc_info=True) except self._client.handler.timeout_exception: with excutils.save_and_reraise_exception(reraise=not quiet): - LOG.warn("Operation timed out fetching job data from path: %s", - path, exc_info=True) + LOG.warning("Operation timed out fetching job data from" + " from path: %s", + path, exc_info=True) except k_exceptions.SessionExpiredError: with excutils.save_and_reraise_exception(reraise=not quiet): - LOG.warn("Session expired fetching job data from path: %s", - path, exc_info=True) + LOG.warning("Session expired fetching job data from path: %s", + path, exc_info=True) except k_exceptions.NoNodeError: LOG.debug("No job node found at path: %s, it must have" " disappeared or was removed", path) except k_exceptions.KazooException: with excutils.save_and_reraise_exception(reraise=not quiet): - LOG.warn("Internal error fetching job data from path: %s", - path, exc_info=True) + LOG.warning("Internal error fetching job data from path: %s", + path, exc_info=True) else: with self._job_cond: # Now we can officially check if someone already placed this @@ -747,9 +748,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._last_states.appendleft(state) if state == k_states.KazooState.LOST: self._connected = False - LOG.warn("Connection to zookeeper has been lost") + LOG.warning("Connection to zookeeper has been lost") elif state == k_states.KazooState.SUSPENDED: - LOG.warn("Connection to zookeeper has been suspended") + LOG.warning("Connection to zookeeper has been suspended") self._suspended = True else: # Must be CONNECTED then (as there are only 3 enums) diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index e6ebbff..4d2edf5 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -159,8 +159,8 @@ class Listener(object): self.deregister() except Exception: # Don't let deregistering throw exceptions - LOG.warn("Failed deregistering listeners from engine %s", - self._engine, exc_info=True) + LOG.warning("Failed deregistering listeners from engine %s", + self._engine, exc_info=True) @six.add_metaclass(abc.ABCMeta) diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py index 82b89cf..ae8aabf 100644 --- a/taskflow/listeners/claims.py +++ b/taskflow/listeners/claims.py @@ -70,9 +70,9 @@ class CheckingClaimListener(base.Listener): try: engine.suspend() except exceptions.TaskFlowException as e: - LOG.warn("Failed suspending engine '%s', (previously owned by" - " '%s'):%s%s", engine, self._owner, os.linesep, - e.pformat()) + LOG.warning("Failed suspending engine '%s', (previously owned by" + " '%s'):%s%s", engine, self._owner, os.linesep, + e.pformat()) def _flow_receiver(self, state, details): self._claim_checker(state, details) @@ -97,6 +97,7 @@ class CheckingClaimListener(base.Listener): LOG.debug("Job '%s' is still claimed (actively owned by '%s')", self._job, self._owner) else: - LOG.warn("Job '%s' has lost its claim (previously owned by '%s')", - self._job, self._owner) + LOG.warning("Job '%s' has lost its claim" + " (previously owned by '%s')", + self._job, self._owner) self._on_job_loss(self._engine, state, details) diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index 7dba798..d8a7312 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -63,8 +63,9 @@ class DurationListener(base.Listener): for item_type, timers in six.iteritems(self._timers): leftover_timers = len(timers) if leftover_timers: - LOG.warn("%s %s(s) did not enter %s states", leftover_timers, - item_type, FINISHED_STATES) + LOG.warning("%s %s(s) did not enter %s states", + leftover_timers, + item_type, FINISHED_STATES) timers.clear() def _record_ending(self, timer, item_type, item_name, state): @@ -80,8 +81,8 @@ class DurationListener(base.Listener): else: storage.update_atom_metadata(item_name, meta_update) except exc.StorageFailure: - LOG.warn("Failure to store duration update %s for %s %s", - meta_update, item_type, item_name, exc_info=True) + LOG.warning("Failure to store duration update %s for %s %s", + meta_update, item_type, item_name, exc_info=True) def _task_receiver(self, state, details): task_name = details['task_name'] @@ -151,8 +152,8 @@ class EventTimeListener(base.Listener): # Don't let storage failures throw exceptions in a listener method. self._engine.storage.update_atom_metadata(atom_name, meta_update) except exc.StorageFailure: - LOG.warn("Failure to store timestamp %s for atom %s", - meta_update, atom_name, exc_info=True) + LOG.warning("Failure to store timestamp %s for atom %s", + meta_update, atom_name, exc_info=True) def _flow_receiver(self, state, details): meta_update = {'%s-timestamp' % state: time.time()} @@ -160,8 +161,8 @@ class EventTimeListener(base.Listener): # Don't let storage failures throw exceptions in a listener method. self._engine.storage.update_flow_metadata(meta_update) except exc.StorageFailure: - LOG.warn("Failure to store timestamp %s for flow %s", - meta_update, details['flow_name'], exc_info=True) + LOG.warning("Failure to store timestamp %s for flow %s", + meta_update, details['flow_name'], exc_info=True) def _task_receiver(self, state, details): self._record_atom_event(state, details['task_name']) diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 70dbc16..19b3b20 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -176,10 +176,10 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy): dbapi_conn.cursor().execute('select 1') except dbapi_conn.OperationalError as ex: if _in_any(six.text_type(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS): - LOG.warn('Got mysql server has gone away', exc_info=True) + LOG.warning('Got mysql server has gone away', exc_info=True) raise sa_exc.DisconnectionError("Database server went away") elif _in_any(six.text_type(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS): - LOG.warn('Got postgres server has gone away', exc_info=True) + LOG.warning('Got postgres server has gone away', exc_info=True) raise sa_exc.DisconnectionError("Database server went away") else: raise @@ -362,7 +362,7 @@ class Connection(base.Connection): """Performs basic **connection** validation of a sqlalchemy engine.""" def _retry_on_exception(exc): - LOG.warn("Engine connection (validate) failed due to '%s'", exc) + LOG.warning("Engine connection (validate) failed due to '%s'", exc) if isinstance(exc, sa_exc.OperationalError) and \ _is_db_connection_error(six.text_type(exc.args[0])): # We may be able to fix this by retrying... diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py index c428beb..be2cf97 100644 --- a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1cea328f0f65_initial_logbook_deta.py @@ -132,12 +132,12 @@ def upgrade(): for fkey_descriptor in _get_foreign_keys(): op.create_foreign_key(**fkey_descriptor) except NotImplementedError as e: - LOG.warn("Foreign keys are not supported: %s", e) + LOG.warning("Foreign keys are not supported: %s", e) try: for index_descriptor in _get_indexes(): op.create_index(**index_descriptor) except NotImplementedError as e: - LOG.warn("Indexes are not supported: %s", e) + LOG.warning("Indexes are not supported: %s", e) def downgrade(): diff --git a/taskflow/task.py b/taskflow/task.py index ba4fb32..3ff282d 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -103,8 +103,9 @@ class Task(atom.Atom): :param progress: task progress float value between 0.0 and 1.0 """ def on_clamped(): - LOG.warn("Progress value must be greater or equal to 0.0 or less" - " than or equal to 1.0 instead of being '%s'", progress) + LOG.warning("Progress value must be greater or equal to 0.0 or" + " less than or equal to 1.0 instead of being '%s'", + progress) cleaned_progress = misc.clamp(progress, 0.0, 1.0, on_clamped=on_clamped) self._notifier.notify(EVENT_UPDATE_PROGRESS, diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 3d0f5f5..3e2f880 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -242,8 +242,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin): self.assertIn('duration', fd.meta) self.assertGreaterEqual(0.1, fd.meta['duration']) - @mock.patch.object(timing.LOG, 'warn') - def test_record_ending_exception(self, mocked_warn): + @mock.patch.object(timing.LOG, 'warning') + def test_record_ending_exception(self, mocked_warning): with contextlib.closing(impl_memory.MemoryBackend()) as be: flow = lf.Flow("test") flow.add(test_utils.TaskNoRequiresNoReturns("test-1")) @@ -255,8 +255,8 @@ class TestDurationListener(test.TestCase, EngineMakerMixin): mocked_uam.side_effect = exc.StorageFailure('Woot!') with duration_listener: e.run() - mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'task', - 'test-1', exc_info=True) + mocked_warning.assert_called_once_with(mock.ANY, mock.ANY, 'task', + 'test-1', exc_info=True) class TestEventTimeListener(test.TestCase, EngineMakerMixin): diff --git a/taskflow/tests/unit/test_task.py b/taskflow/tests/unit/test_task.py index b85514b..f008ab5 100644 --- a/taskflow/tests/unit/test_task.py +++ b/taskflow/tests/unit/test_task.py @@ -263,8 +263,8 @@ class TaskTest(test.TestCase): a_task.execute(values) self.assertEqual(values, result) - @mock.patch.object(task.LOG, 'warn') - def test_update_progress_lower_bound(self, mocked_warn): + @mock.patch.object(task.LOG, 'warning') + def test_update_progress_lower_bound(self, mocked_warning): result = [] def progress_callback(event_type, details): @@ -274,10 +274,10 @@ class TaskTest(test.TestCase): a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback) a_task.execute([-1.0, -0.5, 0.0]) self.assertEqual([0.0, 0.0, 0.0], result) - self.assertEqual(2, mocked_warn.call_count) + self.assertEqual(2, mocked_warning.call_count) - @mock.patch.object(task.LOG, 'warn') - def test_update_progress_upper_bound(self, mocked_warn): + @mock.patch.object(task.LOG, 'warning') + def test_update_progress_upper_bound(self, mocked_warning): result = [] def progress_callback(event_type, details): @@ -287,10 +287,10 @@ class TaskTest(test.TestCase): a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback) a_task.execute([1.0, 1.5, 2.0]) self.assertEqual([1.0, 1.0, 1.0], result) - self.assertEqual(2, mocked_warn.call_count) + self.assertEqual(2, mocked_warning.call_count) - @mock.patch.object(notifier.LOG, 'warn') - def test_update_progress_handler_failure(self, mocked_warn): + @mock.patch.object(notifier.LOG, 'warning') + def test_update_progress_handler_failure(self, mocked_warning): def progress_callback(*args, **kwargs): raise Exception('Woot!') @@ -298,7 +298,7 @@ class TaskTest(test.TestCase): a_task = ProgressTask() a_task.notifier.register(task.EVENT_UPDATE_PROGRESS, progress_callback) a_task.execute([0.5]) - self.assertEqual(1, mocked_warn.call_count) + self.assertEqual(1, mocked_warning.call_count) def test_register_handler_is_none(self): a_task = MyTask() diff --git a/taskflow/types/notifier.py b/taskflow/types/notifier.py index c91cea1..7c6af37 100644 --- a/taskflow/types/notifier.py +++ b/taskflow/types/notifier.py @@ -212,9 +212,9 @@ class Notifier(object): try: listener(event_type, details.copy()) except Exception: - LOG.warn("Failure calling listener %s to notify about event" - " %s, details: %s", listener, event_type, - details, exc_info=True) + LOG.warning("Failure calling listener %s to notify about event" + " %s, details: %s", listener, event_type, + details, exc_info=True) def register(self, event_type, callback, args=None, kwargs=None, details_filter=None): diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index acdf4c1..1117ebb 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -78,7 +78,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): flow_id = uuidutils.generate_uuid() flow_name = getattr(flow, 'name', None) if flow_name is None: - LOG.warn("No name provided for flow %s (id %s)", flow, flow_id) + LOG.warning("No name provided for flow %s (id %s)", flow, flow_id) flow_name = flow_id flow_detail = models.FlowDetail(name=flow_name, uuid=flow_id) @@ -88,7 +88,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): flow_detail.meta.update(meta) if backend is not None and book is None: - LOG.warn("No logbook provided for flow %s, creating one.", flow) + LOG.warning("No logbook provided for flow %s, creating one.", flow) book = temporary_log_book(backend) if book is not None: |