Shaka Packager SDK
sync_point_queue.cc
1 // Copyright 2018 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/media/chunking/sync_point_queue.h>
8 
9 #include <algorithm>
10 #include <limits>
11 
12 #include <absl/log/check.h>
13 
14 #include <packager/media/base/media_handler.h>
15 
16 namespace shaka {
17 namespace media {
18 
19 SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params) {
20  for (const Cuepoint& point : params.cue_points) {
21  std::shared_ptr<CueEvent> event = std::make_shared<CueEvent>();
22  event->time_in_seconds = point.start_time_in_seconds;
23  unpromoted_[point.start_time_in_seconds] = std::move(event);
24  }
25 }
26 
28  absl::MutexLock lock(&mutex_);
29  thread_count_++;
30 }
31 
33  {
34  absl::MutexLock lock(&mutex_);
35  cancelled_ = true;
36  }
37  sync_condition_.SignalAll();
38 }
39 
40 double SyncPointQueue::GetHint(double time_in_seconds) {
41  absl::MutexLock lock(&mutex_);
42 
43  auto iter = promoted_.upper_bound(time_in_seconds);
44  if (iter != promoted_.end())
45  return iter->first;
46 
47  iter = unpromoted_.upper_bound(time_in_seconds);
48  if (iter != unpromoted_.end())
49  return iter->first;
50 
51  // Use MAX DOUBLE as the fall back so that we can force all streams to run
52  // out all their samples even when there are no cues.
53  return std::numeric_limits<double>::max();
54 }
55 
56 std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
57  double hint_in_seconds) {
58  absl::MutexLock lock(&mutex_);
59  while (!cancelled_) {
60  // Find the promoted cue that would line up with our hint, which is the
61  // first cue that is not less than |hint_in_seconds|.
62  auto iter = promoted_.lower_bound(hint_in_seconds);
63  if (iter != promoted_.end()) {
64  return iter->second;
65  }
66 
67  // Promote |hint_in_seconds| if everyone is waiting.
68  if (waiting_thread_count_ + 1 == thread_count_) {
69  std::shared_ptr<const CueEvent> cue = PromoteAtNoLocking(hint_in_seconds);
70  CHECK(cue);
71  return cue;
72  }
73 
74  waiting_thread_count_++;
75  // This blocks until either a cue is promoted or all threads are blocked
76  // (in which case, the unpromoted cue at the hint will be self-promoted
77  // and returned - see section above). Spurious signal events are possible
78  // with most condition variable implementations, so if it returns, we go
79  // back and check if a cue is actually promoted or not.
80  sync_condition_.Wait(&mutex_);
81  waiting_thread_count_--;
82  }
83  return nullptr;
84 }
85 
86 std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
87  double time_in_seconds) {
88  absl::MutexLock lock(&mutex_);
89  return PromoteAtNoLocking(time_in_seconds);
90 }
91 
92 bool SyncPointQueue::HasMore(double hint_in_seconds) const {
93  return hint_in_seconds < std::numeric_limits<double>::max();
94 }
95 
96 std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAtNoLocking(
97  double time_in_seconds) {
98  mutex_.AssertHeld();
99 
100  // It is possible that |time_in_seconds| has been promoted.
101  auto iter = promoted_.find(time_in_seconds);
102  if (iter != promoted_.end())
103  return iter->second;
104 
105  // Find the unpromoted cue that would work for the given time, which is the
106  // first cue that is not greater than |time_in_seconds|.
107  // So find the the first cue that is greater than |time_in_seconds| first and
108  // then get the previous one.
109  iter = unpromoted_.upper_bound(time_in_seconds);
110  // The first cue in |unpromoted_| should not be greater than
111  // |time_in_seconds|. It could happen only if it has been promoted at a
112  // different timestamp, which can only be the result of unaligned GOPs.
113  if (iter == unpromoted_.begin())
114  return nullptr;
115  auto prev_iter = std::prev(iter);
116  DCHECK(prev_iter != unpromoted_.end());
117 
118  std::shared_ptr<CueEvent> cue = prev_iter->second;
119  cue->time_in_seconds = time_in_seconds;
120 
121  promoted_[time_in_seconds] = cue;
122  // Remove all unpromoted cues up to the cue that was just promoted.
123  // User may provide multiple cue points at the same or similar timestamps. The
124  // extra unused cues are simply ignored.
125  unpromoted_.erase(unpromoted_.begin(), iter);
126 
127  // Wake up other threads that may be waiting.
128  sync_condition_.SignalAll();
129  return cue;
130 }
131 
132 } // namespace media
133 } // namespace shaka
std::shared_ptr< const CueEvent > PromoteAt(double time_in_seconds)
std::shared_ptr< const CueEvent > GetNext(double hint_in_seconds)
bool HasMore(double hint_in_seconds) const
void Cancel()
Cancel the queue and unblock all threads.
double GetHint(double time_in_seconds)
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66