Shaka Packager SDK
thread_pool.cc
1 // Copyright 2022 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/file/thread_pool.h>
8 
9 #include <thread>
10 
11 #include <absl/log/check.h>
12 #include <absl/log/log.h>
13 #include <absl/time/time.h>
14 
15 namespace shaka {
16 
17 namespace {
18 
19 const absl::Duration kMaxThreadIdleTime = absl::Minutes(10);
20 
21 } // namespace
22 
23 // static
24 ThreadPool ThreadPool::instance;
25 
26 ThreadPool::ThreadPool() : num_idle_threads_(0), terminated_(false) {}
27 
28 ThreadPool::~ThreadPool() {
29  Terminate();
30 }
31 
32 void ThreadPool::PostTask(const std::function<void()>& task) {
33  absl::MutexLock lock(&mutex_);
34 
35  DCHECK(!terminated_) << "Should not call PostTask after Terminate!";
36 
37  if (terminated_) {
38  return;
39  }
40 
41  // An empty task is used internally to signal the thread to terminate. This
42  // should never be sent on input.
43  if (!task) {
44  DLOG(ERROR) << "Should not post an empty task!";
45  return;
46  }
47 
48  tasks_.push(std::move(task));
49 
50  if (num_idle_threads_ >= tasks_.size()) {
51  // We have enough threads available.
52  tasks_available_.SignalAll();
53  } else {
54  // We need to start an additional thread.
55  std::thread thread(std::bind(&ThreadPool::ThreadMain, this));
56  thread.detach();
57  }
58 }
59 
60 void ThreadPool::Terminate() {
61  {
62  absl::MutexLock lock(&mutex_);
63  terminated_ = true;
64  while (!tasks_.empty()) {
65  tasks_.pop();
66  }
67  }
68  tasks_available_.SignalAll();
69 }
70 
71 ThreadPool::Task ThreadPool::WaitForTask() {
72  absl::MutexLock lock(&mutex_);
73  if (terminated_) {
74  // The pool is terminated. Terminate this thread.
75  return Task();
76  }
77 
78  if (tasks_.empty()) {
79  num_idle_threads_++;
80  // Wait for a task, up to the maximum idle time.
81  tasks_available_.WaitWithTimeout(&mutex_, kMaxThreadIdleTime);
82  num_idle_threads_--;
83 
84  if (tasks_.empty()) {
85  // No work before the timeout. Terminate this thread.
86  return Task();
87  }
88  }
89 
90  // Get the next task from the queue.
91  Task task = tasks_.front();
92  tasks_.pop();
93  return task;
94 }
95 
96 void ThreadPool::ThreadMain() {
97  while (true) {
98  auto task = WaitForTask();
99  if (!task) {
100  // An empty task signals the thread to terminate.
101  return;
102  }
103 
104  // Run the task, then loop to wait for another.
105  task();
106  }
107 }
108 
109 } // namespace shaka
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66