Shaka Packager SDK
mp2t_media_parser.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <packager/media/formats/mp2t/mp2t_media_parser.h>
6 
7 #include <functional>
8 #include <memory>
9 
10 #include <absl/log/check.h>
11 
12 #include <packager/macros/logging.h>
13 #include <packager/media/base/media_sample.h>
14 #include <packager/media/base/stream_info.h>
15 #include <packager/media/base/text_sample.h>
16 #include <packager/media/formats/mp2t/es_parser.h>
17 #include <packager/media/formats/mp2t/es_parser_audio.h>
18 #include <packager/media/formats/mp2t/es_parser_dvb.h>
19 #include <packager/media/formats/mp2t/es_parser_h264.h>
20 #include <packager/media/formats/mp2t/es_parser_h265.h>
21 #include <packager/media/formats/mp2t/es_parser_teletext.h>
22 #include <packager/media/formats/mp2t/mp2t_common.h>
23 #include <packager/media/formats/mp2t/ts_audio_type.h>
24 #include <packager/media/formats/mp2t/ts_packet.h>
25 #include <packager/media/formats/mp2t/ts_section.h>
26 #include <packager/media/formats/mp2t/ts_section_pat.h>
27 #include <packager/media/formats/mp2t/ts_section_pes.h>
28 #include <packager/media/formats/mp2t/ts_section_pmt.h>
29 #include <packager/media/formats/mp2t/ts_stream_type.h>
30 
31 namespace shaka {
32 namespace media {
33 namespace mp2t {
34 
35 class PidState {
36  public:
37  enum PidType {
38  kPidPat,
39  kPidPmt,
40  kPidAudioPes,
41  kPidVideoPes,
42  kPidTextPes,
43  };
44 
45  PidState(int pid,
46  PidType pid_type,
47  std::unique_ptr<TsSection> section_parser);
48 
49  // Extract the content of the TS packet and parse it.
50  // Return true if successful.
51  bool PushTsPacket(const TsPacket& ts_packet);
52 
53  // Flush the PID state (possibly emitting some pending frames)
54  // and reset its state.
55  bool Flush();
56 
57  // Enable/disable the PID.
58  // Disabling a PID will reset its state and ignore any further incoming TS
59  // packets.
60  void Enable();
61  void Disable();
62  bool IsEnabled() const;
63 
64  PidType pid_type() const { return pid_type_; }
65 
66  std::shared_ptr<StreamInfo>& config() { return config_; }
67  void set_config(const std::shared_ptr<StreamInfo>& config) {
68  config_ = config;
69  }
70 
71  private:
72  friend Mp2tMediaParser;
73  void ResetState();
74 
75  int pid_;
76  PidType pid_type_;
77  std::unique_ptr<TsSection> section_parser_;
78 
79  std::deque<std::shared_ptr<MediaSample>> media_sample_queue_;
80  std::deque<std::shared_ptr<TextSample>> text_sample_queue_;
81 
82  bool enable_;
83  int continuity_counter_;
84  std::shared_ptr<StreamInfo> config_;
85 };
86 
87 PidState::PidState(int pid,
88  PidType pid_type,
89  std::unique_ptr<TsSection> section_parser)
90  : pid_(pid),
91  pid_type_(pid_type),
92  section_parser_(std::move(section_parser)),
93  enable_(false),
94  continuity_counter_(-1) {
95  DCHECK(section_parser_);
96 }
97 
98 bool PidState::PushTsPacket(const TsPacket& ts_packet) {
99  DCHECK_EQ(ts_packet.pid(), pid_);
100 
101  // The current PID is not part of the PID filter,
102  // just discard the incoming TS packet.
103  if (!enable_)
104  return true;
105  // TODO(bzd): continuity_counter_ is never set
106  int expected_continuity_counter = (continuity_counter_ + 1) % 16;
107  if (continuity_counter_ >= 0 &&
108  ts_packet.continuity_counter() != expected_continuity_counter) {
109  LOG(ERROR) << "TS discontinuity detected for pid: " << pid_;
110  // TODO(tinskip): Handle discontinuity better.
111  return false;
112  }
113 
114  bool status = section_parser_->Parse(
115  ts_packet.payload_unit_start_indicator(),
116  ts_packet.payload(),
117  ts_packet.payload_size());
118 
119  // At the minimum, when parsing failed, auto reset the section parser.
120  // Components that use the Mp2tMediaParser can take further action if needed.
121  if (!status) {
122  LOG(ERROR) << "Parsing failed for pid = " << pid_ << ", type=" << pid_type_;
123  ResetState();
124  }
125 
126  return status;
127 }
128 
129 bool PidState::Flush() {
130  RCHECK(section_parser_->Flush());
131  ResetState();
132  return true;
133 }
134 
135 void PidState::Enable() {
136  enable_ = true;
137 }
138 
139 void PidState::Disable() {
140  if (!enable_)
141  return;
142 
143  ResetState();
144  enable_ = false;
145 }
146 
147 bool PidState::IsEnabled() const {
148  return enable_;
149 }
150 
151 void PidState::ResetState() {
152  section_parser_->Reset();
153  continuity_counter_ = -1;
154 }
155 
156 Mp2tMediaParser::Mp2tMediaParser()
157  : sbr_in_mimetype_(false),
158  is_initialized_(false) {
159 }
160 
161 Mp2tMediaParser::~Mp2tMediaParser() {}
162 
163 void Mp2tMediaParser::Init(const InitCB& init_cb,
164  const NewMediaSampleCB& new_media_sample_cb,
165  const NewTextSampleCB& new_text_sample_cb,
166  KeySource* decryption_key_source) {
167  DCHECK(!is_initialized_);
168  DCHECK(init_cb_ == nullptr);
169  DCHECK(init_cb != nullptr);
170  DCHECK(new_media_sample_cb != nullptr);
171  DCHECK(new_text_sample_cb != nullptr);
172 
173  init_cb_ = init_cb;
174  new_media_sample_cb_ = new_media_sample_cb;
175  new_text_sample_cb_ = new_text_sample_cb;
176 }
177 
178 bool Mp2tMediaParser::Flush() {
179  DVLOG(1) << "Mp2tMediaParser::Flush";
180 
181  // Flush the buffers and reset the pids.
182  for (const auto& pair : pids_) {
183  DVLOG(1) << "Flushing PID: " << pair.first;
184  PidState* pid_state = pair.second.get();
185  RCHECK(pid_state->Flush());
186  }
187  bool result = EmitRemainingSamples();
188  pids_.clear();
189 
190  // Remove any bytes left in the TS buffer.
191  // (i.e. any partial TS packet => less than 188 bytes).
192  ts_byte_queue_.Reset();
193  return result;
194 }
195 
196 bool Mp2tMediaParser::Parse(const uint8_t* buf, int size) {
197  DVLOG(2) << "Mp2tMediaParser::Parse size=" << size;
198 
199  // Add the data to the parser state.
200  ts_byte_queue_.Push(buf, size);
201 
202  while (true) {
203  const uint8_t* ts_buffer;
204  int ts_buffer_size;
205  ts_byte_queue_.Peek(&ts_buffer, &ts_buffer_size);
206  if (ts_buffer_size < TsPacket::kPacketSize)
207  break;
208 
209  // Synchronization.
210  int skipped_bytes = TsPacket::Sync(ts_buffer, ts_buffer_size);
211  if (skipped_bytes > 0) {
212  DVLOG(1) << "Packet not aligned on a TS syncword:"
213  << " skipped_bytes=" << skipped_bytes;
214  ts_byte_queue_.Pop(skipped_bytes);
215  continue;
216  }
217 
218  // Parse the TS header, skipping 1 byte if the header is invalid.
219  std::unique_ptr<TsPacket> ts_packet(
220  TsPacket::Parse(ts_buffer, ts_buffer_size));
221  if (!ts_packet) {
222  DVLOG(1) << "Error: invalid TS packet";
223  ts_byte_queue_.Pop(1);
224  continue;
225  }
226  DVLOG(LOG_LEVEL_TS) << "Processing PID=" << ts_packet->pid()
227  << " start_unit="
228  << ts_packet->payload_unit_start_indicator()
229  << " continuity_counter="
230  << ts_packet->continuity_counter();
231  // Parse the section.
232  auto it = pids_.find(ts_packet->pid());
233  if (it == pids_.end() &&
234  ts_packet->pid() == TsSection::kPidPat) {
235  // Create the PAT state here if needed.
236  std::unique_ptr<TsSection> pat_section_parser(new TsSectionPat(
237  std::bind(&Mp2tMediaParser::RegisterPmt, this, std::placeholders::_1,
238  std::placeholders::_2)));
239  std::unique_ptr<PidState> pat_pid_state(new PidState(
240  ts_packet->pid(), PidState::kPidPat, std::move(pat_section_parser)));
241  pat_pid_state->Enable();
242  it = pids_.emplace(ts_packet->pid(), std::move(pat_pid_state)).first;
243  }
244 
245  if (it != pids_.end()) {
246  RCHECK(it->second->PushTsPacket(*ts_packet));
247  } else {
248  DVLOG(LOG_LEVEL_TS) << "Ignoring TS packet for pid: " << ts_packet->pid();
249  }
250 
251  // Go to the next packet.
252  ts_byte_queue_.Pop(TsPacket::kPacketSize);
253  }
254 
255  // Emit the A/V buffers that kept accumulating during TS parsing.
256  return EmitRemainingSamples();
257 }
258 
259 void Mp2tMediaParser::RegisterPmt(int program_number, int pmt_pid) {
260  DVLOG(1) << "RegisterPmt:"
261  << " program_number=" << program_number
262  << " pmt_pid=" << pmt_pid;
263 
264  // Only one TS program is allowed. Ignore the incoming program map table,
265  // if there is already one registered.
266  for (const auto& pair : pids_) {
267  if (pair.second->pid_type() == PidState::kPidPmt) {
268  if (pmt_pid != pair.first) {
269  DVLOG(1) << "More than one program is defined";
270  }
271  return;
272  }
273  }
274 
275  // Create the PMT state here if needed.
276  DVLOG(1) << "Create a new PMT parser";
277  std::unique_ptr<TsSection> pmt_section_parser(new TsSectionPmt(std::bind(
278  &Mp2tMediaParser::RegisterPes, this, pmt_pid, std::placeholders::_1,
279  std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
280  std::placeholders::_5, std::placeholders::_6, std::placeholders::_7)));
281  std::unique_ptr<PidState> pmt_pid_state(
282  new PidState(pmt_pid, PidState::kPidPmt, std::move(pmt_section_parser)));
283  pmt_pid_state->Enable();
284  pids_.emplace(pmt_pid, std::move(pmt_pid_state));
285 }
286 
287 void Mp2tMediaParser::RegisterPes(int pmt_pid,
288  int pes_pid,
289  TsStreamType stream_type,
290  uint32_t max_bitrate,
291  const std::string& lang,
292  TsAudioType audio_type,
293  const uint8_t* descriptor,
294  size_t descriptor_length) {
295  if (pids_.count(pes_pid) != 0)
296  return;
297  DVLOG(1) << "RegisterPes:"
298  << " pes_pid=" << pes_pid << " stream_type=" << std::hex
299  << static_cast<int>(stream_type) << std::dec
300  << "max_bitrate=" << max_bitrate << " lang=" << lang
301  << "audio_type=" << std::hex << static_cast<int>(audio_type)
302  << std::dec;
303 
304  // Create a stream parser corresponding to the stream type.
305  PidState::PidType pid_type = PidState::kPidVideoPes;
306  std::unique_ptr<EsParser> es_parser;
307  auto on_new_stream = std::bind(&Mp2tMediaParser::OnNewStreamInfo, this,
308  pes_pid, std::placeholders::_1);
309  auto on_emit_media = std::bind(&Mp2tMediaParser::OnEmitMediaSample, this,
310  pes_pid, std::placeholders::_1);
311  auto on_emit_text = std::bind(&Mp2tMediaParser::OnEmitTextSample, this,
312  pes_pid, std::placeholders::_1);
313  switch (stream_type) {
314  case TsStreamType::kAvc:
315  es_parser.reset(new EsParserH264(pes_pid, on_new_stream, on_emit_media));
316  break;
317  case TsStreamType::kHevc:
318  es_parser.reset(new EsParserH265(pes_pid, on_new_stream, on_emit_media));
319  break;
320  case TsStreamType::kAdtsAac:
321  case TsStreamType::kMpeg1Audio:
322  case TsStreamType::kAc3:
323  es_parser.reset(
324  new EsParserAudio(pes_pid, static_cast<TsStreamType>(stream_type),
325  on_new_stream, on_emit_media, sbr_in_mimetype_));
326  pid_type = PidState::kPidAudioPes;
327  break;
328  case TsStreamType::kDvbSubtitles:
329  es_parser.reset(new EsParserDvb(pes_pid, on_new_stream, on_emit_text,
330  descriptor, descriptor_length));
331  pid_type = PidState::kPidTextPes;
332  break;
333  case TsStreamType::kTeletextSubtitles:
334  es_parser.reset(new EsParserTeletext(pes_pid, on_new_stream, on_emit_text,
335  descriptor, descriptor_length));
336  pid_type = PidState::kPidTextPes;
337  break;
338 
339  default: {
340  auto type = static_cast<int>(stream_type);
341  DCHECK(type <= 0xff);
342  LOG_IF(ERROR, !stream_type_logged_once_[type])
343  << "Ignore unsupported MPEG2TS stream type 0x" << std::hex << type
344  << std::dec;
345  stream_type_logged_once_[type] = true;
346  return;
347  }
348  }
349 
350  // Create the PES state here.
351  DVLOG(1) << "Create a new PES state";
352  std::unique_ptr<TsSection> pes_section_parser(
353  new TsSectionPes(std::move(es_parser)));
354  std::unique_ptr<PidState> pes_pid_state(
355  new PidState(pes_pid, pid_type, std::move(pes_section_parser)));
356  pes_pid_state->Enable();
357  pids_.emplace(pes_pid, std::move(pes_pid_state));
358 
359  // Store PES metadata.
360  pes_metadata_.insert(
361  std::make_pair(pes_pid, PesMetadata{max_bitrate, lang, audio_type}));
362 }
363 
364 void Mp2tMediaParser::OnNewStreamInfo(
365  uint32_t pes_pid,
366  std::shared_ptr<StreamInfo> new_stream_info) {
367  DCHECK(!new_stream_info || new_stream_info->track_id() == pes_pid);
368  DVLOG(1) << "OnVideoConfigChanged for pid=" << pes_pid
369  << ", has_info=" << (new_stream_info ? "true" : "false");
370 
371  auto pid_state = pids_.find(pes_pid);
372  if (pid_state == pids_.end()) {
373  LOG(ERROR) << "PID State for new stream not found (pid = "
374  << new_stream_info->track_id() << ").";
375  return;
376  }
377 
378  if (new_stream_info) {
379  // Set the stream configuration information for the PID.
380  auto pes_metadata = pes_metadata_.find(pes_pid);
381  DCHECK(pes_metadata != pes_metadata_.end());
382  if (!pes_metadata->second.language.empty())
383  new_stream_info->set_language(pes_metadata->second.language);
384  if (new_stream_info->stream_type() == kStreamAudio) {
385  auto* audio_info = static_cast<AudioStreamInfo*>(new_stream_info.get());
386  audio_info->set_max_bitrate(pes_metadata->second.max_bitrate);
387  // TODO(modernletter) Add some field for audio type to AudioStreamInfo
388  // and set here from audio_type
389  }
390 
391  pid_state->second->set_config(new_stream_info);
392  } else {
393  LOG(WARNING) << "Ignoring unsupported stream with pid=" << pes_pid;
394  pid_state->second->Disable();
395  }
396 
397  // Finish initialization if all streams have configs.
398  FinishInitializationIfNeeded();
399 }
400 
401 bool Mp2tMediaParser::FinishInitializationIfNeeded() {
402  // Nothing to be done if already initialized.
403  if (is_initialized_)
404  return true;
405 
406  // Wait for more data to come to finish initialization.
407  if (pids_.empty())
408  return true;
409 
410  std::vector<std::shared_ptr<StreamInfo>> all_stream_info;
411  uint32_t num_es(0);
412  for (const auto& pair : pids_) {
413  if ((pair.second->pid_type() == PidState::kPidAudioPes ||
414  pair.second->pid_type() == PidState::kPidVideoPes ||
415  pair.second->pid_type() == PidState::kPidTextPes) &&
416  pair.second->IsEnabled()) {
417  ++num_es;
418  if (pair.second->config())
419  all_stream_info.push_back(pair.second->config());
420  }
421  }
422  if (num_es && (all_stream_info.size() == num_es)) {
423  // All stream configurations have been received. Initialization can
424  // be completed.
425  init_cb_(all_stream_info);
426  DVLOG(1) << "Mpeg2TS stream parser initialization done";
427  is_initialized_ = true;
428  }
429  return true;
430 }
431 
432 void Mp2tMediaParser::OnEmitMediaSample(
433  uint32_t pes_pid,
434  std::shared_ptr<MediaSample> new_sample) {
435  DCHECK(new_sample);
436  DVLOG(LOG_LEVEL_ES) << "OnEmitMediaSample: "
437  << " pid=" << pes_pid
438  << " size=" << new_sample->data_size()
439  << " dts=" << new_sample->dts()
440  << " pts=" << new_sample->pts();
441 
442  // Add the sample to the appropriate PID sample queue.
443  auto pid_state = pids_.find(pes_pid);
444  if (pid_state == pids_.end()) {
445  LOG(ERROR) << "PID State for new sample not found (pid = " << pes_pid
446  << ").";
447  return;
448  }
449  pid_state->second->media_sample_queue_.push_back(std::move(new_sample));
450 }
451 
452 void Mp2tMediaParser::OnEmitTextSample(uint32_t pes_pid,
453  std::shared_ptr<TextSample> new_sample) {
454  DCHECK(new_sample);
455  DVLOG(LOG_LEVEL_ES) << "OnEmitTextSample: "
456  << " pid=" << pes_pid
457  << " start=" << new_sample->start_time();
458 
459  // Add the sample to the appropriate PID sample queue.
460  auto pid_state = pids_.find(pes_pid);
461  if (pid_state == pids_.end()) {
462  LOG(ERROR) << "PID State for new sample not found (pid = "
463  << pes_pid << ").";
464  return;
465  }
466  pid_state->second->text_sample_queue_.push_back(std::move(new_sample));
467 }
468 
469 bool Mp2tMediaParser::EmitRemainingSamples() {
470  DVLOG(LOG_LEVEL_ES) << "Mp2tMediaParser::EmitRemainingBuffers";
471 
472  // No buffer should be sent until fully initialized.
473  if (!is_initialized_)
474  return true;
475 
476  // Buffer emission.
477  for (const auto& pid_pair : pids_) {
478  for (auto sample : pid_pair.second->media_sample_queue_) {
479  RCHECK(new_media_sample_cb_(pid_pair.first, sample));
480  }
481  pid_pair.second->media_sample_queue_.clear();
482 
483  for (auto sample : pid_pair.second->text_sample_queue_) {
484  RCHECK(new_text_sample_cb_(pid_pair.first, sample));
485  }
486  pid_pair.second->text_sample_queue_.clear();
487  }
488 
489  return true;
490 }
491 
492 } // namespace mp2t
493 } // namespace media
494 } // namespace shaka
KeySource is responsible for encryption key acquisition.
Definition: key_source.h:52
std::function< bool(uint32_t track_id, std::shared_ptr< MediaSample > media_sample)> NewMediaSampleCB
Definition: media_parser.h:45
std::function< bool(uint32_t track_id, std::shared_ptr< TextSample > text_sample)> NewTextSampleCB
Definition: media_parser.h:54
std::function< void(const std::vector< std::shared_ptr< StreamInfo > > &stream_info)> InitCB
Definition: media_parser.h:36
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66