summaryrefslogtreecommitdiff
path: root/chromium/base/task/thread_pool/sequence.cc
blob: b37b5796a4657e272f7ba5c33f59e6b0977998b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/sequence.h"

#include <utility>

#include "base/bind.h"
#include "base/check.h"
#include "base/critical_closure.h"
#include "base/feature_list.h"
#include "base/memory/ptr_util.h"
#include "base/task/task_features.h"
#include "base/time/time.h"

namespace base {
namespace internal {

Sequence::Transaction::Transaction(Sequence* sequence)
    : TaskSource::Transaction(sequence) {}

Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;

Sequence::Transaction::~Transaction() = default;

bool Sequence::Transaction::WillPushTask() const {
  // If the sequence is empty before a Task is inserted into it and the pool is
  // not running any task from this sequence, it should be queued.
  // Otherwise, one of these must be true:
  // - The Sequence is already queued, or,
  // - A thread is running a Task from the Sequence. It is expected to reenqueue
  //   the Sequence once it's done running the Task.
  return sequence()->queue_.empty() && !sequence()->has_worker_;
}

void Sequence::Transaction::PushTask(Task task) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(task.task);
  DCHECK(task.queue_time.is_null());

  bool should_be_queued = WillPushTask();
  task.queue_time = TimeTicks::Now();

  task.task = sequence()->traits_.shutdown_behavior() ==
                      TaskShutdownBehavior::BLOCK_SHUTDOWN
                  ? MakeCriticalClosure(task.posted_from, std::move(task.task))
                  : std::move(task.task);

  if (sequence()->queue_.empty())
    sequence()->ready_time_.store(task.queue_time, std::memory_order_relaxed);
  sequence()->queue_.push(std::move(task));

  // AddRef() matched by manual Release() when the sequence has no more tasks
  // to run (in DidProcessTask() or Clear()).
  if (should_be_queued && sequence()->task_runner())
    sequence()->task_runner()->AddRef();
}

TaskSource::RunStatus Sequence::WillRunTask() {
  // There should never be a second call to WillRunTask() before DidProcessTask
  // since the RunStatus is always marked a saturated.
  DCHECK(!has_worker_);

  // It's ok to access |has_worker_| outside of a Transaction since
  // WillRunTask() is externally synchronized, always called in sequence with
  // TakeTask() and DidProcessTask() and only called if |!queue_.empty()|, which
  // means it won't race with WillPushTask()/PushTask().
  has_worker_ = true;
  return RunStatus::kAllowedSaturated;
}

size_t Sequence::GetRemainingConcurrency() const {
  return 1;
}

Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
  CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);

  DCHECK(has_worker_);
  DCHECK(!queue_.empty());
  DCHECK(queue_.front().task);

  auto next_task = std::move(queue_.front());
  queue_.pop();
  if (!queue_.empty()) {
    ready_time_.store(queue_.front().queue_time, std::memory_order_relaxed);
  }
  return next_task;
}

bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
  CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
  // There should never be a call to DidProcessTask without an associated
  // WillRunTask().
  DCHECK(has_worker_);
  has_worker_ = false;
  // See comment on TaskSource::task_runner_ for lifetime management details.
  if (queue_.empty()) {
    ReleaseTaskRunner();
    return false;
  }
  // Let the caller re-enqueue this non-empty Sequence regardless of
  // |run_result| so it can continue churning through this Sequence's tasks and
  // skip/delete them in the proper scope.
  return true;
}

TaskSourceSortKey Sequence::GetSortKey(
    bool /* disable_fair_scheduling */) const {
  return TaskSourceSortKey(priority_racy(),
                           ready_time_.load(std::memory_order_relaxed));
}

Task Sequence::Clear(TaskSource::Transaction* transaction) {
  CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
  // See comment on TaskSource::task_runner_ for lifetime management details.
  if (!queue_.empty() && !has_worker_)
    ReleaseTaskRunner();
  return Task(FROM_HERE,
              base::BindOnce(
                  [](base::queue<Task> queue) {
                    while (!queue.empty())
                      queue.pop();
                  },
                  std::move(queue_)),
              TimeDelta());
}

void Sequence::ReleaseTaskRunner() {
  if (!task_runner())
    return;
  if (execution_mode() == TaskSourceExecutionMode::kParallel) {
    static_cast<PooledParallelTaskRunner*>(task_runner())
        ->UnregisterSequence(this);
  }
  // No member access after this point, releasing |task_runner()| might delete
  // |this|.
  task_runner()->Release();
}

Sequence::Sequence(const TaskTraits& traits,
                   TaskRunner* task_runner,
                   TaskSourceExecutionMode execution_mode)
    : TaskSource(traits, task_runner, execution_mode) {}

Sequence::~Sequence() = default;

Sequence::Transaction Sequence::BeginTransaction() {
  return Transaction(this);
}

ExecutionEnvironment Sequence::GetExecutionEnvironment() {
  return {token_, &sequence_local_storage_};
}

}  // namespace internal
}  // namespace base