Shaka Packager SDK
job_manager.cc
1 // Copyright 2017 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/app/job_manager.h>
8 
9 #include <set>
10 
11 #include <absl/log/check.h>
12 
13 #include <packager/media/chunking/sync_point_queue.h>
14 #include <packager/media/origin/origin_handler.h>
15 
16 namespace shaka {
17 namespace media {
18 
19 Job::Job(const std::string& name,
20  std::shared_ptr<OriginHandler> work,
21  OnCompleteFunction on_complete)
22  : name_(name),
23  work_(std::move(work)),
24  on_complete_(on_complete),
25  status_(error::Code::UNKNOWN, "Job uninitialized") {
26  DCHECK(work_);
27 }
28 
29 const Status& Job::Initialize() {
30  status_ = work_->Initialize();
31  return status_;
32 }
33 
34 void Job::Start() {
35  thread_.reset(new std::thread(&Job::Run, this));
36 }
37 
38 void Job::Cancel() {
39  work_->Cancel();
40 }
41 
42 const Status& Job::Run() {
43  if (status_.ok()) // initialized correctly
44  status_ = work_->Run();
45 
46  on_complete_(this);
47 
48  return status_;
49 }
50 
51 void Job::Join() {
52  if (thread_) {
53  thread_->join();
54  thread_ = nullptr;
55  }
56 }
57 
58 JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
59  : sync_points_(std::move(sync_points)) {}
60 
61 void JobManager::Add(const std::string& name,
62  std::shared_ptr<OriginHandler> handler) {
63  jobs_.emplace_back(new Job(
64  name, std::move(handler),
65  std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1)));
66 }
67 
68 Status JobManager::InitializeJobs() {
69  Status status;
70  for (auto& job : jobs_)
71  status.Update(job->Initialize());
72  return status;
73 }
74 
75 Status JobManager::RunJobs() {
76  std::set<Job*> active_jobs;
77 
78  // Start every job and add it to the active jobs list so that we can wait
79  // on each one.
80  for (auto& job : jobs_) {
81  job->Start();
82 
83  active_jobs.insert(job.get());
84  }
85 
86  // Wait for all jobs to complete or any job to error.
87  Status status;
88  {
89  absl::MutexLock lock(&mutex_);
90  while (status.ok() && active_jobs.size()) {
91  // any_job_complete_ is protected by mutex_.
92  any_job_complete_.Wait(&mutex_);
93 
94  // complete_ is protected by mutex_.
95  for (const auto& entry : complete_) {
96  Job* job = entry.first;
97  bool complete = entry.second;
98  if (complete) {
99  job->Join();
100  status.Update(job->status());
101  active_jobs.erase(job);
102  }
103  }
104  }
105  }
106 
107  // If the main loop has exited and there are still jobs running,
108  // we need to cancel them and clean-up.
109  if (sync_points_)
110  sync_points_->Cancel();
111 
112  for (auto& job : active_jobs)
113  job->Cancel();
114 
115  for (auto& job : active_jobs)
116  job->Join();
117 
118  return status;
119 }
120 
121 void JobManager::OnJobComplete(Job* job) {
122  absl::MutexLock lock(&mutex_);
123  // These are both protected by mutex_.
124  complete_[job] = true;
125  any_job_complete_.Signal();
126 }
127 
128 void JobManager::CancelJobs() {
129  if (sync_points_)
130  sync_points_->Cancel();
131 
132  for (auto& job : jobs_)
133  job->Cancel();
134 }
135 
136 } // namespace media
137 } // namespace shaka
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66