Shaka Packager SDK
Loading...
Searching...
No Matches
demuxer.cc
1// Copyright 2014 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#include <packager/media/demuxer/demuxer.h>
8
9#include <algorithm>
10#include <functional>
11
12#include <absl/log/check.h>
13#include <absl/log/log.h>
14#include <absl/strings/escaping.h>
15#include <absl/strings/numbers.h>
16#include <absl/strings/str_format.h>
17
18#include <packager/file.h>
19#include <packager/macros/compiler.h>
20#include <packager/macros/logging.h>
21#include <packager/media/base/decryptor_source.h>
22#include <packager/media/base/key_source.h>
23#include <packager/media/base/media_sample.h>
24#include <packager/media/base/stream_info.h>
25#include <packager/media/formats/mp2t/mp2t_media_parser.h>
26#include <packager/media/formats/mp4/mp4_media_parser.h>
27#include <packager/media/formats/webm/webm_media_parser.h>
28#include <packager/media/formats/webvtt/webvtt_parser.h>
29#include <packager/media/formats/wvm/wvm_media_parser.h>
30
31namespace {
// Number of bytes read up front for container detection: 0x10000 bytes
// (64 KiB), sufficient to determine the container and likely all init data.
constexpr size_t kInitBufSize = 0x10000;
// Size of the read buffer handed to the parser on each step (2 MiB).
constexpr size_t kBufSize = 0x200000;
// Upper bound on samples queued while waiting for the parser init event.
// Receiving this many samples before init indicates something is wrong; the
// exact value is arbitrary.
constexpr size_t kQueuedSamplesLimit = 10000;
// Sentinel stream index meaning "no output stream".
constexpr size_t kInvalidStreamIndex = static_cast<size_t>(-1);
// Output stream indexes reserved for the "video", "audio" and "text" labels.
constexpr size_t kBaseVideoOutputStreamIndex = 0x100;
constexpr size_t kBaseAudioOutputStreamIndex = 0x200;
constexpr size_t kBaseTextOutputStreamIndex = 0x300;
43
44std::string GetStreamLabel(size_t stream_index) {
45 switch (stream_index) {
46 case kBaseVideoOutputStreamIndex:
47 return "video";
48 case kBaseAudioOutputStreamIndex:
49 return "audio";
50 case kBaseTextOutputStreamIndex:
51 return "text";
52 default:
53 return absl::StrFormat("%u", stream_index);
54 }
55}
56
57bool GetStreamIndex(const std::string& stream_label, size_t* stream_index) {
58 DCHECK(stream_index);
59 if (stream_label == "video") {
60 *stream_index = kBaseVideoOutputStreamIndex;
61 } else if (stream_label == "audio") {
62 *stream_index = kBaseAudioOutputStreamIndex;
63 } else if (stream_label == "text") {
64 *stream_index = kBaseTextOutputStreamIndex;
65 } else {
66 // Expect stream_label to be a zero based stream id.
67 if (!absl::SimpleAtoi(stream_label, stream_index)) {
68 LOG(ERROR) << "Invalid argument --stream=" << stream_label << "; "
69 << "should be 'audio', 'video', 'text', or a number";
70 return false;
71 }
72 }
73 return true;
74}
75
76} // namespace
77
78namespace shaka {
79namespace media {
80
81Demuxer::Demuxer(const std::string& file_name)
82 : file_name_(file_name), buffer_(new uint8_t[kBufSize]) {}
83
84Demuxer::~Demuxer() {
85 if (media_file_)
86 media_file_->Close();
87}
88
89void Demuxer::SetKeySource(std::unique_ptr<KeySource> key_source) {
90 key_source_ = std::move(key_source);
91}
92
93Status Demuxer::Run() {
94 LOG(INFO) << "Demuxer::Run() on file '" << file_name_ << "'.";
95 Status status = InitializeParser();
96 // ParserInitEvent callback is called after a few calls to Parse(), which sets
97 // up the streams. Only after that, we can verify the outputs below.
98 while (!all_streams_ready_ && status.ok())
99 status.Update(Parse());
100 // If no output is defined, then return success after receiving all stream
101 // info.
102 if (all_streams_ready_ && output_handlers().empty())
103 return Status::OK;
104 if (!init_event_status_.ok())
105 return init_event_status_;
106 if (!status.ok())
107 return status;
108 // Check if all specified outputs exists.
109 for (const auto& pair : output_handlers()) {
110 if (std::find(stream_indexes_.begin(), stream_indexes_.end(), pair.first) ==
111 stream_indexes_.end()) {
112 LOG(ERROR) << "Invalid argument, stream=" << GetStreamLabel(pair.first)
113 << " not available.";
114 return Status(error::INVALID_ARGUMENT, "Stream not available");
115 }
116 }
117
118 while (!cancelled_ && status.ok())
119 status.Update(Parse());
120 if (cancelled_ && status.ok())
121 return Status(error::CANCELLED, "Demuxer run cancelled");
122
123 if (status.error_code() == error::END_OF_STREAM) {
124 for (size_t stream_index : stream_indexes_) {
125 status = FlushDownstream(stream_index);
126 if (!status.ok())
127 return status;
128 }
129 return Status::OK;
130 }
131 return status;
132}
133
135 cancelled_ = true;
136}
137
138Status Demuxer::SetHandler(const std::string& stream_label,
139 std::shared_ptr<MediaHandler> handler) {
140 size_t stream_index = kInvalidStreamIndex;
141 if (!GetStreamIndex(stream_label, &stream_index)) {
142 return Status(error::INVALID_ARGUMENT, "Invalid stream: " + stream_label);
143 }
144 return MediaHandler::SetHandler(stream_index, std::move(handler));
145}
146
147void Demuxer::SetLanguageOverride(const std::string& stream_label,
148 const std::string& language_override) {
149 size_t stream_index = kInvalidStreamIndex;
150 if (!GetStreamIndex(stream_label, &stream_index))
151 LOG(WARNING) << "Invalid stream for language override " << stream_label;
152 language_overrides_[stream_index] = language_override;
153}
154
155Status Demuxer::InitializeParser() {
156 DCHECK(!media_file_);
157 DCHECK(!all_streams_ready_);
158
159 LOG(INFO) << "Initialize Demuxer for file '" << file_name_ << "'.";
160
161 media_file_ = File::Open(file_name_.c_str(), "r");
162 if (!media_file_) {
163 return Status(error::FILE_FAILURE,
164 "Cannot open file for reading " + file_name_);
165 }
166
167 int64_t bytes_read = 0;
168 bool eof = false;
169 if (input_format_.empty()) {
170 // Read enough bytes before detecting the container.
171 while (static_cast<size_t>(bytes_read) < kInitBufSize) {
172 int64_t read_result =
173 media_file_->Read(buffer_.get() + bytes_read, kInitBufSize);
174 if (read_result < 0)
175 return Status(error::FILE_FAILURE, "Cannot read file " + file_name_);
176 if (read_result == 0) {
177 eof = true;
178 break;
179 }
180 bytes_read += read_result;
181 }
182 container_name_ = DetermineContainer(buffer_.get(), bytes_read);
183 } else {
184 container_name_ = DetermineContainerFromFormatName(input_format_);
185 }
186
187 // Initialize media parser.
188 switch (container_name_) {
189 case CONTAINER_MOV:
190 parser_.reset(new mp4::MP4MediaParser());
191 break;
192 case CONTAINER_MPEG2TS:
193 parser_.reset(new mp2t::Mp2tMediaParser());
194 break;
195 // Widevine classic (WVM) is derived from MPEG2PS. We do not support
196 // non-WVM MPEG2PS file, thus we do not differentiate between the two.
197 // Every MPEG2PS file is assumed to be WVM file. If it turns out not the
198 // case, an error will be reported when trying to parse the file as WVM
199 // file.
200 case CONTAINER_MPEG2PS:
201 FALLTHROUGH_INTENDED;
202 case CONTAINER_WVM:
203 parser_.reset(new wvm::WvmMediaParser());
204 break;
205 case CONTAINER_WEBM:
206 parser_.reset(new WebMMediaParser());
207 break;
208 case CONTAINER_WEBVTT:
209 parser_.reset(new WebVttParser());
210 break;
211 case CONTAINER_UNKNOWN: {
212 const int64_t kDumpSizeLimit = 512;
213 LOG(ERROR) << "Failed to detect the container type from the buffer: "
214 << absl::BytesToHexString(absl::string_view(
215 reinterpret_cast<const char*>(buffer_.get()),
216 std::min(bytes_read, kDumpSizeLimit)));
217 return Status(error::INVALID_ARGUMENT,
218 "Failed to detect the container type.");
219 }
220 default:
221 NOTIMPLEMENTED() << "Container " << container_name_
222 << " is not supported.";
223 return Status(error::UNIMPLEMENTED, "Container not supported.");
224 }
225
226 parser_->Init(
227 std::bind(&Demuxer::ParserInitEvent, this, std::placeholders::_1),
228 std::bind(&Demuxer::NewMediaSampleEvent, this, std::placeholders::_1,
229 std::placeholders::_2),
230 std::bind(&Demuxer::NewTextSampleEvent, this, std::placeholders::_1,
231 std::placeholders::_2),
232 key_source_.get());
233
234 // Handle trailing 'moov'.
235 if (container_name_ == CONTAINER_MOV &&
236 File::IsLocalRegularFile(file_name_.c_str())) {
237 // TODO(kqyang): Investigate whether we can reuse the existing file
238 // descriptor |media_file_| instead of opening the same file again.
239 static_cast<mp4::MP4MediaParser*>(parser_.get())->LoadMoov(file_name_);
240 }
241 if (!parser_->Parse(buffer_.get(), bytes_read) ||
242 (eof && !parser_->Flush())) {
243 return Status(error::PARSER_FAILURE,
244 "Cannot parse media file " + file_name_);
245 }
246 return Status::OK;
247}
248
249void Demuxer::ParserInitEvent(
250 const std::vector<std::shared_ptr<StreamInfo>>& stream_infos) {
251 if (dump_stream_info_) {
252 printf("\nFile \"%s\":\n", file_name_.c_str());
253 printf("Found %zu stream(s).\n", stream_infos.size());
254 for (size_t i = 0; i < stream_infos.size(); ++i)
255 printf("Stream [%zu] %s\n", i, stream_infos[i]->ToString().c_str());
256 }
257
258 int base_stream_index = 0;
259 bool video_handler_set =
260 output_handlers().find(kBaseVideoOutputStreamIndex) !=
261 output_handlers().end();
262 bool audio_handler_set =
263 output_handlers().find(kBaseAudioOutputStreamIndex) !=
264 output_handlers().end();
265 bool text_handler_set = output_handlers().find(kBaseTextOutputStreamIndex) !=
266 output_handlers().end();
267 for (const std::shared_ptr<StreamInfo>& stream_info : stream_infos) {
268 size_t stream_index = base_stream_index;
269 if (video_handler_set && stream_info->stream_type() == kStreamVideo) {
270 stream_index = kBaseVideoOutputStreamIndex;
271 // Only for the first video stream.
272 video_handler_set = false;
273 }
274 if (audio_handler_set && stream_info->stream_type() == kStreamAudio) {
275 stream_index = kBaseAudioOutputStreamIndex;
276 // Only for the first audio stream.
277 audio_handler_set = false;
278 }
279 if (text_handler_set && stream_info->stream_type() == kStreamText) {
280 stream_index = kBaseTextOutputStreamIndex;
281 text_handler_set = false;
282 }
283
284 const bool handler_set =
285 output_handlers().find(stream_index) != output_handlers().end();
286 if (handler_set) {
287 track_id_to_stream_index_map_[stream_info->track_id()] = stream_index;
288 stream_indexes_.push_back(stream_index);
289 auto iter = language_overrides_.find(stream_index);
290 if (iter != language_overrides_.end() &&
291 stream_info->stream_type() != kStreamVideo) {
292 stream_info->set_language(iter->second);
293 }
294 if (stream_info->is_encrypted()) {
295 init_event_status_.Update(Status(error::INVALID_ARGUMENT,
296 "A decryption key source is not "
297 "provided for an encrypted stream."));
298 } else {
299 init_event_status_.Update(
300 DispatchStreamInfo(stream_index, stream_info));
301 }
302 } else {
303 track_id_to_stream_index_map_[stream_info->track_id()] =
304 kInvalidStreamIndex;
305 }
306 ++base_stream_index;
307 }
308 all_streams_ready_ = true;
309}
310
311bool Demuxer::NewMediaSampleEvent(uint32_t track_id,
312 std::shared_ptr<MediaSample> sample) {
313 if (!all_streams_ready_) {
314 if (queued_media_samples_.size() >= kQueuedSamplesLimit) {
315 LOG(ERROR) << "Queued samples limit reached: " << kQueuedSamplesLimit;
316 return false;
317 }
318 queued_media_samples_.emplace_back(track_id, sample);
319 return true;
320 }
321 if (!init_event_status_.ok()) {
322 return false;
323 }
324
325 while (!queued_media_samples_.empty()) {
326 if (!PushMediaSample(queued_media_samples_.front().track_id,
327 queued_media_samples_.front().sample)) {
328 return false;
329 }
330 queued_media_samples_.pop_front();
331 }
332 return PushMediaSample(track_id, sample);
333}
334
335bool Demuxer::NewTextSampleEvent(uint32_t track_id,
336 std::shared_ptr<TextSample> sample) {
337 if (!all_streams_ready_) {
338 if (queued_text_samples_.size() >= kQueuedSamplesLimit) {
339 LOG(ERROR) << "Queued samples limit reached: " << kQueuedSamplesLimit;
340 return false;
341 }
342 queued_text_samples_.emplace_back(track_id, sample);
343 return true;
344 }
345 if (!init_event_status_.ok()) {
346 return false;
347 }
348
349 while (!queued_text_samples_.empty()) {
350 if (!PushTextSample(queued_text_samples_.front().track_id,
351 queued_text_samples_.front().sample)) {
352 return false;
353 }
354 queued_text_samples_.pop_front();
355 }
356 return PushTextSample(track_id, sample);
357}
358
359bool Demuxer::PushMediaSample(uint32_t track_id,
360 std::shared_ptr<MediaSample> sample) {
361 auto stream_index_iter = track_id_to_stream_index_map_.find(track_id);
362 if (stream_index_iter == track_id_to_stream_index_map_.end()) {
363 LOG(ERROR) << "Track " << track_id << " not found.";
364 return false;
365 }
366 if (stream_index_iter->second == kInvalidStreamIndex)
367 return true;
368 Status status = DispatchMediaSample(stream_index_iter->second, sample);
369 if (!status.ok()) {
370 LOG(ERROR) << "Failed to process sample " << stream_index_iter->second
371 << " " << status;
372 return false;
373 }
374 return true;
375}
376
377bool Demuxer::PushTextSample(uint32_t track_id,
378 std::shared_ptr<TextSample> sample) {
379 auto stream_index_iter = track_id_to_stream_index_map_.find(track_id);
380 if (stream_index_iter == track_id_to_stream_index_map_.end()) {
381 LOG(ERROR) << "Track " << track_id << " not found.";
382 return false;
383 }
384 if (stream_index_iter->second == kInvalidStreamIndex)
385 return true;
386 Status status = DispatchTextSample(stream_index_iter->second, sample);
387 if (!status.ok()) {
388 LOG(ERROR) << "Failed to process sample " << stream_index_iter->second
389 << " " << status;
390 return false;
391 }
392 return true;
393}
394
395Status Demuxer::Parse() {
396 DCHECK(media_file_);
397 DCHECK(parser_);
398 DCHECK(buffer_);
399
400 int64_t bytes_read = media_file_->Read(buffer_.get(), kBufSize);
401 if (bytes_read == 0) {
402 if (!parser_->Flush())
403 return Status(error::PARSER_FAILURE, "Failed to flush.");
404 return Status(error::END_OF_STREAM, "");
405 } else if (bytes_read < 0) {
406 return Status(error::FILE_FAILURE, "Cannot read file " + file_name_);
407 }
408
409 return parser_->Parse(buffer_.get(), bytes_read)
410 ? Status::OK
411 : Status(error::PARSER_FAILURE,
412 "Cannot parse media file " + file_name_);
413}
414
415} // namespace media
416} // namespace shaka
Status Run() override
Definition demuxer.cc:93
Status SetHandler(const std::string &stream_label, std::shared_ptr< MediaHandler > handler)
Definition demuxer.cc:138
void Cancel() override
Definition demuxer.cc:134
void SetLanguageOverride(const std::string &stream_label, const std::string &language_override)
Definition demuxer.cc:147
void SetKeySource(std::unique_ptr< KeySource > key_source)
Definition demuxer.cc:89
Demuxer(const std::string &file_name)
Definition demuxer.cc:81
Status SetHandler(size_t output_stream_index, std::shared_ptr< MediaHandler > handler)
Connect downstream handler at the specified output stream index.
Status DispatchMediaSample(size_t stream_index, std::shared_ptr< const MediaSample > media_sample) const
Dispatch the media sample to downstream handlers.
Status DispatchTextSample(size_t stream_index, std::shared_ptr< const TextSample > text_sample) const
Dispatch the text sample to downstream handlers.
Status DispatchStreamInfo(size_t stream_index, std::shared_ptr< const StreamInfo > stream_info) const
Dispatch the stream info to downstream handlers.
Status FlushDownstream(size_t output_stream_index)
Flush the downstream connected at the specified output stream index.
All the methods that are virtual are virtual for mocking.