Shaka Player Embedded
task_runner.cc
Go to the documentation of this file.
1 // Copyright 2016 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/task_runner.h"
16 
17 #include <glog/logging.h>
18 
19 #include <chrono>
20 #include <limits>
21 
23 
24 namespace shaka {
25 
26 namespace impl {
27 
29  TaskPriority priority, uint64_t delay_ms,
30  int id, bool loop)
31  : start_ms(clock->GetMonotonicTime()),
32  delay_ms(delay_ms),
33  priority(priority),
34  id(id),
35  loop(loop),
36  should_remove(false) {}
37 
39 
40 } // namespace impl
41 
42 TaskRunner::TaskRunner(std::function<void(RunLoop)> wrapper,
43  const util::Clock* clock, bool is_worker)
44  : mutex_(is_worker ? "TaskRunner worker" : "TaskRunner main"),
45  clock_(clock),
46  waiting_("TaskRunner wait until finished"),
47  running_(true),
48  next_id_(0),
49  is_worker_(is_worker),
50  worker_(is_worker ? "JS Worker" : "JS Main Thread",
51  std::bind(&TaskRunner::Run, this, std::move(wrapper))) {
52  waiting_.SetProvider(&worker_);
53 }
54 
56  Stop();
57 }
58 
60  std::unique_lock<Mutex> lock(mutex_);
61  for (auto& task : tasks_) {
62  if (!task->loop)
63  return true;
64  }
65  return false;
66 }
67 
69  return running_ && std::this_thread::get_id() == worker_.get_id();
70 }
71 
73  bool join = false;
74  {
75  std::unique_lock<Mutex> lock(mutex_);
76  if (running_) {
77  running_ = false;
78  join = true;
79  waiting_.SignalAllIfNotSet();
80  }
81  }
82  if (join) {
83  worker_.join();
84  }
85 }
86 
88  if (running_ && HasPendingWork()) {
89  std::unique_lock<Mutex> lock(mutex_);
90  waiting_.ResetAndWaitWhileUnlocked(lock);
91  }
92 }
93 
95  std::unique_lock<Mutex> lock(mutex_);
96  for (auto& task : tasks_) {
97  if (task->id == id) {
98  task->should_remove = true;
99  return;
100  }
101  }
102 }
103 
104 void TaskRunner::Run(std::function<void(RunLoop)> wrapper) {
105  wrapper([this]() {
106  while (running_) {
107  // Handle a task. This will only handle one task, then loop.
108  if (HandleTask())
109  continue;
110 
111  if (!HasPendingWork()) {
112  waiting_.SignalAllIfNotSet();
113  }
114 
115  // We don't have any work to do, wait for a while.
116  OnIdle();
117  }
118 
119  // If we stop early, delete any pending tasks. This must be done on the
120  // worker thread so we can delete JavaScript objects.
121  tasks_.clear();
122  waiting_.SignalAllIfNotSet();
123  });
124 }
125 
126 void TaskRunner::OnIdle() {
127  // TODO: Since we have no work, we will never add work ourselves. Consider
128  // using signalling rather than polling.
129  clock_->SleepSeconds(0.001);
130 }
131 
132 bool TaskRunner::HandleTask() {
133  // We need to be careful here because:
134  // 1) We may be called from another thread to change tasks.
135  // 2) The callback may change tasks (including its own).
136 
137  const uint64_t now = clock_->GetMonotonicTime();
138  impl::PendingTaskBase* task = nullptr;
139  {
140  // Find the earliest timer we can finish. If there are multiple with the
141  // same time, pick the one registered earlier (lower ID).
142  std::unique_lock<Mutex> lock(mutex_);
143  uint64_t min_time = std::numeric_limits<uint64_t>::max();
144  TaskPriority max_priority = TaskPriority::Timer;
145  for (auto it = tasks_.begin(); it != tasks_.end();) {
146  if ((*it)->should_remove) {
147  it = tasks_.erase(it);
148  } else {
149  if ((*it)->priority > max_priority) {
150  max_priority = (*it)->priority;
151  task = it->get();
152  } else if (max_priority == TaskPriority::Timer) {
153  const uint64_t it_time = (*it)->start_ms + (*it)->delay_ms;
154  if (it_time <= now && it_time < min_time) {
155  min_time = it_time;
156  task = it->get();
157  }
158  }
159  ++it;
160  }
161  }
162  }
163 
164  if (!task)
165  return false;
166 
167 #ifdef USING_V8
168  if (!is_worker_) {
169  // V8 attaches v8::Local<T> instances to the most recent v8::HandleScope
170  // instance. By having a scope here, the task can create local handles and
171  // they will be freed after the task finishes.
172  v8::HandleScope handles(GetIsolate());
173  task->Call();
174  } else {
175  task->Call();
176  }
177 #else
178  // Other JavaScript engines use ref-counting for local handles.
179  task->Call();
180  (void)is_worker_;
181 #endif
182 
183  if (task->loop) {
184  task->start_ms = now;
185  } else {
186  task->should_remove = true;
187  // Will be removed in the next iteration.
188  }
189  return true;
190 }
191 
192 } // namespace shaka
PendingTaskBase(const util::Clock *clock, TaskPriority priority, uint64_t delay_ms, int id, bool loop)
Definition: task_runner.cc:28
void join()
Definition: thread.h:56
std::function< void()> RunLoop
Definition: task_runner.h:156
void WaitUntilFinished()
Definition: task_runner.cc:87
TaskRunner(std::function< void(RunLoop)> wrapper, const util::Clock *clock, bool is_worker)
Definition: task_runner.cc:42
void CancelTimer(int id)
Definition: task_runner.cc:94
T ResetAndWaitWhileUnlocked(std::unique_lock< _Mutex > &lock)
Definition: thread_event.h:106
std::thread::id get_id() const
Definition: thread.h:42
virtual void SleepSeconds(double seconds) const
Definition: clock.cc:39
std::atomic< bool > should_remove
Definition: task_runner.h:66
virtual uint64_t GetMonotonicTime() const
Definition: clock.cc:29
bool BelongsToCurrentThread() const
Definition: task_runner.cc:68
TaskPriority
Definition: task_runner.h:37
bool HasPendingWork() const
Definition: task_runner.cc:59
v8::Isolate * GetIsolate()
Definition: v8_utils.cc:27
void SetProvider(Thread *thread)
Definition: thread_event.h:49