Shaka Player Embedded
task_runner.h
Go to the documentation of this file.
1 // Copyright 2016 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef SHAKA_EMBEDDED_CORE_TASK_RUNNER_H_
16 #define SHAKA_EMBEDDED_CORE_TASK_RUNNER_H_
17 
18 #include <glog/logging.h>
19 
20 #include <atomic>
21 #include <functional>
22 #include <list>
23 #include <memory>
24 #include <string>
25 #include <type_traits>
26 #include <utility>
27 
28 #include "src/core/ref_ptr.h"
29 #include "src/debug/mutex.h"
30 #include "src/debug/thread.h"
31 #include "src/debug/thread_event.h"
32 #include "src/util/clock.h"
33 #include "src/util/utils.h"
34 
35 namespace shaka {
36 
37 enum class TaskPriority {
38  Timer,
39  Internal,
40  Events,
41  Immediate,
42 };
43 
44 namespace impl {
45 
46 template <typename Func>
48 
51  public:
52  PendingTaskBase(const util::Clock* clock, TaskPriority priority,
53  uint64_t delay_ms, int id, bool loop);
54  virtual ~PendingTaskBase();
55 
57  virtual void Call() = 0;
58 
59  uint64_t start_ms;
60  const uint64_t delay_ms;
62  const int id;
63  const bool loop;
64  // Using an atomic ensures that writes from another thread are flushed to
65  // the other threads immediately.
66  std::atomic<bool> should_remove;
67 
68  private:
69  PendingTaskBase(const PendingTaskBase&) = delete;
70  PendingTaskBase(PendingTaskBase&&) = delete;
71  PendingTaskBase& operator=(const PendingTaskBase&) = delete;
72  PendingTaskBase& operator=(PendingTaskBase&&) = delete;
73 };
74 
81 template <typename Func>
82 class PendingTask : public PendingTaskBase {
83  public:
84  static_assert(!std::is_base_of<memory::Traceable,
85  typename std::decay<Func>::type>::value,
86  "Cannot pass Traceable objects to TaskRunner");
88 
89  PendingTask(const util::Clock* clock, Func&& callback,
90  const std::string& name, TaskPriority priority, uint64_t delay_ms,
91  int id, bool loop)
92  : PendingTaskBase(clock, priority, delay_ms, id, loop),
93  callback(std::forward<Func>(callback)),
94  event(new ThreadEvent<Ret>(name)) {}
95 
96  void Call() override {
97  // If this were C++17, we could use if-constexpr:
98  //
99  // if constexpr (std::is_same<Ret, void>::value) {
100  // callback();
101  // event->SignalAllIfNotSet();
102  // } else {
103  // event->SignalAllIfNotSet(callback());
104  // }
105 
106  SetHelper<Func, Ret>::Set(&callback, &event);
107  }
108 
110  std::shared_ptr<ThreadEvent<Ret>> event;
111 
112  private:
113  template <typename F, typename R>
114  struct SetHelper {
115  static void Set(typename std::decay<F>::type* callback,
116  std::shared_ptr<ThreadEvent<R>>* event) {
117  (*event)->SignalAllIfNotSet((*callback)());
118  }
119  };
120  template <typename F>
121  struct SetHelper<F, void> {
122  static void Set(typename std::decay<F>::type* callback,
123  std::shared_ptr<ThreadEvent<void>>* event) {
124  (*callback)();
125  (*event)->SignalAllIfNotSet();
126  }
127  };
128 };
129 
130 template <typename T>
132  template <typename Func>
133  static void CallAndResolve(Func&& callback, std::promise<T>* promise) {
134  promise->set_value(callback());
135  }
136 };
137 template <>
138 struct FutureResolver<void> {
139  template <typename Func>
140  static void CallAndResolve(Func&& callback, std::promise<void>* promise) {
141  callback();
142  promise->set_value();
143  }
144 };
145 
146 } // namespace impl
147 
148 
154 class TaskRunner {
155  public:
156  using RunLoop = std::function<void()>;
157 
158  TaskRunner(std::function<void(RunLoop)> wrapper, const util::Clock* clock,
159  bool is_worker);
160  ~TaskRunner();
161 
163  bool is_running() const {
164  return running_;
165  }
166 
168  bool HasPendingWork() const;
169 
171  bool BelongsToCurrentThread() const;
172 
173 
178  void Stop();
179 
181  void WaitUntilFinished();
182 
183 
191  template <typename Func>
192  std::shared_future<impl::RetOf<Func>> InvokeOrSchedule(Func&& callback) {
193  using Ret = impl::RetOf<Func>;
194  if (BelongsToCurrentThread()) {
195  std::promise<Ret> promise;
196  impl::FutureResolver<Ret>::CallAndResolve(std::forward<Func>(callback),
197  &promise);
198  return promise.get_future().share();
199  } else {
200  return AddInternalTask(TaskPriority::Internal, "",
201  std::forward<Func>(callback))
202  ->future();
203  }
204  }
205 
218  template <typename Func>
219  std::shared_ptr<ThreadEvent<impl::RetOf<Func>>> AddInternalTask(
220  TaskPriority priority, const std::string& name, Func&& callback) {
221  DCHECK(priority != TaskPriority::Timer) << "Use AddTimer for timers";
222 
223  std::unique_lock<Mutex> lock(mutex_);
224  const int id = ++next_id_;
225  auto pending_task =
226  new impl::PendingTask<Func>(clock_, std::forward<Func>(callback), name,
227  priority, 0, id, /* loop */ false);
228  tasks_.emplace_back(pending_task);
229  pending_task->event->SetProvider(&worker_);
230 
231  return pending_task->event;
232  }
233 
242  template <typename Func>
243  int AddTimer(uint64_t delay_ms, Func&& callback) {
244  std::unique_lock<Mutex> lock(mutex_);
245  const int id = ++next_id_;
246 
247  tasks_.emplace_back(
248  new impl::PendingTask<Func>(clock_, std::forward<Func>(callback), "",
249  TaskPriority::Timer, delay_ms, id,
250  /* loop= */ false));
251 
252  return id;
253  }
254 
266  template <typename Func>
267  int AddRepeatedTimer(uint64_t delay_ms, Func&& callback) {
268  std::unique_lock<Mutex> lock(mutex_);
269  const int id = ++next_id_;
270 
271  tasks_.emplace_back(
272  new impl::PendingTask<Func>(clock_, std::forward<Func>(callback), "",
273  TaskPriority::Timer, delay_ms, id,
274  /* loop= */ true));
275 
276  return id;
277  }
278 
280  void CancelTimer(int id);
281 
282  private:
283  TaskRunner(const TaskRunner&) = delete;
284  TaskRunner(TaskRunner&&) = delete;
285  TaskRunner& operator=(const TaskRunner&) = delete;
286  TaskRunner& operator=(TaskRunner&&) = delete;
287 
292  void Run(std::function<void(RunLoop)> wrapper);
293 
298  void OnIdle();
299 
304  bool HandleTask();
305 
306 
307  // TODO: Consider a different data structure.
308  std::list<std::unique_ptr<impl::PendingTaskBase>> tasks_;
309 
310  mutable Mutex mutex_;
311  const util::Clock* clock_;
312  ThreadEvent<void> waiting_;
313  std::atomic<bool> running_;
314  int next_id_;
315  bool is_worker_;
316 
317  Thread worker_;
318 };
319 
320 } // namespace shaka
321 
322 #endif // SHAKA_EMBEDDED_CORE_TASK_RUNNER_H_
const TaskPriority priority
Definition: task_runner.h:61
static void CallAndResolve(Func &&callback, std::promise< T > *promise)
Definition: task_runner.h:133
PendingTask(const util::Clock *clock, Func &&callback, const std::string &name, TaskPriority priority, uint64_t delay_ms, int id, bool loop)
Definition: task_runner.h:89
const char * name
static void CallAndResolve(Func &&callback, std::promise< void > *promise)
Definition: task_runner.h:140
void Call() override
Definition: task_runner.h:96
bool is_running() const
Definition: task_runner.h:163
std::function< void()> RunLoop
Definition: task_runner.h:156
typename std::result_of< Func()>::type Ret
Definition: task_runner.h:87
ExceptionCode type
std::shared_future< impl::RetOf< Func > > InvokeOrSchedule(Func &&callback)
Definition: task_runner.h:192
typename std::result_of< Func()>::type RetOf
Definition: task_runner.h:47
std::decay< Func >::type callback
Definition: task_runner.h:109
int AddTimer(uint64_t delay_ms, Func &&callback)
Definition: task_runner.h:243
int AddRepeatedTimer(uint64_t delay_ms, Func &&callback)
Definition: task_runner.h:267
std::shared_ptr< ThreadEvent< impl::RetOf< Func > > > AddInternalTask(TaskPriority priority, const std::string &name, Func &&callback)
Definition: task_runner.h:219
std::atomic< bool > should_remove
Definition: task_runner.h:66
TaskPriority
Definition: task_runner.h:37
std::shared_ptr< ThreadEvent< Ret > > event
Definition: task_runner.h:110