Shaka Packager SDK
demuxer.cc
1 // Copyright 2014 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/media/demuxer/demuxer.h>
8 
9 #include <algorithm>
10 #include <functional>
11 
12 #include <absl/log/check.h>
13 #include <absl/log/log.h>
14 #include <absl/strings/escaping.h>
15 #include <absl/strings/numbers.h>
16 #include <absl/strings/str_format.h>
17 
18 #include <packager/file.h>
19 #include <packager/macros/compiler.h>
20 #include <packager/macros/logging.h>
21 #include <packager/media/base/decryptor_source.h>
22 #include <packager/media/base/key_source.h>
23 #include <packager/media/base/media_sample.h>
24 #include <packager/media/base/stream_info.h>
25 #include <packager/media/formats/mp2t/mp2t_media_parser.h>
26 #include <packager/media/formats/mp4/mp4_media_parser.h>
27 #include <packager/media/formats/webm/webm_media_parser.h>
28 #include <packager/media/formats/webvtt/webvtt_parser.h>
29 #include <packager/media/formats/wvm/wvm_media_parser.h>
30 
31 namespace {
// Size of the initial probe read: 64 KiB (0x10000 bytes). This is enough to
// determine the container and likely covers all of the init data as well.
constexpr size_t kInitBufSize = 0x10000;
// Size of the read buffer used for regular parsing: 2 MiB.
constexpr size_t kBufSize = 0x200000;
// Upper bound on the number of samples queued while waiting for init_event.
// Receiving this many samples before init_event indicates that something is
// wrong. The exact value is arbitrary.
constexpr size_t kQueuedSamplesLimit = 10000;
// Sentinel stream index meaning "no valid output stream".
constexpr size_t kInvalidStreamIndex = static_cast<size_t>(-1);
// Reserved output stream indexes that the labels "video", "audio" and "text"
// resolve to.
constexpr size_t kBaseVideoOutputStreamIndex = 0x100;
constexpr size_t kBaseAudioOutputStreamIndex = 0x200;
constexpr size_t kBaseTextOutputStreamIndex = 0x300;
43 
44 std::string GetStreamLabel(size_t stream_index) {
45  switch (stream_index) {
46  case kBaseVideoOutputStreamIndex:
47  return "video";
48  case kBaseAudioOutputStreamIndex:
49  return "audio";
50  case kBaseTextOutputStreamIndex:
51  return "text";
52  default:
53  return absl::StrFormat("%u", stream_index);
54  }
55 }
56 
57 bool GetStreamIndex(const std::string& stream_label, size_t* stream_index) {
58  DCHECK(stream_index);
59  if (stream_label == "video") {
60  *stream_index = kBaseVideoOutputStreamIndex;
61  } else if (stream_label == "audio") {
62  *stream_index = kBaseAudioOutputStreamIndex;
63  } else if (stream_label == "text") {
64  *stream_index = kBaseTextOutputStreamIndex;
65  } else {
66  // Expect stream_label to be a zero based stream id.
67  if (!absl::SimpleAtoi(stream_label, stream_index)) {
68  LOG(ERROR) << "Invalid argument --stream=" << stream_label << "; "
69  << "should be 'audio', 'video', 'text', or a number";
70  return false;
71  }
72  }
73  return true;
74 }
75 
76 }
77 
78 namespace shaka {
79 namespace media {
80 
81 Demuxer::Demuxer(const std::string& file_name)
82  : file_name_(file_name), buffer_(new uint8_t[kBufSize]) {}
83 
84 Demuxer::~Demuxer() {
85  if (media_file_)
86  media_file_->Close();
87 }
88 
89 void Demuxer::SetKeySource(std::unique_ptr<KeySource> key_source) {
90  key_source_ = std::move(key_source);
91 }
92 
93 Status Demuxer::Run() {
94  LOG(INFO) << "Demuxer::Run() on file '" << file_name_ << "'.";
95  Status status = InitializeParser();
96  // ParserInitEvent callback is called after a few calls to Parse(), which sets
97  // up the streams. Only after that, we can verify the outputs below.
98  while (!all_streams_ready_ && status.ok())
99  status.Update(Parse());
100  // If no output is defined, then return success after receiving all stream
101  // info.
102  if (all_streams_ready_ && output_handlers().empty())
103  return Status::OK;
104  if (!init_event_status_.ok())
105  return init_event_status_;
106  if (!status.ok())
107  return status;
108  // Check if all specified outputs exists.
109  for (const auto& pair : output_handlers()) {
110  if (std::find(stream_indexes_.begin(), stream_indexes_.end(), pair.first) ==
111  stream_indexes_.end()) {
112  LOG(ERROR) << "Invalid argument, stream=" << GetStreamLabel(pair.first)
113  << " not available.";
114  return Status(error::INVALID_ARGUMENT, "Stream not available");
115  }
116  }
117 
118  while (!cancelled_ && status.ok())
119  status.Update(Parse());
120  if (cancelled_ && status.ok())
121  return Status(error::CANCELLED, "Demuxer run cancelled");
122 
123  if (status.error_code() == error::END_OF_STREAM) {
124  for (size_t stream_index : stream_indexes_) {
125  status = FlushDownstream(stream_index);
126  if (!status.ok())
127  return status;
128  }
129  return Status::OK;
130  }
131  return status;
132 }
133 
135  cancelled_ = true;
136 }
137 
138 Status Demuxer::SetHandler(const std::string& stream_label,
139  std::shared_ptr<MediaHandler> handler) {
140  size_t stream_index = kInvalidStreamIndex;
141  if (!GetStreamIndex(stream_label, &stream_index)) {
142  return Status(error::INVALID_ARGUMENT,
143  "Invalid stream: " + stream_label);
144  }
145  return MediaHandler::SetHandler(stream_index, std::move(handler));
146 }
147 
148 void Demuxer::SetLanguageOverride(const std::string& stream_label,
149  const std::string& language_override) {
150  size_t stream_index = kInvalidStreamIndex;
151  if (!GetStreamIndex(stream_label, &stream_index))
152  LOG(WARNING) << "Invalid stream for language override " << stream_label;
153  language_overrides_[stream_index] = language_override;
154 }
155 
156 Status Demuxer::InitializeParser() {
157  DCHECK(!media_file_);
158  DCHECK(!all_streams_ready_);
159 
160  LOG(INFO) << "Initialize Demuxer for file '" << file_name_ << "'.";
161 
162  media_file_ = File::Open(file_name_.c_str(), "r");
163  if (!media_file_) {
164  return Status(error::FILE_FAILURE,
165  "Cannot open file for reading " + file_name_);
166  }
167 
168  int64_t bytes_read = 0;
169  bool eof = false;
170  if (input_format_.empty()) {
171  // Read enough bytes before detecting the container.
172  while (static_cast<size_t>(bytes_read) < kInitBufSize) {
173  int64_t read_result =
174  media_file_->Read(buffer_.get() + bytes_read, kInitBufSize);
175  if (read_result < 0)
176  return Status(error::FILE_FAILURE, "Cannot read file " + file_name_);
177  if (read_result == 0) {
178  eof = true;
179  break;
180  }
181  bytes_read += read_result;
182  }
183  container_name_ = DetermineContainer(buffer_.get(), bytes_read);
184  } else {
185  container_name_ = DetermineContainerFromFormatName(input_format_);
186  }
187 
188  // Initialize media parser.
189  switch (container_name_) {
190  case CONTAINER_MOV:
191  parser_.reset(new mp4::MP4MediaParser());
192  break;
193  case CONTAINER_MPEG2TS:
194  parser_.reset(new mp2t::Mp2tMediaParser());
195  break;
196  // Widevine classic (WVM) is derived from MPEG2PS. We do not support
197  // non-WVM MPEG2PS file, thus we do not differentiate between the two.
198  // Every MPEG2PS file is assumed to be WVM file. If it turns out not the
199  // case, an error will be reported when trying to parse the file as WVM
200  // file.
201  case CONTAINER_MPEG2PS:
202  FALLTHROUGH_INTENDED;
203  case CONTAINER_WVM:
204  parser_.reset(new wvm::WvmMediaParser());
205  break;
206  case CONTAINER_WEBM:
207  parser_.reset(new WebMMediaParser());
208  break;
209  case CONTAINER_WEBVTT:
210  parser_.reset(new WebVttParser());
211  break;
212  case CONTAINER_UNKNOWN: {
213  const int64_t kDumpSizeLimit = 512;
214  LOG(ERROR) << "Failed to detect the container type from the buffer: "
215  << absl::BytesToHexString(absl::string_view(
216  reinterpret_cast<const char*>(buffer_.get()),
217  std::min(bytes_read, kDumpSizeLimit)));
218  return Status(error::INVALID_ARGUMENT,
219  "Failed to detect the container type.");
220  }
221  default:
222  NOTIMPLEMENTED() << "Container " << container_name_
223  << " is not supported.";
224  return Status(error::UNIMPLEMENTED, "Container not supported.");
225  }
226 
227  parser_->Init(
228  std::bind(&Demuxer::ParserInitEvent, this, std::placeholders::_1),
229  std::bind(&Demuxer::NewMediaSampleEvent, this, std::placeholders::_1,
230  std::placeholders::_2),
231  std::bind(&Demuxer::NewTextSampleEvent, this, std::placeholders::_1,
232  std::placeholders::_2),
233  key_source_.get());
234 
235  // Handle trailing 'moov'.
236  if (container_name_ == CONTAINER_MOV &&
237  File::IsLocalRegularFile(file_name_.c_str())) {
238  // TODO(kqyang): Investigate whether we can reuse the existing file
239  // descriptor |media_file_| instead of opening the same file again.
240  static_cast<mp4::MP4MediaParser*>(parser_.get())->LoadMoov(file_name_);
241  }
242  if (!parser_->Parse(buffer_.get(), bytes_read) || (eof && !parser_->Flush())) {
243  return Status(error::PARSER_FAILURE,
244  "Cannot parse media file " + file_name_);
245  }
246  return Status::OK;
247 }
248 
249 void Demuxer::ParserInitEvent(
250  const std::vector<std::shared_ptr<StreamInfo>>& stream_infos) {
251  if (dump_stream_info_) {
252  printf("\nFile \"%s\":\n", file_name_.c_str());
253  printf("Found %zu stream(s).\n", stream_infos.size());
254  for (size_t i = 0; i < stream_infos.size(); ++i)
255  printf("Stream [%zu] %s\n", i, stream_infos[i]->ToString().c_str());
256  }
257 
258  int base_stream_index = 0;
259  bool video_handler_set =
260  output_handlers().find(kBaseVideoOutputStreamIndex) !=
261  output_handlers().end();
262  bool audio_handler_set =
263  output_handlers().find(kBaseAudioOutputStreamIndex) !=
264  output_handlers().end();
265  bool text_handler_set =
266  output_handlers().find(kBaseTextOutputStreamIndex) !=
267  output_handlers().end();
268  for (const std::shared_ptr<StreamInfo>& stream_info : stream_infos) {
269  size_t stream_index = base_stream_index;
270  if (video_handler_set && stream_info->stream_type() == kStreamVideo) {
271  stream_index = kBaseVideoOutputStreamIndex;
272  // Only for the first video stream.
273  video_handler_set = false;
274  }
275  if (audio_handler_set && stream_info->stream_type() == kStreamAudio) {
276  stream_index = kBaseAudioOutputStreamIndex;
277  // Only for the first audio stream.
278  audio_handler_set = false;
279  }
280  if (text_handler_set && stream_info->stream_type() == kStreamText) {
281  stream_index = kBaseTextOutputStreamIndex;
282  text_handler_set = false;
283  }
284 
285  const bool handler_set =
286  output_handlers().find(stream_index) != output_handlers().end();
287  if (handler_set) {
288  track_id_to_stream_index_map_[stream_info->track_id()] = stream_index;
289  stream_indexes_.push_back(stream_index);
290  auto iter = language_overrides_.find(stream_index);
291  if (iter != language_overrides_.end() &&
292  stream_info->stream_type() != kStreamVideo) {
293  stream_info->set_language(iter->second);
294  }
295  if (stream_info->is_encrypted()) {
296  init_event_status_.Update(Status(error::INVALID_ARGUMENT,
297  "A decryption key source is not "
298  "provided for an encrypted stream."));
299  } else {
300  init_event_status_.Update(
301  DispatchStreamInfo(stream_index, stream_info));
302  }
303  } else {
304  track_id_to_stream_index_map_[stream_info->track_id()] =
305  kInvalidStreamIndex;
306  }
307  ++base_stream_index;
308  }
309  all_streams_ready_ = true;
310 }
311 
312 bool Demuxer::NewMediaSampleEvent(uint32_t track_id,
313  std::shared_ptr<MediaSample> sample) {
314  if (!all_streams_ready_) {
315  if (queued_media_samples_.size() >= kQueuedSamplesLimit) {
316  LOG(ERROR) << "Queued samples limit reached: " << kQueuedSamplesLimit;
317  return false;
318  }
319  queued_media_samples_.emplace_back(track_id, sample);
320  return true;
321  }
322  if (!init_event_status_.ok()) {
323  return false;
324  }
325 
326  while (!queued_media_samples_.empty()) {
327  if (!PushMediaSample(queued_media_samples_.front().track_id,
328  queued_media_samples_.front().sample)) {
329  return false;
330  }
331  queued_media_samples_.pop_front();
332  }
333  return PushMediaSample(track_id, sample);
334 }
335 
336 bool Demuxer::NewTextSampleEvent(uint32_t track_id,
337  std::shared_ptr<TextSample> sample) {
338  if (!all_streams_ready_) {
339  if (queued_text_samples_.size() >= kQueuedSamplesLimit) {
340  LOG(ERROR) << "Queued samples limit reached: " << kQueuedSamplesLimit;
341  return false;
342  }
343  queued_text_samples_.emplace_back(track_id, sample);
344  return true;
345  }
346  if (!init_event_status_.ok()) {
347  return false;
348  }
349 
350  while (!queued_text_samples_.empty()) {
351  if (!PushTextSample(queued_text_samples_.front().track_id,
352  queued_text_samples_.front().sample)) {
353  return false;
354  }
355  queued_text_samples_.pop_front();
356  }
357  return PushTextSample(track_id, sample);
358 }
359 
360 bool Demuxer::PushMediaSample(uint32_t track_id,
361  std::shared_ptr<MediaSample> sample) {
362  auto stream_index_iter = track_id_to_stream_index_map_.find(track_id);
363  if (stream_index_iter == track_id_to_stream_index_map_.end()) {
364  LOG(ERROR) << "Track " << track_id << " not found.";
365  return false;
366  }
367  if (stream_index_iter->second == kInvalidStreamIndex)
368  return true;
369  Status status = DispatchMediaSample(stream_index_iter->second, sample);
370  if (!status.ok()) {
371  LOG(ERROR) << "Failed to process sample " << stream_index_iter->second
372  << " " << status;
373  return false;
374  }
375  return true;
376 }
377 
378 bool Demuxer::PushTextSample(uint32_t track_id,
379  std::shared_ptr<TextSample> sample) {
380  auto stream_index_iter = track_id_to_stream_index_map_.find(track_id);
381  if (stream_index_iter == track_id_to_stream_index_map_.end()) {
382  LOG(ERROR) << "Track " << track_id << " not found.";
383  return false;
384  }
385  if (stream_index_iter->second == kInvalidStreamIndex)
386  return true;
387  Status status = DispatchTextSample(stream_index_iter->second, sample);
388  if (!status.ok()) {
389  LOG(ERROR) << "Failed to process sample " << stream_index_iter->second
390  << " " << status;
391  return false;
392  }
393  return true;
394 }
395 
396 Status Demuxer::Parse() {
397  DCHECK(media_file_);
398  DCHECK(parser_);
399  DCHECK(buffer_);
400 
401  int64_t bytes_read = media_file_->Read(buffer_.get(), kBufSize);
402  if (bytes_read == 0) {
403  if (!parser_->Flush())
404  return Status(error::PARSER_FAILURE, "Failed to flush.");
405  return Status(error::END_OF_STREAM, "");
406  } else if (bytes_read < 0) {
407  return Status(error::FILE_FAILURE, "Cannot read file " + file_name_);
408  }
409 
410  return parser_->Parse(buffer_.get(), bytes_read)
411  ? Status::OK
412  : Status(error::PARSER_FAILURE,
413  "Cannot parse media file " + file_name_);
414 }
415 
416 } // namespace media
417 } // namespace shaka
Status Run() override
Definition: demuxer.cc:93
Status SetHandler(const std::string &stream_label, std::shared_ptr< MediaHandler > handler)
Definition: demuxer.cc:138
void Cancel() override
Definition: demuxer.cc:134
void SetLanguageOverride(const std::string &stream_label, const std::string &language_override)
Definition: demuxer.cc:148
void SetKeySource(std::unique_ptr< KeySource > key_source)
Definition: demuxer.cc:89
Demuxer(const std::string &file_name)
Definition: demuxer.cc:81
Status SetHandler(size_t output_stream_index, std::shared_ptr< MediaHandler > handler)
Connect downstream handler at the specified output stream index.
Status DispatchMediaSample(size_t stream_index, std::shared_ptr< const MediaSample > media_sample) const
Dispatch the media sample to downstream handlers.
Status DispatchTextSample(size_t stream_index, std::shared_ptr< const TextSample > text_sample) const
Dispatch the text sample to downstream handlers.
Status DispatchStreamInfo(size_t stream_index, std::shared_ptr< const StreamInfo > stream_info) const
Dispatch the stream info to downstream handlers.
Status FlushDownstream(size_t output_stream_index)
Flush the downstream connected at the specified output stream index.
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66