5#include <packager/media/formats/mp2t/mp2t_media_parser.h>
10#include <absl/log/check.h>
12#include <packager/macros/logging.h>
13#include <packager/media/base/media_sample.h>
14#include <packager/media/base/stream_info.h>
15#include <packager/media/base/text_sample.h>
16#include <packager/media/formats/mp2t/es_parser.h>
17#include <packager/media/formats/mp2t/es_parser_audio.h>
18#include <packager/media/formats/mp2t/es_parser_dvb.h>
19#include <packager/media/formats/mp2t/es_parser_h264.h>
20#include <packager/media/formats/mp2t/es_parser_h265.h>
21#include <packager/media/formats/mp2t/es_parser_teletext.h>
22#include <packager/media/formats/mp2t/mp2t_common.h>
23#include <packager/media/formats/mp2t/ts_audio_type.h>
24#include <packager/media/formats/mp2t/ts_packet.h>
25#include <packager/media/formats/mp2t/ts_section.h>
26#include <packager/media/formats/mp2t/ts_section_pat.h>
27#include <packager/media/formats/mp2t/ts_section_pes.h>
28#include <packager/media/formats/mp2t/ts_section_pmt.h>
29#include <packager/media/formats/mp2t/ts_stream_type.h>
// Interior of the PidState class declaration. The class header and several
// member lines are not visible in this excerpt.
// Tail of the constructor declaration: the section parser is handed over by
// unique_ptr and moved into section_parser_ by the constructor definition.
47 std::unique_ptr<TsSection> section_parser);
// Forwards one TS packet to the section parser owned by this PID state.
51 bool PushTsPacket(
const TsPacket& ts_packet);
// Reports whether this PID is currently enabled (definition not fully
// visible in this excerpt).
62 bool IsEnabled()
const;
// Read-only accessor for the kind of PID (PAT, PMT, audio/video/text PES).
64 PidType pid_type()
const {
return pid_type_; }
// Returns a mutable reference to the stream info stored for this PID; the
// pointer is tested for null by FinishInitializationIfNeeded().
66 std::shared_ptr<StreamInfo>& config() {
return config_; }
// Stores the stream info for this PID (body not visible in this excerpt).
67 void set_config(
const std::shared_ptr<StreamInfo>& config) {
// Mp2tMediaParser pushes to and drains the sample queues below directly.
72 friend Mp2tMediaParser;
// Parser that consumes the payload of packets pushed to this PID.
77 std::unique_ptr<TsSection> section_parser_;
// Samples queued by Mp2tMediaParser::OnEmitMediaSample / OnEmitTextSample
// until Mp2tMediaParser::EmitRemainingSamples delivers and clears them.
79 std::deque<std::shared_ptr<MediaSample>> media_sample_queue_;
80 std::deque<std::shared_ptr<TextSample>> text_sample_queue_;
// Initialised and reset to -1; while negative, PushTsPacket skips the
// discontinuity comparison.
83 int continuity_counter_;
// Stream info for this PID; empty until set_config() is called.
84 std::shared_ptr<StreamInfo> config_;
// Constructs the state for one PID. Takes ownership of |section_parser| and
// starts with continuity_counter_ at -1. Some parameter and initialiser lines
// are not visible in this excerpt.
87PidState::PidState(
int pid,
89 std::unique_ptr<TsSection> section_parser)
92 section_parser_(std::move(section_parser)),
94 continuity_counter_(-1) {
// A PidState without a section parser is a programming error.
95 DCHECK(section_parser_);
// Feeds one TS packet to this PID's section parser.
// Logs an error when the packet's continuity counter is not the expected
// successor of the stored one, and logs when parsing fails. The return
// statements and the counter update are on lines not visible in this excerpt.
98bool PidState::PushTsPacket(
const TsPacket& ts_packet) {
// The caller must route packets by PID; a mismatch is a programming error.
99 DCHECK_EQ(ts_packet.pid(), pid_);
// The expected counter wraps modulo 16 (presumably a 4-bit header field --
// confirm against the TS specification).
106 int expected_continuity_counter = (continuity_counter_ + 1) % 16;
// A negative stored counter (the initial/reset value) skips the comparison.
107 if (continuity_counter_ >= 0 &&
108 ts_packet.continuity_counter() != expected_continuity_counter) {
109 LOG(ERROR) <<
"TS discontinuity detected for pid: " << pid_;
// Hand the payload and its unit-start flag to the section parser.
115 section_parser_->Parse(ts_packet.payload_unit_start_indicator(),
116 ts_packet.payload(), ts_packet.payload_size());
121 LOG(ERROR) <<
"Parsing failed for pid = " << pid_ <<
", type=" << pid_type_;
// Flushes the section parser owned by this PID state. RCHECK guards the
// parser's result (presumably returning false on failure -- confirm against
// the macro definition). The remaining lines are not visible in this excerpt.
128bool PidState::Flush() {
129 RCHECK(section_parser_->Flush());
// Presumably marks this PID as enabled; the body is not visible in this
// excerpt. Called right after a PidState is created for PAT, PMT and PES PIDs.
134void PidState::Enable() {
// Presumably marks this PID as disabled; the body is not visible in this
// excerpt. Called by Mp2tMediaParser::OnNewStreamInfo for unsupported streams.
138void PidState::Disable() {
// Reports whether this PID is enabled; the body is not visible in this
// excerpt. FinishInitializationIfNeeded() uses it to filter PES PIDs.
146bool PidState::IsEnabled()
const {
// Returns this PID to its initial parsing state: resets the section parser and
// sets the continuity counter back to -1 so the next packet is not reported as
// a discontinuity.
150void PidState::ResetState() {
151 section_parser_->Reset();
152 continuity_counter_ = -1;
// Creates an uninitialised parser: SBR-in-mimetype is off and initialisation
// has not yet completed.
155Mp2tMediaParser::Mp2tMediaParser()
156 : sbr_in_mimetype_(false), is_initialized_(false) {}
// Empty destructor body; members clean up through their own destructors.
158Mp2tMediaParser::~Mp2tMediaParser() {}
// Registers the callbacks used to report stream info and parsed samples.
// Must be called on a parser that has not completed initialisation and has no
// init callback yet. The remaining parameter lines and the storing of
// |init_cb| are not visible in this excerpt.
160void Mp2tMediaParser::Init(
const InitCB& init_cb,
// Preconditions: not initialised, no init callback stored, and all three
// callbacks are non-null.
164 DCHECK(!is_initialized_);
165 DCHECK(init_cb_ ==
nullptr);
166 DCHECK(init_cb !=
nullptr);
167 DCHECK(new_media_sample_cb !=
nullptr);
168 DCHECK(new_text_sample_cb !=
nullptr);
// Keep the sample callbacks for use by EmitRemainingSamples().
171 new_media_sample_cb_ = new_media_sample_cb;
172 new_text_sample_cb_ = new_text_sample_cb;
// Flushes every registered PID, emits any queued samples, and resets the TS
// byte queue. Lines between these steps (and the return statement) are not
// visible in this excerpt.
175bool Mp2tMediaParser::Flush() {
176 DVLOG(1) <<
"Mp2tMediaParser::Flush";
// Flush each PID's section parser; RCHECK guards each result.
179 for (
const auto& pair : pids_) {
180 DVLOG(1) <<
"Flushing PID: " << pair.first;
181 PidState* pid_state = pair.second.get();
182 RCHECK(pid_state->Flush());
// Deliver whatever samples the flush produced.
184 bool result = EmitRemainingSamples();
// Discard any buffered bytes.
189 ts_byte_queue_.Reset();
// Appends |size| bytes from |buf| to the internal byte queue and processes the
// TS packets found there, then emits any queued samples.
// The loop construct, some declarations and the early exits are on lines not
// visible in this excerpt.
193bool Mp2tMediaParser::Parse(
const uint8_t* buf,
int size) {
194 DVLOG(2) <<
"Mp2tMediaParser::Parse size=" << size;
// Add the new bytes to the queue.
197 ts_byte_queue_.Push(buf, size);
// Look at the buffered data without consuming it.
200 const uint8_t* ts_buffer;
202 ts_byte_queue_.Peek(&ts_buffer, &ts_buffer_size);
// Fewer bytes than one full TS packet are buffered (the action taken is on a
// line not visible here).
203 if (ts_buffer_size < TsPacket::kPacketSize)
// Drop any bytes that precede the packet sync position reported by Sync().
207 int skipped_bytes = TsPacket::Sync(ts_buffer, ts_buffer_size);
208 if (skipped_bytes > 0) {
209 DVLOG(1) <<
"Packet not aligned on a TS syncword:"
210 <<
" skipped_bytes=" << skipped_bytes;
211 ts_byte_queue_.Pop(skipped_bytes);
// Parse one TS packet from the front of the buffer.
216 std::unique_ptr<TsPacket> ts_packet(
217 TsPacket::Parse(ts_buffer, ts_buffer_size));
// Invalid packet: drop a single byte so the next pass can re-sync.
219 DVLOG(1) <<
"Error: invalid TS packet";
220 ts_byte_queue_.Pop(1);
223 DVLOG(LOG_LEVEL_TS) <<
"Processing PID=" << ts_packet->pid()
225 << ts_packet->payload_unit_start_indicator()
226 <<
" continuity_counter="
227 << ts_packet->continuity_counter();
// Find the state for this PID. The PAT PID gets its state created on first
// sight, with RegisterPmt bound as the parser's callback.
229 auto it = pids_.find(ts_packet->pid());
230 if (it == pids_.end() && ts_packet->pid() == TsSection::kPidPat) {
232 std::unique_ptr<TsSection> pat_section_parser(
new TsSectionPat(
233 std::bind(&Mp2tMediaParser::RegisterPmt,
this, std::placeholders::_1,
234 std::placeholders::_2)));
235 std::unique_ptr<PidState> pat_pid_state(
new PidState(
236 ts_packet->pid(), PidState::kPidPat, std::move(pat_section_parser)));
237 pat_pid_state->Enable();
238 it = pids_.emplace(ts_packet->pid(), std::move(pat_pid_state)).first;
// Known PID: forward the packet. Unknown PID: log and ignore it.
241 if (it != pids_.end()) {
242 RCHECK(it->second->PushTsPacket(*ts_packet));
244 DVLOG(LOG_LEVEL_TS) <<
"Ignoring TS packet for pid: " << ts_packet->pid();
// Consume the processed packet from the queue.
248 ts_byte_queue_.Pop(TsPacket::kPacketSize);
// Deliver any samples produced while parsing.
252 return EmitRemainingSamples();
// Callback bound into the PAT parser (see Parse): registers a PMT parser for
// |pmt_pid|. Scans the existing PIDs for an already-registered PMT first; what
// happens when one is found is on lines not visible in this excerpt.
255void Mp2tMediaParser::RegisterPmt(
int program_number,
int pmt_pid) {
256 DVLOG(1) <<
"RegisterPmt:"
257 <<
" program_number=" << program_number <<
" pmt_pid=" << pmt_pid;
// Look for an existing PMT PID; a different PMT PID than the one requested is
// logged as a second program.
261 for (
const auto& pair : pids_) {
262 if (pair.second->pid_type() == PidState::kPidPmt) {
263 if (pmt_pid != pair.first) {
264 DVLOG(1) <<
"More than one program is defined";
// Create the PMT section parser. RegisterPes is bound with |pmt_pid| as its
// first argument and seven placeholders for the values the parser supplies.
271 DVLOG(1) <<
"Create a new PMT parser";
272 std::unique_ptr<TsSection> pmt_section_parser(
new TsSectionPmt(std::bind(
273 &Mp2tMediaParser::RegisterPes,
this, pmt_pid, std::placeholders::_1,
274 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
275 std::placeholders::_5, std::placeholders::_6, std::placeholders::_7)));
// Wrap the parser in an enabled PidState and register it under |pmt_pid|.
276 std::unique_ptr<PidState> pmt_pid_state(
277 new PidState(pmt_pid, PidState::kPidPmt, std::move(pmt_section_parser)));
278 pmt_pid_state->Enable();
279 pids_.emplace(pmt_pid, std::move(pmt_pid_state));
// Callback bound into the PMT parser (see RegisterPmt): creates an elementary
// stream parser matching |stream_type| and registers a PES PidState for it.
// Also records the PMT-provided metadata (bitrate, language, audio type) and,
// for text streams, the PID in text_pids_.
// The pes_pid parameter line, several break/return statements and the default
// case label are on lines not visible in this excerpt.
282void Mp2tMediaParser::RegisterPes(
int pmt_pid,
284 TsStreamType stream_type,
285 uint32_t max_bitrate,
286 const std::string& lang,
287 TsAudioType audio_type,
288 const uint8_t* descriptor,
289 size_t descriptor_length) {
// A PID that is already registered is detected here (the action taken is on a
// line not visible in this excerpt).
290 if (pids_.count(pes_pid) != 0)
// NOTE(review): the "max_bitrate=" and "audio_type=" literals have no leading
// space, so they run into the preceding value in the log output.
292 DVLOG(1) <<
"RegisterPes:"
293 <<
" pes_pid=" << pes_pid <<
" stream_type=" << std::hex
294 <<
static_cast<int>(stream_type) << std::dec
295 <<
"max_bitrate=" << max_bitrate <<
" lang=" << lang
296 <<
"audio_type=" << std::hex <<
static_cast<int>(audio_type)
// The PID type defaults to video; the audio and text cases below override it.
300 PidState::PidType pid_type = PidState::kPidVideoPes;
301 std::unique_ptr<EsParser> es_parser;
// Bind the parser callbacks with |pes_pid| pre-filled so each ES parser only
// has to pass the stream info or sample.
302 auto on_new_stream = std::bind(&Mp2tMediaParser::OnNewStreamInfo,
this,
303 pes_pid, std::placeholders::_1);
304 auto on_emit_media = std::bind(&Mp2tMediaParser::OnEmitMediaSample,
this,
305 pes_pid, std::placeholders::_1);
306 auto on_emit_text = std::bind(&Mp2tMediaParser::OnEmitTextSample,
this,
307 pes_pid, std::placeholders::_1);
// Select the ES parser implementation for the stream type.
308 switch (stream_type) {
309 case TsStreamType::kAvc:
310 es_parser.reset(
new EsParserH264(pes_pid, on_new_stream, on_emit_media));
312 case TsStreamType::kHevc:
313 es_parser.reset(
new EsParserH265(pes_pid, on_new_stream, on_emit_media));
// All three audio stream types share EsParserAudio.
315 case TsStreamType::kAdtsAac:
316 case TsStreamType::kMpeg1Audio:
317 case TsStreamType::kAc3:
319 new EsParserAudio(pes_pid,
static_cast<TsStreamType
>(stream_type),
320 on_new_stream, on_emit_media, sbr_in_mimetype_));
321 pid_type = PidState::kPidAudioPes;
// Subtitle parsers also receive the raw descriptor bytes.
323 case TsStreamType::kDvbSubtitles:
324 es_parser.reset(
new EsParserDvb(pes_pid, on_new_stream, on_emit_text,
325 descriptor, descriptor_length));
326 pid_type = PidState::kPidTextPes;
328 case TsStreamType::kTeletextSubtitles:
329 es_parser.reset(
new EsParserTeletext(pes_pid, on_new_stream, on_emit_text,
330 descriptor, descriptor_length));
331 pid_type = PidState::kPidTextPes;
// Unsupported stream type: log it only the first time it is seen. The DCHECK
// guards the index into stream_type_logged_once_.
335 auto type =
static_cast<int>(stream_type);
336 DCHECK(type <= 0xff);
337 LOG_IF(ERROR, !stream_type_logged_once_[type])
338 <<
"Ignore unsupported MPEG2TS stream type 0x" << std::hex << type
340 stream_type_logged_once_[type] =
true;
// Wrap the ES parser in a PES section parser and register an enabled PidState.
346 DVLOG(1) <<
"Create a new PES state";
347 std::unique_ptr<TsSection> pes_section_parser(
348 new TsSectionPes(std::move(es_parser)));
349 std::unique_ptr<PidState> pes_pid_state(
350 new PidState(pes_pid, pid_type, std::move(pes_section_parser)));
351 pes_pid_state->Enable();
352 pids_.emplace(pes_pid, std::move(pes_pid_state));
// Remember the PMT metadata so OnNewStreamInfo can apply it to the stream info.
355 pes_metadata_.insert(
356 std::make_pair(pes_pid, PesMetadata{max_bitrate, lang, audio_type}));
// Track text PIDs; update_biggest_pts() sends heartbeats to each of them.
359 if (pid_type == PidState::kPidTextPes) {
360 text_pids_.insert(pes_pid);
// Callback bound into each ES parser (see RegisterPes): receives the stream
// info for |pes_pid|. A non-null info is completed with the PMT metadata and
// stored on the PID state; the warning/Disable path below handles unsupported
// streams. Ends by checking whether initialisation can be completed.
// The pes_pid parameter line and several closing/else lines are not visible in
// this excerpt.
364void Mp2tMediaParser::OnNewStreamInfo(
366 std::shared_ptr<StreamInfo> new_stream_info) {
// A null |new_stream_info| is permitted; a non-null one must carry the PES PID
// as its track id.
367 DCHECK(!new_stream_info || new_stream_info->track_id() == pes_pid);
// NOTE(review): the log text says "OnVideoConfigChanged" although this
// function is bound for audio and text parsers as well.
368 DVLOG(1) <<
"OnVideoConfigChanged for pid=" << pes_pid
369 <<
", has_info=" << (new_stream_info ?
"true" :
"false");
371 auto pid_state = pids_.find(pes_pid);
372 if (pid_state == pids_.end()) {
// NOTE(review): |new_stream_info| is dereferenced here even though the DCHECK
// above allows it to be null; logging |pes_pid| would avoid the null
// dereference.
373 LOG(ERROR) <<
"PID State for new stream not found (pid = "
374 << new_stream_info->track_id() <<
").";
378 if (new_stream_info) {
// Apply the language and (for audio) max bitrate recorded by RegisterPes.
380 auto pes_metadata = pes_metadata_.find(pes_pid);
381 DCHECK(pes_metadata != pes_metadata_.end());
382 if (!pes_metadata->second.language.empty())
383 new_stream_info->set_language(pes_metadata->second.language);
// The downcast is guarded by the stream type check.
384 if (new_stream_info->stream_type() == kStreamAudio) {
385 auto* audio_info =
static_cast<AudioStreamInfo*
>(new_stream_info.get());
386 audio_info->set_max_bitrate(pes_metadata->second.max_bitrate);
391 pid_state->second->set_config(new_stream_info);
// Unsupported stream: disable the PID so it no longer counts towards
// initialisation.
393 LOG(WARNING) <<
"Ignoring unsupported stream with pid=" << pes_pid;
394 pid_state->second->Disable();
// The return value of FinishInitializationIfNeeded() is not used here.
398 FinishInitializationIfNeeded();
// Completes initialisation once every counted elementary stream has a stream
// info: invokes init_cb_ with the collected infos and sets is_initialized_.
// The early exits, the declaration/update of num_es and the return statements
// are on lines not visible in this excerpt.
401bool Mp2tMediaParser::FinishInitializationIfNeeded() {
410 std::vector<std::shared_ptr<StreamInfo>> all_stream_info;
// Consider only enabled audio, video and text PES PIDs.
412 for (
const auto& pair : pids_) {
413 if ((pair.second->pid_type() == PidState::kPidAudioPes ||
414 pair.second->pid_type() == PidState::kPidVideoPes ||
415 pair.second->pid_type() == PidState::kPidTextPes) &&
416 pair.second->IsEnabled()) {
// Collect the stream info of PIDs that already have one.
418 if (pair.second->config())
419 all_stream_info.push_back(pair.second->config());
// Ready when num_es is non-zero and every counted stream has a stream info.
422 if (num_es && (all_stream_info.size() == num_es)) {
425 init_cb_(all_stream_info);
426 DVLOG(1) <<
"Mpeg2TS stream parser initialization done";
427 is_initialized_ =
true;
// Callback bound into the audio/video ES parsers (see RegisterPes): queues
// |new_sample| on the PID state for |pes_pid| and passes the sample's
// timestamp to update_biggest_pts(). The pes_pid parameter line, the
// not-found exit and some closing lines are not visible in this excerpt.
432void Mp2tMediaParser::OnEmitMediaSample(
434 std::shared_ptr<MediaSample> new_sample) {
436 DVLOG(LOG_LEVEL_ES) <<
"OnEmitMediaSample: "
437 <<
" pid=" << pes_pid
438 <<
" size=" << new_sample->data_size()
439 <<
" dts=" << new_sample->dts()
440 <<
" pts=" << new_sample->pts();
443 auto pid_state = pids_.find(pes_pid);
444 if (pid_state == pids_.end()) {
445 LOG(ERROR) <<
"PID State for new sample not found (pid = " << pes_pid
// The timestamp defaults to PTS. Video uses DTS instead, falling back to PTS
// when DTS is 0.
452 int64_t timestamp_for_heartbeat = new_sample->pts();
453 if (pid_state->second->pid_type() == PidState::kPidVideoPes) {
456 timestamp_for_heartbeat = new_sample->dts();
457 if (timestamp_for_heartbeat == 0) {
458 timestamp_for_heartbeat = new_sample->pts();
// Update the heartbeat before moving the sample; |new_sample| must not be used
// after the push_back below.
463 update_biggest_pts(timestamp_for_heartbeat);
464 pid_state->second->media_sample_queue_.push_back(std::move(new_sample));
// Queues |new_sample| on the text sample queue of the PID state for |pes_pid|.
// Bound into the subtitle ES parsers (see RegisterPes) and also called by
// update_biggest_pts() for heartbeat samples. The not-found exit is on lines
// not visible in this excerpt.
467void Mp2tMediaParser::OnEmitTextSample(uint32_t pes_pid,
468 std::shared_ptr<TextSample> new_sample) {
470 DVLOG(LOG_LEVEL_ES) <<
"OnEmitTextSample: "
471 <<
" pid=" << pes_pid
472 <<
" start=" << new_sample->start_time();
475 auto pid_state = pids_.find(pes_pid);
476 if (pid_state == pids_.end()) {
477 LOG(ERROR) <<
"PID State for new sample not found (pid = " << pes_pid
// The sample stays queued until EmitRemainingSamples() delivers it.
486 pid_state->second->text_sample_queue_.push_back(std::move(new_sample));
// Delivers every queued media and text sample of every PID to the registered
// callbacks, then clears the queues. The is_initialized_ check below gates the
// delivery. The early exit, any handling of the text callback result and the
// return statement are on lines not visible in this excerpt.
489bool Mp2tMediaParser::EmitRemainingSamples() {
// NOTE(review): the log text says "EmitRemainingBuffers", not the function name.
490 DVLOG(LOG_LEVEL_ES) <<
"Mp2tMediaParser::EmitRemainingBuffers";
// Samples stay queued until initialisation has completed.
493 if (!is_initialized_)
497 for (
const auto& pid_pair : pids_) {
// Media samples: RCHECK guards each callback result. The loop variable is
// taken by value, which copies each shared_ptr.
498 for (
auto sample : pid_pair.second->media_sample_queue_) {
499 RCHECK(new_media_sample_cb_(pid_pair.first, sample));
501 pid_pair.second->media_sample_queue_.clear();
503 DVLOG(2) <<
"EmitRemainingSamples: text_sample_queue_ size="
504 << pid_pair.second->text_sample_queue_.size();
// Text samples: the callback result is stored and logged here.
505 for (
auto sample : pid_pair.second->text_sample_queue_) {
506 DVLOG(2) <<
"Emitting text sample: role="
507 <<
static_cast<int>(sample->role())
508 <<
" pts=" << sample->start_time()
509 <<
" is_empty=" << sample->is_empty();
510 bool result = new_text_sample_cb_(pid_pair.first, sample);
511 DVLOG(3) <<
"new_text_sample_cb_ returned: " << result;
514 pid_pair.second->text_sample_queue_.clear();
// Sends a heartbeat text sample to every registered text PID once |pts| is at
// least 9000 ticks past biggest_pts_ (presumably 100 ms at a 90 kHz clock --
// TODO confirm). The update of biggest_pts_ and the not-found handling are on
// lines not visible in this excerpt.
520void Mp2tMediaParser::update_biggest_pts(int64_t pts) {
521 if (pts >= biggest_pts_ + 9000) {
523 for (
auto pid : text_pids_) {
524 auto pid_state = pids_.find(pid);
525 if (pid_state == pids_.end()) {
526 LOG(ERROR) <<
"PID State for new sample not found (text pid = " << pid
// The heartbeat is a text sample with an empty id and body, start and end both
// equal to |pts|, and the kMediaHeartBeat role.
530 TextSettings text_settings;
531 auto heartbeat = std::make_shared<TextSample>(
532 "", pts, pts, text_settings, TextFragment({},
""),
533 TextSampleRole::kMediaHeartBeat);
// Tag the sample with the text PID and queue it like any other text sample.
536 heartbeat->set_sub_stream_index(pid);
537 OnEmitTextSample(uint32_t(pid), heartbeat);
All the methods that are virtual are virtual for mocking.