Shaka Packager SDK
job_manager.h
1 // Copyright 2017 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #ifndef PACKAGER_APP_JOB_MANAGER_H_
8 #define PACKAGER_APP_JOB_MANAGER_H_
9 
10 #include <functional>
11 #include <map>
12 #include <memory>
13 #include <thread>
14 #include <vector>
15 
16 #include <absl/synchronization/mutex.h>
17 
18 #include <packager/status.h>
19 
20 namespace shaka {
21 namespace media {
22 
23 class OriginHandler;
24 class SyncPointQueue;
25 
26 // A job is a single line of work that is expected to run in parallel with
27 // other jobs.
28 class Job {
29  public:
30  typedef std::function<void(Job*)> OnCompleteFunction;
31 
32  Job(const std::string& name,
33  std::shared_ptr<OriginHandler> work,
34  OnCompleteFunction on_complete);
35 
36  // Initialize the work object. Call before Start() or Run(). Updates status()
37  // and returns it for convenience.
38  const Status& Initialize();
39 
40  // Begin the job in a new thread. This is only a request and will not block.
41  // If you want to wait for the job to complete, use |complete|.
42  // Use either Start() for threaded operation or Run() for non-threaded
43  // operation. DO NOT USE BOTH!
44  void Start();
45 
46  // Run the job's work synchronously, blocking until complete. Updates status()
47  // and returns it for convenience.
48  // Use either Start() for threaded operation or Run() for non-threaded
49  // operation. DO NOT USE BOTH!
50  const Status& Run();
51 
52  // Request that the job stops executing. This is only a request and will not
53  // block. If you want to wait for the job to complete, use |complete|.
54  void Cancel();
55 
56  // Join the thread, if any was started. Blocks until the thread has stopped.
57  void Join();
58 
59  // Get the current status of the job. If the job failed to initialize or
60  // encountered an error during execution this will return the error.
61  const Status& status() const { return status_; }
62 
63  // The name given to this job in the constructor.
64  const std::string& name() const { return name_; }
65 
66  private:
67  Job(const Job&) = delete;
68  Job& operator=(const Job&) = delete;
69 
70  std::string name_;
71  std::shared_ptr<OriginHandler> work_;
72  OnCompleteFunction on_complete_;
73  std::unique_ptr<std::thread> thread_;
74  Status status_;
75 };
76 
77 // Similar to a thread pool, JobManager manages multiple jobs that are expected
78 // to run in parallel. It can be used to register, run, and stop a batch of
79 // jobs.
80 class JobManager {
81  public:
82  // @param sync_points is an optional SyncPointQueue used to synchronize and
83  // align cue points. JobManager cancels @a sync_points when any job
84  // fails or is cancelled. It can be NULL.
85  explicit JobManager(std::unique_ptr<SyncPointQueue> sync_points);
86 
87  virtual ~JobManager() = default;
88 
89  // Create a new job entry by specifying the origin handler at the top of the
90  // chain and a name for the thread. This will only register the job. To start
91  // the job, you need to call |RunJobs|.
92  void Add(const std::string& name, std::shared_ptr<OriginHandler> handler);
93 
94  // Initialize all registered jobs. If any job fails to initialize, this will
95  // return the error and it will not be safe to call |RunJobs| as not all jobs
96  // will be properly initialized.
97  Status InitializeJobs();
98 
99  // Run all registered jobs. Before calling this make sure that
100  // |InitializedJobs| returned |Status::OK|. This call is blocking and will
101  // block until all jobs exit.
102  virtual Status RunJobs();
103 
104  // Ask all jobs to stop running. This call is non-blocking and can be used to
105  // unblock a call to |RunJobs|.
106  void CancelJobs();
107 
108  SyncPointQueue* sync_points() { return sync_points_.get(); }
109 
110  protected:
111  JobManager(const JobManager&) = delete;
112  JobManager& operator=(const JobManager&) = delete;
113 
114  void OnJobComplete(Job* job);
115 
116  // Stored in JobManager so JobManager can cancel |sync_points| when any job
117  // fails or is cancelled.
118  std::unique_ptr<SyncPointQueue> sync_points_;
119 
120  std::vector<std::unique_ptr<Job>> jobs_;
121 
122  absl::Mutex mutex_;
123  std::map<Job*, bool> complete_ ABSL_GUARDED_BY(mutex_);
124  absl::CondVar any_job_complete_ ABSL_GUARDED_BY(mutex_);
125 };
126 
127 } // namespace media
128 } // namespace shaka
129 
130 #endif // PACKAGER_APP_JOB_MANAGER_H_
A synchronized queue for cue points.
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66