7 #include <packager/app/job_manager.h>
11 #include <absl/log/check.h>
13 #include <packager/media/chunking/sync_point_queue.h>
14 #include <packager/media/origin/origin_handler.h>
19 Job::Job(
const std::string& name,
20 std::shared_ptr<OriginHandler> work,
21 OnCompleteFunction on_complete)
23 work_(std::move(work)),
24 on_complete_(on_complete),
25 status_(error::Code::UNKNOWN,
"Job uninitialized") {
29 const Status& Job::Initialize() {
30 status_ = work_->Initialize();
35 thread_.reset(
new std::thread(&Job::Run,
this));
42 const Status& Job::Run() {
44 status_ = work_->Run();
58 JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
59 : sync_points_(std::move(sync_points)) {}
61 void JobManager::Add(
const std::string& name,
62 std::shared_ptr<OriginHandler> handler) {
63 jobs_.emplace_back(
new Job(
64 name, std::move(handler),
65 std::bind(&JobManager::OnJobComplete,
this, std::placeholders::_1)));
68 Status JobManager::InitializeJobs() {
70 for (
auto& job : jobs_)
71 status.Update(job->Initialize());
75 Status JobManager::RunJobs() {
76 std::set<Job*> active_jobs;
80 for (
auto& job : jobs_) {
83 active_jobs.insert(job.get());
89 absl::MutexLock lock(&mutex_);
90 while (status.ok() && active_jobs.size()) {
92 any_job_complete_.Wait(&mutex_);
95 for (
const auto& entry : complete_) {
96 Job* job = entry.first;
97 bool complete = entry.second;
100 status.Update(job->status());
101 active_jobs.erase(job);
110 sync_points_->Cancel();
112 for (
auto& job : active_jobs)
115 for (
auto& job : active_jobs)
121 void JobManager::OnJobComplete(Job* job) {
122 absl::MutexLock lock(&mutex_);
124 complete_[job] =
true;
125 any_job_complete_.Signal();
128 void JobManager::CancelJobs() {
130 sync_points_->Cancel();
132 for (
auto& job : jobs_)
All the methods that are virtual are virtual for mocking.