Shaka Packager SDK
Loading...
Searching...
No Matches
job_manager.cc
1// Copyright 2017 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#include <packager/app/job_manager.h>
8
9#include <set>
10
11#include <absl/log/check.h>
12
13#include <packager/media/chunking/sync_point_queue.h>
14#include <packager/media/origin/origin_handler.h>
15
16namespace shaka {
17namespace media {
18
19Job::Job(const std::string& name,
20 std::shared_ptr<OriginHandler> work,
21 OnCompleteFunction on_complete)
22 : name_(name),
23 work_(std::move(work)),
24 on_complete_(on_complete),
25 status_(error::Code::UNKNOWN, "Job uninitialized") {
26 DCHECK(work_);
27}
28
29const Status& Job::Initialize() {
30 status_ = work_->Initialize();
31 return status_;
32}
33
34void Job::Start() {
35 thread_.reset(new std::thread(&Job::Run, this));
36}
37
38void Job::Cancel() {
39 work_->Cancel();
40}
41
42const Status& Job::Run() {
43 if (status_.ok()) // initialized correctly
44 status_ = work_->Run();
45
46 on_complete_(this);
47
48 return status_;
49}
50
51void Job::Join() {
52 if (thread_) {
53 thread_->join();
54 thread_ = nullptr;
55 }
56}
57
58JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
59 : sync_points_(std::move(sync_points)) {}
60
61void JobManager::Add(const std::string& name,
62 std::shared_ptr<OriginHandler> handler) {
63 jobs_.emplace_back(new Job(
64 name, std::move(handler),
65 std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1)));
66}
67
68Status JobManager::InitializeJobs() {
69 Status status;
70 for (auto& job : jobs_)
71 status.Update(job->Initialize());
72 return status;
73}
74
75Status JobManager::RunJobs() {
76 std::set<Job*> active_jobs;
77
78 // Start every job and add it to the active jobs list so that we can wait
79 // on each one.
80 for (auto& job : jobs_) {
81 job->Start();
82
83 active_jobs.insert(job.get());
84 }
85
86 // Wait for all jobs to complete or any job to error.
87 Status status;
88 {
89 absl::MutexLock lock(&mutex_);
90 while (status.ok() && active_jobs.size()) {
91 // any_job_complete_ is protected by mutex_.
92 any_job_complete_.Wait(&mutex_);
93
94 // complete_ is protected by mutex_.
95 for (const auto& entry : complete_) {
96 Job* job = entry.first;
97 bool complete = entry.second;
98 if (complete) {
99 job->Join();
100 status.Update(job->status());
101 active_jobs.erase(job);
102 }
103 }
104 }
105 }
106
107 // If the main loop has exited and there are still jobs running,
108 // we need to cancel them and clean-up.
109 if (sync_points_)
110 sync_points_->Cancel();
111
112 for (auto& job : active_jobs)
113 job->Cancel();
114
115 for (auto& job : active_jobs)
116 job->Join();
117
118 return status;
119}
120
121void JobManager::OnJobComplete(Job* job) {
122 absl::MutexLock lock(&mutex_);
123 // These are both protected by mutex_.
124 complete_[job] = true;
125 any_job_complete_.Signal();
126}
127
128void JobManager::CancelJobs() {
129 if (sync_points_)
130 sync_points_->Cancel();
131
132 for (auto& job : jobs_)
133 job->Cancel();
134}
135
136} // namespace media
137} // namespace shaka
All the methods that are virtual are virtual for mocking.