// Shaka Packager SDK
// cue_alignment_handler.cc
1 // Copyright 2018 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/media/chunking/cue_alignment_handler.h>
8 
9 #include <algorithm>
10 
11 #include <absl/log/check.h>
12 
13 #include <packager/macros/logging.h>
14 #include <packager/macros/status.h>
15 
16 namespace shaka {
17 namespace media {
18 namespace {
// The max number of samples that are allowed to be buffered before we shutdown
// because there is likely a problem with the content or how the pipeline was
// configured. This is about 20 seconds of buffer for audio with 48kHz.
constexpr size_t kMaxBufferSize = 1000;
23 
24 int64_t GetScaledTime(const StreamInfo& info, const StreamData& data) {
25  DCHECK(data.text_sample || data.media_sample);
26 
27  if (data.text_sample) {
28  return data.text_sample->start_time();
29  }
30 
31  if (info.stream_type() == kStreamText) {
32  // This class does not support splitting MediaSample at cue points, which is
33  // required for text stream. This class expects MediaSample to be converted
34  // to TextSample before passing to this class.
35  NOTIMPLEMENTED()
36  << "A text streams should use text samples, not media samples.";
37  }
38 
39  if (info.stream_type() == kStreamAudio) {
40  // Return the mid-point for audio because if the portion of the sample
41  // after the cue point is bigger than the portion of the sample before
42  // the cue point, the sample is placed after the cue.
43  return data.media_sample->pts() + data.media_sample->duration() / 2;
44  }
45 
46  DCHECK_EQ(info.stream_type(), kStreamVideo);
47  return data.media_sample->pts();
48 }
49 
50 double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
51  const int64_t scaled_time = GetScaledTime(info, data);
52  const int32_t time_scale = info.time_scale();
53 
54  return static_cast<double>(scaled_time) / time_scale;
55 }
56 
57 double TextEndTimeInSeconds(const StreamInfo& info, const StreamData& data) {
58  DCHECK(data.text_sample);
59 
60  const int64_t scaled_time = data.text_sample->EndTime();
61  const int32_t time_scale = info.time_scale();
62 
63  return static_cast<double>(scaled_time) / time_scale;
64 }
65 
66 Status GetNextCue(double hint,
67  SyncPointQueue* sync_points,
68  std::shared_ptr<const CueEvent>* out_cue) {
69  DCHECK(sync_points);
70  DCHECK(out_cue);
71 
72  *out_cue = sync_points->GetNext(hint);
73 
74  // |*out_cue| will only be null if the job was cancelled.
75  return *out_cue ? Status::OK
76  : Status(error::CANCELLED, "SyncPointQueue is cancelled.");
77 }
78 } // namespace
79 
80 CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
81  : sync_points_(sync_points) {}
82 
83 Status CueAlignmentHandler::InitializeInternal() {
84  sync_points_->AddThread();
85  stream_states_.resize(num_input_streams());
86 
87  // Get the first hint for the stream. Use a negative hint so that if there is
88  // suppose to be a sync point at zero, we will still respect it.
89  hint_ = sync_points_->GetHint(-1);
90 
91  return Status::OK;
92 }
93 
94 Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
95  switch (data->stream_data_type) {
96  case StreamDataType::kStreamInfo:
97  return OnStreamInfo(std::move(data));
98  case StreamDataType::kTextSample:
99  case StreamDataType::kMediaSample:
100  return OnSample(std::move(data));
101  default:
102  VLOG(3) << "Dropping unsupported data type "
103  << static_cast<int>(data->stream_data_type);
104  return Status::OK;
105  }
106 }
107 
108 Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
109  stream_states_[stream_index].to_be_flushed = true;
110 
111  // We need to wait for all stream to flush before we can flush each stream.
112  // This allows cached buffers to be cleared and cues to be properly
113  // synchronized and set on all streams.
114  for (const StreamState& stream_state : stream_states_) {
115  if (!stream_state.to_be_flushed) {
116  return Status::OK;
117  }
118  }
119 
120  // Do a once over all the streams to ensure that their states are as we expect
121  // them. Video and non-video streams have different allowances here. Video
122  // should absolutely have no cues or samples where as non-video streams may
123  // have cues or samples.
124  for (StreamState& stream : stream_states_) {
125  DCHECK(stream.to_be_flushed);
126 
127  if (stream.info->stream_type() == kStreamVideo) {
128  DCHECK_EQ(stream.samples.size(), 0u)
129  << "Video streams should not store samples";
130  DCHECK_EQ(stream.cues.size(), 0u)
131  << "Video streams should not store cues";
132  }
133  }
134 
135  // It is possible that we did not get all the cues. |hint_| will get updated
136  // when we call |UseNextSyncPoint|.
137  while (sync_points_->HasMore(hint_)) {
138  std::shared_ptr<const CueEvent> next_cue;
139  RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
140  RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
141  }
142 
143  // Now that there are new cues, it may be possible to dispatch some of the
144  // samples that may be left waiting.
145  for (StreamState& stream : stream_states_) {
146  RETURN_IF_ERROR(RunThroughSamples(&stream));
147  DCHECK_EQ(stream.samples.size(), 0u);
148 
149  // Ignore extra cues at the end, except for text, as they will result in
150  // empty DASH Representations, which is not spec compliant.
151  // For text, if the cue is before the max end time, it will still be
152  // dispatched as the text samples intercepted by the cue can be split into
153  // two at the cue point.
154  for (auto& cue : stream.cues) {
155  // |max_text_sample_end_time_seconds| is always 0 for non-text samples.
156  if (cue->cue_event->time_in_seconds <
157  stream.max_text_sample_end_time_seconds) {
158  RETURN_IF_ERROR(Dispatch(std::move(cue)));
159  } else {
160  VLOG(1) << "Ignore extra cue in stream " << cue->stream_index
161  << " with time " << cue->cue_event->time_in_seconds
162  << "s in the end.";
163  }
164  }
165  stream.cues.clear();
166  }
167 
168  return FlushAllDownstreams();
169 }
170 
171 Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
172  StreamState& stream_state = stream_states_[data->stream_index];
173  // Keep a copy of the stream info so that we can check type and check
174  // timescale.
175  stream_state.info = data->stream_info;
176 
177  return Dispatch(std::move(data));
178 }
179 
180 Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
181  DCHECK(sample);
182  DCHECK(sample->media_sample);
183 
184  const size_t stream_index = sample->stream_index;
185  StreamState& stream = stream_states_[stream_index];
186 
187  const double sample_time = TimeInSeconds(*stream.info, *sample);
188  const bool is_key_frame = sample->media_sample->is_key_frame();
189 
190  if (is_key_frame && sample_time >= hint_) {
191  auto next_sync = sync_points_->PromoteAt(sample_time);
192 
193  if (!next_sync) {
194  LOG(ERROR) << "Failed to promote sync point at " << sample_time
195  << ". This happens only if video streams are not GOP-aligned.";
196  return Status(error::INVALID_ARGUMENT,
197  "Streams are not properly GOP-aligned.");
198  }
199 
200  RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
201  DCHECK_EQ(stream.cues.size(), 1u);
202  RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
203  stream.cues.pop_front();
204  }
205 
206  return Dispatch(std::move(sample));
207 }
208 
209 Status CueAlignmentHandler::OnNonVideoSample(
210  std::unique_ptr<StreamData> sample) {
211  DCHECK(sample);
212  DCHECK(sample->media_sample || sample->text_sample);
213 
214  const size_t stream_index = sample->stream_index;
215  StreamState& stream_state = stream_states_[stream_index];
216 
217  // Accept the sample. This will output it if it comes before the hint point or
218  // will cache it if it comes after the hint point.
219  RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));
220 
221  // If all the streams are waiting on a hint, it means that none has next sync
222  // point determined. It also means that there are no video streams and we need
223  // to wait for all streams to converge on a hint so that we can get the next
224  // sync point.
225  if (EveryoneWaitingAtHint()) {
226  std::shared_ptr<const CueEvent> next_sync;
227  RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
228  RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
229  }
230 
231  return Status::OK;
232 }
233 
234 Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
235  // There are two modes:
236  // 1. There is a video input.
237  // 2. There are no video inputs.
238  //
239  // When there is a video input, we rely on the video input get the next sync
240  // point and release all the samples.
241  //
242  // When there are no video inputs, we rely on the sync point queue to block
243  // us until there is a sync point.
244 
245  const size_t stream_index = sample->stream_index;
246 
247  if (sample->text_sample) {
248  StreamState& stream = stream_states_[stream_index];
249  stream.max_text_sample_end_time_seconds =
250  std::max(stream.max_text_sample_end_time_seconds,
251  TextEndTimeInSeconds(*stream.info, *sample));
252  }
253 
254  const StreamType stream_type =
255  stream_states_[stream_index].info->stream_type();
256  const bool is_video = stream_type == kStreamVideo;
257 
258  return is_video ? OnVideoSample(std::move(sample))
259  : OnNonVideoSample(std::move(sample));
260 }
261 
262 Status CueAlignmentHandler::UseNewSyncPoint(
263  std::shared_ptr<const CueEvent> new_sync) {
264  hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
265  DCHECK_GT(hint_, new_sync->time_in_seconds);
266 
267  for (size_t stream_index = 0; stream_index < stream_states_.size();
268  stream_index++) {
269  StreamState& stream = stream_states_[stream_index];
270  stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));
271 
272  RETURN_IF_ERROR(RunThroughSamples(&stream));
273  }
274 
275  return Status::OK;
276 }
277 
278 bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
279  for (const StreamState& stream_state : stream_states_) {
280  if (stream_state.samples.empty()) {
281  return false;
282  }
283  }
284  return true;
285 }
286 
287 Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
288  StreamState* stream) {
289  DCHECK(sample);
290  DCHECK(sample->media_sample || sample->text_sample);
291  DCHECK(stream);
292 
293  // Need to cache the stream index as we will lose the pointer when we add
294  // the sample to the queue.
295  const size_t stream_index = sample->stream_index;
296 
297  stream->samples.push_back(std::move(sample));
298 
299  if (stream->samples.size() > kMaxBufferSize) {
300  LOG(ERROR) << "Stream " << stream_index << " has buffered "
301  << stream->samples.size() << " when the max is "
302  << kMaxBufferSize;
303  return Status(error::INVALID_ARGUMENT,
304  "Streams are not properly multiplexed.");
305  }
306 
307  return RunThroughSamples(stream);
308 }
309 
310 Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
311  // Step through all our samples until we find where we can insert the cue.
312  // Think of this as a merge sort.
313  while (stream->cues.size() && stream->samples.size()) {
314  const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
315  const double sample_time =
316  TimeInSeconds(*stream->info, *stream->samples.front());
317 
318  if (sample_time < cue_time) {
319  RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
320  stream->samples.pop_front();
321  } else {
322  RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
323  stream->cues.pop_front();
324  }
325  }
326 
327  // If we still have samples, then it means that we sent out the cue and can
328  // now work up to the hint. So now send all samples that come before the hint
329  // downstream.
330  while (stream->samples.size() &&
331  TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
332  RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
333  stream->samples.pop_front();
334  }
335 
336  return Status::OK;
337 }
338 } // namespace media
339 } // namespace shaka
// NOTE: All the methods that are virtual are virtual for mocking.