Shaka Packager SDK
cue_alignment_handler.cc
// Copyright 2018 Google LLC. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include <packager/media/chunking/cue_alignment_handler.h>

#include <algorithm>

#include <absl/log/check.h>

#include <packager/macros/logging.h>
#include <packager/macros/status.h>

namespace shaka {
namespace media {
namespace {
// The max number of samples that are allowed to be buffered before we shut
// down, because by then there is likely a problem with the content or with
// how the pipeline was configured. This is about 20 seconds of buffer for
// 48 kHz audio.
const size_t kMaxBufferSize = 1000;
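// (Assuming roughly 1024 audio samples per frame, as with AAC, 1000 buffered
// frames at 48 kHz is about 21 seconds.)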

int64_t GetScaledTime(const StreamInfo& info, const StreamData& data) {
  DCHECK(data.text_sample || data.media_sample);

  if (data.text_sample) {
    return data.text_sample->start_time();
  }

  if (info.stream_type() == kStreamText) {
    // This class does not support splitting MediaSample at cue points, which
    // is required for text streams. This class expects MediaSample to be
    // converted to TextSample before being passed in.
    NOTIMPLEMENTED()
        << "Text streams should use text samples, not media samples.";
  }

  if (info.stream_type() == kStreamAudio) {
    // Return the mid-point for audio because if the portion of the sample
    // after the cue point is bigger than the portion of the sample before
    // the cue point, the sample is placed after the cue.
    return data.media_sample->pts() + data.media_sample->duration() / 2;
  }

  DCHECK_EQ(info.stream_type(), kStreamVideo);
  return data.media_sample->pts();
}

double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
  const int64_t scaled_time = GetScaledTime(info, data);
  const int32_t time_scale = info.time_scale();

  return static_cast<double>(scaled_time) / time_scale;
}

double TextEndTimeInSeconds(const StreamInfo& info, const StreamData& data) {
  DCHECK(data.text_sample);

  const int64_t scaled_time = data.text_sample->EndTime();
  const int32_t time_scale = info.time_scale();

  return static_cast<double>(scaled_time) / time_scale;
}

Status GetNextCue(double hint,
                  SyncPointQueue* sync_points,
                  std::shared_ptr<const CueEvent>* out_cue) {
  DCHECK(sync_points);
  DCHECK(out_cue);

  *out_cue = sync_points->GetNext(hint);

  // |*out_cue| will only be null if the job was cancelled.
  return *out_cue ? Status::OK
                  : Status(error::CANCELLED, "SyncPointQueue is cancelled.");
}
}  // namespace

CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
    : sync_points_(sync_points) {}

Status CueAlignmentHandler::InitializeInternal() {
  sync_points_->AddThread();
  stream_states_.resize(num_input_streams());

  // Get the first hint for the stream. Use a negative hint so that if there
  // is supposed to be a sync point at zero, we will still respect it.
  hint_ = sync_points_->GetHint(-1);

  return Status::OK;
}

Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
  switch (data->stream_data_type) {
    case StreamDataType::kStreamInfo:
      return OnStreamInfo(std::move(data));
    case StreamDataType::kTextSample:
    case StreamDataType::kMediaSample:
      return OnSample(std::move(data));
    default:
      VLOG(3) << "Dropping unsupported data type "
              << static_cast<int>(data->stream_data_type);
      return Status::OK;
  }
}

Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
  stream_states_[stream_index].to_be_flushed = true;

  // We need to wait for all streams to flush before we can flush each stream.
  // This allows cached buffers to be cleared and cues to be properly
  // synchronized and set on all streams.
  for (const StreamState& stream_state : stream_states_) {
    if (!stream_state.to_be_flushed) {
      return Status::OK;
    }
  }

  // Do a pass over all the streams to ensure that their states are as we
  // expect them. Video and non-video streams have different allowances here:
  // video streams should have no cues or samples at all, whereas non-video
  // streams may still have cues or samples.
  for (StreamState& stream : stream_states_) {
    DCHECK(stream.to_be_flushed);

    if (stream.info->stream_type() == kStreamVideo) {
      DCHECK_EQ(stream.samples.size(), 0u)
          << "Video streams should not store samples";
      DCHECK_EQ(stream.cues.size(), 0u)
          << "Video streams should not store cues";
    }
  }

  // It is possible that we did not get all the cues. |hint_| will get updated
  // when we call |UseNewSyncPoint|.
  while (sync_points_->HasMore(hint_)) {
    std::shared_ptr<const CueEvent> next_cue;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
  }

  // Now that there are new cues, it may be possible to dispatch some of the
  // samples that may be left waiting.
  for (StreamState& stream : stream_states_) {
    RETURN_IF_ERROR(RunThroughSamples(&stream));
    DCHECK_EQ(stream.samples.size(), 0u);

    // Ignore extra cues at the end, except for text, as they would result in
    // empty DASH Representations, which is not spec compliant.
    // For text, if the cue is before the max end time, it will still be
    // dispatched, as the text samples intersected by the cue can be split in
    // two at the cue point.
    for (auto& cue : stream.cues) {
      // |max_text_sample_end_time_seconds| is always 0 for non-text samples.
      if (cue->cue_event->time_in_seconds <
          stream.max_text_sample_end_time_seconds) {
        RETURN_IF_ERROR(Dispatch(std::move(cue)));
      } else {
        VLOG(1) << "Ignoring extra cue in stream " << cue->stream_index
                << " with time " << cue->cue_event->time_in_seconds
                << "s at the end of the stream.";
      }
    }
    stream.cues.clear();
  }

  return FlushAllDownstreams();
}

Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
  StreamState& stream_state = stream_states_[data->stream_index];
  // Keep a copy of the stream info so that we can check the stream type and
  // the timescale later.
  stream_state.info = data->stream_info;

  return Dispatch(std::move(data));
}

Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream = stream_states_[stream_index];

  const double sample_time = TimeInSeconds(*stream.info, *sample);
  const bool is_key_frame = sample->media_sample->is_key_frame();

  if (is_key_frame && sample_time >= hint_) {
    auto next_sync = sync_points_->PromoteAt(sample_time);

    if (!next_sync) {
      LOG(ERROR) << "Failed to promote sync point at " << sample_time
                 << ". This happens only if video streams are not GOP-aligned.";
      return Status(error::INVALID_ARGUMENT,
                    "Streams are not properly GOP-aligned.");
    }

    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
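    // |UseNewSyncPoint| pushes the new cue onto every stream's cue queue, and
    // video streams never buffer samples, so this stream's queue now holds
    // exactly that one new cue. Dispatch it ahead of the key frame.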
    DCHECK_EQ(stream.cues.size(), 1u);
    RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
    stream.cues.pop_front();
  }

  return Dispatch(std::move(sample));
}

Status CueAlignmentHandler::OnNonVideoSample(
    std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream_state = stream_states_[stream_index];

  // Accept the sample. This will output it if it comes before the hint point
  // or cache it if it comes after the hint point.
  RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));

  // If all the streams are waiting on a hint, it means that none of them has
  // determined the next sync point yet. It also means that there are no video
  // streams, so we need to wait for all streams to converge on a hint before
  // we can get the next sync point.
  if (EveryoneWaitingAtHint()) {
    std::shared_ptr<const CueEvent> next_sync;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
    RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
  }

  return Status::OK;
}

Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
  // There are two modes:
  //  1. There is a video input.
  //  2. There are no video inputs.
  //
  // When there is a video input, we rely on the video input to get the next
  // sync point and release all the samples.
  //
  // When there are no video inputs, we rely on the sync point queue to block
  // us until there is a sync point.

  const size_t stream_index = sample->stream_index;

  if (sample->text_sample) {
    StreamState& stream = stream_states_[stream_index];
    stream.max_text_sample_end_time_seconds =
        std::max(stream.max_text_sample_end_time_seconds,
                 TextEndTimeInSeconds(*stream.info, *sample));
  }

  const StreamType stream_type =
      stream_states_[stream_index].info->stream_type();
  const bool is_video = stream_type == kStreamVideo;

  return is_video ? OnVideoSample(std::move(sample))
                  : OnNonVideoSample(std::move(sample));
}

Status CueAlignmentHandler::UseNewSyncPoint(
    std::shared_ptr<const CueEvent> new_sync) {
  hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
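  // The hint is the time of the next potential sync point, which always lies
  // strictly after the sync point that was just promoted.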
  DCHECK_GT(hint_, new_sync->time_in_seconds);

  for (size_t stream_index = 0; stream_index < stream_states_.size();
       stream_index++) {
    StreamState& stream = stream_states_[stream_index];
    stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));

    RETURN_IF_ERROR(RunThroughSamples(&stream));
  }

  return Status::OK;
}

bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
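  // Samples that come before the hint are dispatched as soon as they are
  // accepted, so any buffered sample is at or after the hint. "Everyone
  // waiting" therefore means every stream has at least one buffered sample.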
  for (const StreamState& stream_state : stream_states_) {
    if (stream_state.samples.empty()) {
      return false;
    }
  }
  return true;
}

Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
                                         StreamState* stream) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);
  DCHECK(stream);

  // Need to cache the stream index, as we lose access to the sample when we
  // add it to the queue.
  const size_t stream_index = sample->stream_index;

  stream->samples.push_back(std::move(sample));

  if (stream->samples.size() > kMaxBufferSize) {
    LOG(ERROR) << "Stream " << stream_index << " has buffered "
               << stream->samples.size() << " samples when the max is "
               << kMaxBufferSize << ".";
    return Status(error::INVALID_ARGUMENT,
                  "Streams are not properly multiplexed.");
  }

  return RunThroughSamples(stream);
}

Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
  // Step through all our samples until we find where we can insert the cue.
  // Think of this as the merge step of a merge sort.
  while (stream->cues.size() && stream->samples.size()) {
    const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
    const double sample_time =
        TimeInSeconds(*stream->info, *stream->samples.front());

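    // On a tie, the cue is dispatched first so that the sample lands after
    // the cue point.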
    if (sample_time < cue_time) {
      RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
      stream->samples.pop_front();
    } else {
      RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
      stream->cues.pop_front();
    }
  }

  // If we still have samples, it means that all the cues have been sent out,
  // and we can now work up to the hint. Send all samples that come before the
  // hint downstream.
  while (stream->samples.size() &&
         TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
    RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
    stream->samples.pop_front();
  }

  return Status::OK;
}
}  // namespace media
}  // namespace shaka