Shaka Packager SDK
Loading...
Searching...
No Matches
sync_point_queue.cc
1// Copyright 2018 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#include <packager/media/chunking/sync_point_queue.h>
8
9#include <algorithm>
10#include <limits>
11
12#include <absl/log/check.h>
13
14#include <packager/media/base/media_handler.h>
15
16namespace shaka {
17namespace media {
18
19SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params) {
20 for (const Cuepoint& point : params.cue_points) {
21 std::shared_ptr<CueEvent> event = std::make_shared<CueEvent>();
22 event->time_in_seconds = point.start_time_in_seconds;
23 unpromoted_[point.start_time_in_seconds] = std::move(event);
24 }
25}
26
28 absl::MutexLock lock(&mutex_);
29 thread_count_++;
30}
31
33 {
34 absl::MutexLock lock(&mutex_);
35 cancelled_ = true;
36 }
37 sync_condition_.SignalAll();
38}
39
40double SyncPointQueue::GetHint(double time_in_seconds) {
41 absl::MutexLock lock(&mutex_);
42
43 auto iter = promoted_.upper_bound(time_in_seconds);
44 if (iter != promoted_.end())
45 return iter->first;
46
47 iter = unpromoted_.upper_bound(time_in_seconds);
48 if (iter != unpromoted_.end())
49 return iter->first;
50
51 // Use MAX DOUBLE as the fall back so that we can force all streams to run
52 // out all their samples even when there are no cues.
53 return std::numeric_limits<double>::max();
54}
55
56std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
57 double hint_in_seconds) {
58 absl::MutexLock lock(&mutex_);
59 while (!cancelled_) {
60 // Find the promoted cue that would line up with our hint, which is the
61 // first cue that is not less than |hint_in_seconds|.
62 auto iter = promoted_.lower_bound(hint_in_seconds);
63 if (iter != promoted_.end()) {
64 return iter->second;
65 }
66
67 // Promote |hint_in_seconds| if everyone is waiting.
68 if (waiting_thread_count_ + 1 == thread_count_) {
69 std::shared_ptr<const CueEvent> cue = PromoteAtNoLocking(hint_in_seconds);
70 CHECK(cue);
71 return cue;
72 }
73
74 waiting_thread_count_++;
75 // This blocks until either a cue is promoted or all threads are blocked
76 // (in which case, the unpromoted cue at the hint will be self-promoted
77 // and returned - see section above). Spurious signal events are possible
78 // with most condition variable implementations, so if it returns, we go
79 // back and check if a cue is actually promoted or not.
80 sync_condition_.Wait(&mutex_);
81 waiting_thread_count_--;
82 }
83 return nullptr;
84}
85
86std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
87 double time_in_seconds) {
88 absl::MutexLock lock(&mutex_);
89 return PromoteAtNoLocking(time_in_seconds);
90}
91
92bool SyncPointQueue::HasMore(double hint_in_seconds) const {
93 return hint_in_seconds < std::numeric_limits<double>::max();
94}
95
96std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAtNoLocking(
97 double time_in_seconds) {
98 mutex_.AssertHeld();
99
100 // It is possible that |time_in_seconds| has been promoted.
101 auto iter = promoted_.find(time_in_seconds);
102 if (iter != promoted_.end())
103 return iter->second;
104
105 // Find the unpromoted cue that would work for the given time, which is the
106 // first cue that is not greater than |time_in_seconds|.
107 // So find the the first cue that is greater than |time_in_seconds| first and
108 // then get the previous one.
109 iter = unpromoted_.upper_bound(time_in_seconds);
110 // The first cue in |unpromoted_| should not be greater than
111 // |time_in_seconds|. It could happen only if it has been promoted at a
112 // different timestamp, which can only be the result of unaligned GOPs.
113 if (iter == unpromoted_.begin())
114 return nullptr;
115 auto prev_iter = std::prev(iter);
116 DCHECK(prev_iter != unpromoted_.end());
117
118 std::shared_ptr<CueEvent> cue = prev_iter->second;
119 cue->time_in_seconds = time_in_seconds;
120
121 promoted_[time_in_seconds] = cue;
122 // Remove all unpromoted cues up to the cue that was just promoted.
123 // User may provide multiple cue points at the same or similar timestamps. The
124 // extra unused cues are simply ignored.
125 unpromoted_.erase(unpromoted_.begin(), iter);
126
127 // Wake up other threads that may be waiting.
128 sync_condition_.SignalAll();
129 return cue;
130}
131
132} // namespace media
133} // namespace shaka
std::shared_ptr< const CueEvent > PromoteAt(double time_in_seconds)
std::shared_ptr< const CueEvent > GetNext(double hint_in_seconds)
bool HasMore(double hint_in_seconds) const
void Cancel()
Cancel the queue and unblock all threads.
double GetHint(double time_in_seconds)
All the methods that are virtual are virtual for mocking.