Shaka Packager SDK
Loading...
Searching...
No Matches
thread_pool.cc
1// Copyright 2022 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#include <packager/file/thread_pool.h>
8
9#include <thread>
10
11#include <absl/log/check.h>
12#include <absl/log/log.h>
13#include <absl/time/time.h>
14
15namespace shaka {
16
17namespace {
18
19const absl::Duration kMaxThreadIdleTime = absl::Minutes(10);
20
21} // namespace
22
23// static
24ThreadPool ThreadPool::instance;
25
26ThreadPool::ThreadPool() : num_idle_threads_(0), terminated_(false) {}
27
28ThreadPool::~ThreadPool() {
29 Terminate();
30}
31
32void ThreadPool::PostTask(const std::function<void()>& task) {
33 absl::MutexLock lock(&mutex_);
34
35 DCHECK(!terminated_) << "Should not call PostTask after Terminate!";
36
37 if (terminated_) {
38 return;
39 }
40
41 // An empty task is used internally to signal the thread to terminate. This
42 // should never be sent on input.
43 if (!task) {
44 DLOG(ERROR) << "Should not post an empty task!";
45 return;
46 }
47
48 tasks_.push(std::move(task));
49
50 if (num_idle_threads_ >= tasks_.size()) {
51 // We have enough threads available.
52 tasks_available_.SignalAll();
53 } else {
54 // We need to start an additional thread.
55 std::thread thread(std::bind(&ThreadPool::ThreadMain, this));
56 thread.detach();
57 }
58}
59
60void ThreadPool::Terminate() {
61 {
62 absl::MutexLock lock(&mutex_);
63 terminated_ = true;
64 while (!tasks_.empty()) {
65 tasks_.pop();
66 }
67 }
68 tasks_available_.SignalAll();
69}
70
71ThreadPool::Task ThreadPool::WaitForTask() {
72 absl::MutexLock lock(&mutex_);
73 if (terminated_) {
74 // The pool is terminated. Terminate this thread.
75 return Task();
76 }
77
78 if (tasks_.empty()) {
79 num_idle_threads_++;
80 // Wait for a task, up to the maximum idle time.
81 tasks_available_.WaitWithTimeout(&mutex_, kMaxThreadIdleTime);
82 num_idle_threads_--;
83
84 if (tasks_.empty()) {
85 // No work before the timeout. Terminate this thread.
86 return Task();
87 }
88 }
89
90 // Get the next task from the queue.
91 Task task = tasks_.front();
92 tasks_.pop();
93 return task;
94}
95
96void ThreadPool::ThreadMain() {
97 while (true) {
98 auto task = WaitForTask();
99 if (!task) {
100 // An empty task signals the thread to terminate.
101 return;
102 }
103
104 // Run the task, then loop to wait for another.
105 task();
106 }
107}
108
109} // namespace shaka
All the methods that are virtual are virtual for mocking.