7 #include <packager/media/chunking/cue_alignment_handler.h>
11 #include <absl/log/check.h>
13 #include <packager/macros/logging.h>
14 #include <packager/macros/status.h>
// Upper bound on the number of samples a single stream may hold in its
// buffer. AcceptSample() fails with INVALID_ARGUMENT ("Streams are not
// properly multiplexed.") once a stream's queue grows past this value.
22 const size_t kMaxBufferSize = 1000;
// Returns the time of |data| as an unscaled integer (TimeInSeconds() divides
// this value by the stream's time scale). The value depends on the kind of
// sample and on the stream type:
//   - text sample:  the sample's start_time().
//   - audio stream: pts() plus half of duration(), i.e. the sample mid-point.
//   - video stream: pts().
// |data| must carry either a text sample or a media sample.
24 int64_t GetScaledTime(
const StreamInfo& info,
const StreamData& data) {
25 DCHECK(data.text_sample || data.media_sample);
// Text samples supply their own start time; |info| is not consulted.
27 if (data.text_sample) {
28 return data.text_sample->start_time();
// From here on |data| holds a media sample. A text stream that delivers
// media samples is reported with the message below.
// NOTE(review): the statement that receives this streamed message is not
// visible in this view - confirm what it does.
31 if (info.stream_type() == kStreamText) {
36 <<
"A text streams should use text samples, not media samples.";
// Audio is timed at the middle of the sample rather than at its start.
39 if (info.stream_type() == kStreamAudio) {
43 return data.media_sample->pts() + data.media_sample->duration() / 2;
// Anything that reaches this point is expected to be a video stream.
46 DCHECK_EQ(info.stream_type(), kStreamVideo);
47 return data.media_sample->pts();
50 double TimeInSeconds(
const StreamInfo& info,
const StreamData& data) {
51 const int64_t scaled_time = GetScaledTime(info, data);
52 const int32_t time_scale = info.time_scale();
54 return static_cast<double>(scaled_time) / time_scale;
57 double TextEndTimeInSeconds(
const StreamInfo& info,
const StreamData& data) {
58 DCHECK(data.text_sample);
60 const int64_t scaled_time = data.text_sample->EndTime();
61 const int32_t time_scale = info.time_scale();
63 return static_cast<double>(scaled_time) / time_scale;
66 Status GetNextCue(
double hint,
67 SyncPointQueue* sync_points,
68 std::shared_ptr<const CueEvent>* out_cue) {
72 *out_cue = sync_points->GetNext(hint);
75 return *out_cue ? Status::OK
76 : Status(error::CANCELLED,
"SyncPointQueue is cancelled.");
80 CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
81 : sync_points_(sync_points) {}
83 Status CueAlignmentHandler::InitializeInternal() {
84 sync_points_->AddThread();
85 stream_states_.resize(num_input_streams());
89 hint_ = sync_points_->GetHint(-1);
// Routes incoming |data| according to its type: stream info is passed to
// OnStreamInfo(), text and media samples are passed to OnSample(). Any other
// type is logged at VLOG(3) as dropped.
94 Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
95 switch (data->stream_data_type) {
96 case StreamDataType::kStreamInfo:
97 return OnStreamInfo(std::move(data));
// Text and media samples share a single handler.
98 case StreamDataType::kTextSample:
99 case StreamDataType::kMediaSample:
100 return OnSample(std::move(data));
// NOTE(review): the label and the return statement of this fall-back path
// are not visible in this view - confirm which status is returned for
// dropped data.
102 VLOG(3) <<
"Dropping unsupported data type "
103 <<
static_cast<int>(data->stream_data_type);
// Marks stream |stream_index| as ready to be flushed. The remaining steps
// drain pending sync points, run through the buffered samples of every
// stream, dispatch or ignore the cues that are left over, and finally flush
// all downstreams.
// Errors from GetNextCue(), UseNewSyncPoint(), RunThroughSamples() and
// Dispatch() are passed through RETURN_IF_ERROR; otherwise the result of
// FlushAllDownstreams() is returned.
108 Status CueAlignmentHandler::OnFlushRequest(
size_t stream_index) {
109 stream_states_[stream_index].to_be_flushed =
true;
// Look for a stream that has not requested a flush yet.
// NOTE(review): the body of this check is not visible in this view -
// presumably the function returns early until every stream is marked;
// confirm.
114 for (
const StreamState& stream_state : stream_states_) {
115 if (!stream_state.to_be_flushed) {
// Sanity-check the state of every stream: each one is flagged for flushing,
// and video streams hold neither buffered samples nor buffered cues.
124 for (StreamState& stream : stream_states_) {
125 DCHECK(stream.to_be_flushed);
127 if (stream.info->stream_type() == kStreamVideo) {
128 DCHECK_EQ(stream.samples.size(), 0u)
129 <<
"Video streams should not store samples";
130 DCHECK_EQ(stream.cues.size(), 0u)
131 <<
"Video streams should not store cues";
// Pull every sync point the queue still has for |hint_| and apply each one
// through UseNewSyncPoint(), which also updates |hint_|.
137 while (sync_points_->HasMore(hint_)) {
138 std::shared_ptr<const CueEvent> next_cue;
139 RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
140 RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
// With the new cues in place, let every stream emit what it has buffered;
// no samples are expected to remain afterwards.
145 for (StreamState& stream : stream_states_) {
146 RETURN_IF_ERROR(RunThroughSamples(&stream));
147 DCHECK_EQ(stream.samples.size(), 0u);
// Walk the cues still queued on this stream. A cue that lies before the
// stream's latest text sample end time is dispatched; the log statement
// below reports a cue as ignored.
// NOTE(review): the branch separating the two cases and the end of the log
// statement are not visible in this view.
154 for (
auto& cue : stream.cues) {
156 if (cue->cue_event->time_in_seconds <
157 stream.max_text_sample_end_time_seconds) {
158 RETURN_IF_ERROR(Dispatch(std::move(cue)));
160 VLOG(1) <<
"Ignore extra cue in stream " << cue->stream_index
161 <<
" with time " << cue->cue_event->time_in_seconds
// Propagate the flush to every downstream handler.
168 return FlushAllDownstreams();
171 Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
172 StreamState& stream_state = stream_states_[data->stream_index];
175 stream_state.info = data->stream_info;
177 return Dispatch(std::move(data));
// Handles a media sample that belongs to a video stream. A key frame whose
// time is at or after |hint_| triggers the promotion of a sync point at that
// time; the resulting cue is dispatched on this stream ahead of the sample.
// The sample itself is always dispatched last.
180 Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
182 DCHECK(sample->media_sample);
184 const size_t stream_index = sample->stream_index;
185 StreamState& stream = stream_states_[stream_index];
187 const double sample_time = TimeInSeconds(*stream.info, *sample);
188 const bool is_key_frame = sample->media_sample->is_key_frame();
// Only key frames that have reached the current hint can start a new sync
// point.
190 if (is_key_frame && sample_time >= hint_) {
191 auto next_sync = sync_points_->PromoteAt(sample_time);
// Failure path: the promotion did not succeed.
// NOTE(review): the condition guarding this error path is not visible in
// this view - presumably it tests |next_sync| for null; confirm.
194 LOG(ERROR) <<
"Failed to promote sync point at " << sample_time
195 <<
". This happens only if video streams are not GOP-aligned.";
196 return Status(error::INVALID_ARGUMENT,
197 "Streams are not properly GOP-aligned.");
// UseNewSyncPoint() queues the cue on every stream; this video stream is
// expected to hold exactly that one cue, which is sent out immediately.
200 RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
201 DCHECK_EQ(stream.cues.size(), 1u);
202 RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
203 stream.cues.pop_front();
// Video samples are forwarded directly rather than being buffered.
206 return Dispatch(std::move(sample));
209 Status CueAlignmentHandler::OnNonVideoSample(
210 std::unique_ptr<StreamData> sample) {
212 DCHECK(sample->media_sample || sample->text_sample);
214 const size_t stream_index = sample->stream_index;
215 StreamState& stream_state = stream_states_[stream_index];
219 RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));
225 if (EveryoneWaitingAtHint()) {
226 std::shared_ptr<const CueEvent> next_sync;
227 RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
228 RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
234 Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
245 const size_t stream_index = sample->stream_index;
247 if (sample->text_sample) {
248 StreamState& stream = stream_states_[stream_index];
249 stream.max_text_sample_end_time_seconds =
250 std::max(stream.max_text_sample_end_time_seconds,
251 TextEndTimeInSeconds(*stream.info, *sample));
254 const StreamType stream_type =
255 stream_states_[stream_index].info->stream_type();
256 const bool is_video = stream_type == kStreamVideo;
258 return is_video ? OnVideoSample(std::move(sample))
259 : OnNonVideoSample(std::move(sample));
// Adopts |new_sync| as the latest sync point: |hint_| is refreshed from the
// sync point queue, a cue built from |new_sync| is appended to the cue queue
// of each stream, and each stream's buffered samples are then run through.
262 Status CueAlignmentHandler::UseNewSyncPoint(
263 std::shared_ptr<const CueEvent> new_sync) {
// The refreshed hint is expected to lie strictly after the new sync point.
264 hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
265 DCHECK_GT(hint_, new_sync->time_in_seconds);
// Visit the streams by index because FromCueEvent() needs the index.
// NOTE(review): the end of this loop header and the function's final return
// are not visible in this view.
267 for (
size_t stream_index = 0; stream_index < stream_states_.size();
269 StreamState& stream = stream_states_[stream_index];
270 stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));
272 RETURN_IF_ERROR(RunThroughSamples(&stream));
// Inspects the sample buffer of every stream, testing each for emptiness.
// NOTE(review): the return statements are not visible in this view -
// presumably a stream with an empty buffer yields false and true is returned
// when no buffer is empty; confirm.
278 bool CueAlignmentHandler::EveryoneWaitingAtHint()
const {
279 for (
const StreamState& stream_state : stream_states_) {
280 if (stream_state.samples.empty()) {
// Appends |sample| to the buffer of |stream| and then runs through the
// buffered samples of that stream.
// Returns INVALID_ARGUMENT when the buffer grows beyond kMaxBufferSize,
// otherwise the result of RunThroughSamples().
287 Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
288 StreamState* stream) {
290 DCHECK(sample->media_sample || sample->text_sample);
// Read the index now: |sample| is moved into the buffer on the next line and
// the index is still needed for the error message.
295 const size_t stream_index = sample->stream_index;
297 stream->samples.push_back(std::move(sample));
// Too many buffered samples is treated as an input error.
// NOTE(review): the end of this log statement is not visible in this view.
299 if (stream->samples.size() > kMaxBufferSize) {
300 LOG(ERROR) <<
"Stream " << stream_index <<
" has buffered "
301 << stream->samples.size() <<
" when the max is "
303 return Status(error::INVALID_ARGUMENT,
304 "Streams are not properly multiplexed.");
307 return RunThroughSamples(stream);
// Dispatches what can be released from |stream|'s buffers. The first loop
// runs while both cues and samples are queued and compares the front sample
// with the front cue; the second loop dispatches buffered samples whose time
// is before |hint_|.
310 Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
// Compare the front cue with the front sample for as long as both queues
// have entries.
313 while (stream->cues.size() && stream->samples.size()) {
314 const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
315 const double sample_time =
316 TimeInSeconds(*stream->info, *stream->samples.front());
// A sample that lies before the cue is dispatched and removed from the
// buffer.
318 if (sample_time < cue_time) {
319 RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
320 stream->samples.pop_front();
// Dispatch the front cue and remove it from the queue.
// NOTE(review): the branch separating the sample case from the cue case is
// not visible in this view - presumably this is the alternative branch;
// confirm.
322 RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
323 stream->cues.pop_front();
// Samples that lie before the current hint can be released as well.
// NOTE(review): the function's final return is not visible in this view.
330 while (stream->samples.size() &&
331 TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
332 RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
333 stream->samples.pop_front();
// All virtual methods are virtual only so that they can be mocked.