5 #include <packager/media/formats/mp2t/mp2t_media_parser.h>
10 #include <absl/log/check.h>
12 #include <packager/macros/logging.h>
13 #include <packager/media/base/media_sample.h>
14 #include <packager/media/base/stream_info.h>
15 #include <packager/media/base/text_sample.h>
16 #include <packager/media/formats/mp2t/es_parser.h>
17 #include <packager/media/formats/mp2t/es_parser_audio.h>
18 #include <packager/media/formats/mp2t/es_parser_dvb.h>
19 #include <packager/media/formats/mp2t/es_parser_h264.h>
20 #include <packager/media/formats/mp2t/es_parser_h265.h>
21 #include <packager/media/formats/mp2t/es_parser_teletext.h>
22 #include <packager/media/formats/mp2t/mp2t_common.h>
23 #include <packager/media/formats/mp2t/ts_audio_type.h>
24 #include <packager/media/formats/mp2t/ts_packet.h>
25 #include <packager/media/formats/mp2t/ts_section.h>
26 #include <packager/media/formats/mp2t/ts_section_pat.h>
27 #include <packager/media/formats/mp2t/ts_section_pes.h>
28 #include <packager/media/formats/mp2t/ts_section_pmt.h>
29 #include <packager/media/formats/mp2t/ts_stream_type.h>
47 std::unique_ptr<TsSection> section_parser);
// Hands one TS packet carrying this PID to the section parser
// (see the out-of-line definition).
51 bool PushTsPacket(
const TsPacket& ts_packet);
// Query counterpart of Enable()/Disable(); the definition body is not
// visible in this listing.
62 bool IsEnabled()
const;
// Returns the stored PID type.
64 PidType pid_type()
const {
return pid_type_; }
// Returns a mutable reference to the stored stream configuration; the
// pointer may be empty (callers test it before use).
66 std::shared_ptr<StreamInfo>& config() {
return config_; }
// Setter for the stream configuration (body not visible here; presumably
// stores |config| in config_ -- confirm).
67 void set_config(
const std::shared_ptr<StreamInfo>& config) {
// Mp2tMediaParser pushes to and drains the sample queues below directly.
72 friend Mp2tMediaParser;
// Section parser owned by this PID state; moved in by the constructor.
77 std::unique_ptr<TsSection> section_parser_;
// Samples queued by Mp2tMediaParser::OnEmitMediaSample / OnEmitTextSample
// and handed to the sample callbacks in EmitRemainingSamples().
79 std::deque<std::shared_ptr<MediaSample>> media_sample_queue_;
80 std::deque<std::shared_ptr<TextSample>> text_sample_queue_;
// Initialized and reset to -1; the discontinuity check in PushTsPacket only
// applies while this value is >= 0.
83 int continuity_counter_;
// Stream configuration exposed through config() / set_config().
84 std::shared_ptr<StreamInfo> config_;
// Builds the per-PID state. Ownership of |section_parser| is moved into
// section_parser_, and the continuity counter starts at -1.
87 PidState::PidState(
int pid,
89 std::unique_ptr<TsSection> section_parser)
92 section_parser_(std::move(section_parser)),
94 continuity_counter_(-1) {
// A PID state without a section parser is a programming error.
95 DCHECK(section_parser_);
// Forwards the payload of |ts_packet| to this PID's section parser.
// The packet must carry this state's PID (debug-checked).
98 bool PidState::PushTsPacket(
const TsPacket& ts_packet) {
99 DCHECK_EQ(ts_packet.pid(), pid_);
// TS continuity counters wrap modulo 16. The mismatch test is skipped while
// continuity_counter_ is still negative (-1 after construction / reset).
106 int expected_continuity_counter = (continuity_counter_ + 1) % 16;
107 if (continuity_counter_ >= 0 &&
108 ts_packet.continuity_counter() != expected_continuity_counter) {
109 LOG(ERROR) <<
"TS discontinuity detected for pid: " << pid_;
// Hand the payload-unit-start flag and payload size to the section parser
// (the remaining argument is on a line not visible in this listing).
114 bool status = section_parser_->Parse(
115 ts_packet.payload_unit_start_indicator(),
117 ts_packet.payload_size());
// Logged on the failure path (the guarding condition is not visible here;
// presumably `!status` -- confirm).
122 LOG(ERROR) <<
"Parsing failed for pid = " << pid_ <<
", type=" << pid_type_;
// Flushes the section parser owned by this PID state.
129 bool PidState::Flush() {
// RCHECK is a project macro; presumably it makes this function return false
// when the flush fails -- confirm against its definition.
130 RCHECK(section_parser_->Flush());
135 void PidState::Enable() {
139 void PidState::Disable() {
147 bool PidState::IsEnabled()
const {
// Resets the section parser and returns the continuity counter to its
// initial -1 value, which disables the discontinuity check in PushTsPacket
// until the counter becomes non-negative again.
151 void PidState::ResetState() {
152 section_parser_->Reset();
153 continuity_counter_ = -1;
// Creates a parser that is not yet initialized and has sbr_in_mimetype_
// turned off.
156 Mp2tMediaParser::Mp2tMediaParser()
157 : sbr_in_mimetype_(false),
158 is_initialized_(false) {
161 Mp2tMediaParser::~Mp2tMediaParser() {}
// Stores the callbacks used to report stream information and parsed samples.
// Must be called on a parser that has not been initialized and has no init
// callback yet; all callbacks must be non-null (debug-checked).
163 void Mp2tMediaParser::Init(
const InitCB& init_cb,
167 DCHECK(!is_initialized_);
168 DCHECK(init_cb_ ==
nullptr);
169 DCHECK(init_cb !=
nullptr);
170 DCHECK(new_media_sample_cb !=
nullptr);
171 DCHECK(new_text_sample_cb !=
nullptr);
// Keep the sample callbacks; they are invoked from EmitRemainingSamples().
174 new_media_sample_cb_ = new_media_sample_cb;
175 new_text_sample_cb_ = new_text_sample_cb;
// Flushes every registered PID, emits the samples that are still queued,
// and then resets the TS byte queue.
178 bool Mp2tMediaParser::Flush() {
179 DVLOG(1) <<
"Mp2tMediaParser::Flush";
// Flush each PID state in turn; RCHECK presumably aborts with false on the
// first failure -- confirm against the macro definition.
182 for (
const auto& pair : pids_) {
183 DVLOG(1) <<
"Flushing PID: " << pair.first;
184 PidState* pid_state = pair.second.get();
185 RCHECK(pid_state->Flush());
// Deliver whatever the flush produced before dropping buffered bytes.
187 bool result = EmitRemainingSamples();
192 ts_byte_queue_.Reset();
// Appends |size| bytes from |buf| to the internal byte queue, processes the
// queued data as TS packets, and finally emits the queued samples.
// Several control-flow lines (loop header, break/continue) are not visible
// in this listing; the comments below describe only the visible statements.
196 bool Mp2tMediaParser::Parse(
const uint8_t* buf,
int size) {
197 DVLOG(2) <<
"Mp2tMediaParser::Parse size=" << size;
200 ts_byte_queue_.Push(buf, size);
// Look at the buffered bytes without consuming them; fewer than one full
// packet (TsPacket::kPacketSize) is a special case handled by the guard.
203 const uint8_t* ts_buffer;
205 ts_byte_queue_.Peek(&ts_buffer, &ts_buffer_size);
206 if (ts_buffer_size < TsPacket::kPacketSize)
// Resynchronize: drop the bytes that TsPacket::Sync reports as skipped.
210 int skipped_bytes = TsPacket::Sync(ts_buffer, ts_buffer_size);
211 if (skipped_bytes > 0) {
212 DVLOG(1) <<
"Packet not aligned on a TS syncword:"
213 <<
" skipped_bytes=" << skipped_bytes;
214 ts_byte_queue_.Pop(skipped_bytes);
// Parse one packet from the head of the buffer.
219 std::unique_ptr<TsPacket> ts_packet(
220 TsPacket::Parse(ts_buffer, ts_buffer_size));
// Invalid-packet path: consume a single byte (the guarding condition is not
// visible here; presumably a null |ts_packet| -- confirm).
222 DVLOG(1) <<
"Error: invalid TS packet";
223 ts_byte_queue_.Pop(1);
226 DVLOG(LOG_LEVEL_TS) <<
"Processing PID=" << ts_packet->pid()
228 << ts_packet->payload_unit_start_indicator()
229 <<
" continuity_counter="
230 << ts_packet->continuity_counter();
// Find the state for this PID. The PAT PID is registered lazily the first
// time a packet for it is seen, with RegisterPmt bound as its callback.
232 auto it = pids_.find(ts_packet->pid());
233 if (it == pids_.end() &&
234 ts_packet->pid() == TsSection::kPidPat) {
236 std::unique_ptr<TsSection> pat_section_parser(
new TsSectionPat(
237 std::bind(&Mp2tMediaParser::RegisterPmt,
this, std::placeholders::_1,
238 std::placeholders::_2)));
239 std::unique_ptr<PidState> pat_pid_state(
new PidState(
240 ts_packet->pid(), PidState::kPidPat, std::move(pat_section_parser)));
241 pat_pid_state->Enable();
242 it = pids_.emplace(ts_packet->pid(), std::move(pat_pid_state)).first;
// Packets for known PIDs go to their state; other PIDs are only logged.
245 if (it != pids_.end()) {
246 RCHECK(it->second->PushTsPacket(*ts_packet));
248 DVLOG(LOG_LEVEL_TS) <<
"Ignoring TS packet for pid: " << ts_packet->pid();
// Consume the packet that was just handled.
252 ts_byte_queue_.Pop(TsPacket::kPacketSize);
// Deliver the samples queued while parsing.
256 return EmitRemainingSamples();
// Callback bound into the PAT section parser (see Parse): registers a PMT
// section parser for |pmt_pid|.
259 void Mp2tMediaParser::RegisterPmt(
int program_number,
int pmt_pid) {
260 DVLOG(1) <<
"RegisterPmt:"
261 <<
" program_number=" << program_number
262 <<
" pmt_pid=" << pmt_pid;
// Look for an already registered PMT PID. A different PMT PID is logged as
// "more than one program" (what happens afterwards is on lines not visible
// in this listing).
266 for (
const auto& pair : pids_) {
267 if (pair.second->pid_type() == PidState::kPidPmt) {
268 if (pmt_pid != pair.first) {
269 DVLOG(1) <<
"More than one program is defined";
// Create the PMT parser; RegisterPes is bound as its callback with
// |pmt_pid| supplied as the first argument.
276 DVLOG(1) <<
"Create a new PMT parser";
277 std::unique_ptr<TsSection> pmt_section_parser(
new TsSectionPmt(std::bind(
278 &Mp2tMediaParser::RegisterPes,
this, pmt_pid, std::placeholders::_1,
279 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
280 std::placeholders::_5, std::placeholders::_6, std::placeholders::_7)));
281 std::unique_ptr<PidState> pmt_pid_state(
282 new PidState(pmt_pid, PidState::kPidPmt, std::move(pmt_section_parser)));
283 pmt_pid_state->Enable();
284 pids_.emplace(pmt_pid, std::move(pmt_pid_state));
// Callback bound into the PMT section parser (see RegisterPmt): creates the
// elementary stream parser and PID state for one PES PID and records its
// metadata (max bitrate, language, audio type).
// |descriptor| / |descriptor_length| are forwarded to the DVB and teletext
// subtitle parsers only.
287 void Mp2tMediaParser::RegisterPes(
int pmt_pid,
289 TsStreamType stream_type,
290 uint32_t max_bitrate,
291 const std::string& lang,
292 TsAudioType audio_type,
293 const uint8_t* descriptor,
294 size_t descriptor_length) {
// Guard for a PES PID that already has a state (its action is on a line
// not visible in this listing).
295 if (pids_.count(pes_pid) != 0)
// NOTE(review): "max_bitrate=" and "audio_type=" have no leading space, so
// they run into the preceding value in the log output.
297 DVLOG(1) <<
"RegisterPes:"
298 <<
" pes_pid=" << pes_pid <<
" stream_type=" << std::hex
299 <<
static_cast<int>(stream_type) << std::dec
300 <<
"max_bitrate=" << max_bitrate <<
" lang=" << lang
301 <<
"audio_type=" << std::hex <<
static_cast<int>(audio_type)
// The PID type defaults to video; the audio and text cases override it.
305 PidState::PidType pid_type = PidState::kPidVideoPes;
306 std::unique_ptr<EsParser> es_parser;
// Callbacks handed to the ES parsers, each pre-bound with |pes_pid|.
307 auto on_new_stream = std::bind(&Mp2tMediaParser::OnNewStreamInfo,
this,
308 pes_pid, std::placeholders::_1);
309 auto on_emit_media = std::bind(&Mp2tMediaParser::OnEmitMediaSample,
this,
310 pes_pid, std::placeholders::_1);
311 auto on_emit_text = std::bind(&Mp2tMediaParser::OnEmitTextSample,
this,
312 pes_pid, std::placeholders::_1);
// Pick the ES parser that matches the stream type.
313 switch (stream_type) {
314 case TsStreamType::kAvc:
315 es_parser.reset(
new EsParserH264(pes_pid, on_new_stream, on_emit_media));
317 case TsStreamType::kHevc:
318 es_parser.reset(
new EsParserH265(pes_pid, on_new_stream, on_emit_media));
// All three audio stream types share EsParserAudio.
320 case TsStreamType::kAdtsAac:
321 case TsStreamType::kMpeg1Audio:
322 case TsStreamType::kAc3:
324 new EsParserAudio(pes_pid,
static_cast<TsStreamType
>(stream_type),
325 on_new_stream, on_emit_media, sbr_in_mimetype_));
326 pid_type = PidState::kPidAudioPes;
328 case TsStreamType::kDvbSubtitles:
329 es_parser.reset(
new EsParserDvb(pes_pid, on_new_stream, on_emit_text,
330 descriptor, descriptor_length));
331 pid_type = PidState::kPidTextPes;
333 case TsStreamType::kTeletextSubtitles:
334 es_parser.reset(
new EsParserTeletext(pes_pid, on_new_stream, on_emit_text,
335 descriptor, descriptor_length));
336 pid_type = PidState::kPidTextPes;
// Unsupported stream types: the error is logged only the first time a given
// type value is seen, tracked in stream_type_logged_once_.
340 auto type =
static_cast<int>(stream_type);
341 DCHECK(type <= 0xff);
342 LOG_IF(ERROR, !stream_type_logged_once_[type])
343 <<
"Ignore unsupported MPEG2TS stream type 0x" << std::hex << type
345 stream_type_logged_once_[type] =
true;
// Wrap the ES parser in a PES section parser and register the enabled PID.
351 DVLOG(1) <<
"Create a new PES state";
352 std::unique_ptr<TsSection> pes_section_parser(
353 new TsSectionPes(std::move(es_parser)));
354 std::unique_ptr<PidState> pes_pid_state(
355 new PidState(pes_pid, pid_type, std::move(pes_section_parser)));
356 pes_pid_state->Enable();
357 pids_.emplace(pes_pid, std::move(pes_pid_state));
// Keep the PMT-provided metadata; OnNewStreamInfo looks it up by PES PID.
360 pes_metadata_.insert(
361 std::make_pair(pes_pid, PesMetadata{max_bitrate, lang, audio_type}));
364 void Mp2tMediaParser::OnNewStreamInfo(
366 std::shared_ptr<StreamInfo> new_stream_info) {
367 DCHECK(!new_stream_info || new_stream_info->track_id() == pes_pid);
368 DVLOG(1) <<
"OnVideoConfigChanged for pid=" << pes_pid
369 <<
", has_info=" << (new_stream_info ?
"true" :
"false");
371 auto pid_state = pids_.find(pes_pid);
372 if (pid_state == pids_.end()) {
373 LOG(ERROR) <<
"PID State for new stream not found (pid = "
374 << new_stream_info->track_id() <<
").";
378 if (new_stream_info) {
380 auto pes_metadata = pes_metadata_.find(pes_pid);
381 DCHECK(pes_metadata != pes_metadata_.end());
382 if (!pes_metadata->second.language.empty())
383 new_stream_info->set_language(pes_metadata->second.language);
384 if (new_stream_info->stream_type() == kStreamAudio) {
385 auto* audio_info =
static_cast<AudioStreamInfo*
>(new_stream_info.get());
386 audio_info->set_max_bitrate(pes_metadata->second.max_bitrate);
391 pid_state->second->set_config(new_stream_info);
393 LOG(WARNING) <<
"Ignoring unsupported stream with pid=" << pes_pid;
394 pid_state->second->Disable();
398 FinishInitializationIfNeeded();
// Invokes the init callback once every counted elementary stream has a
// stream configuration, then marks the parser as initialized.
// Early exits and the bookkeeping of |num_es| are on lines not visible in
// this listing.
401 bool Mp2tMediaParser::FinishInitializationIfNeeded() {
410 std::vector<std::shared_ptr<StreamInfo>> all_stream_info;
// Consider only enabled audio, video and text PES PIDs, and collect the
// configs that are already available.
412 for (
const auto& pair : pids_) {
413 if ((pair.second->pid_type() == PidState::kPidAudioPes ||
414 pair.second->pid_type() == PidState::kPidVideoPes ||
415 pair.second->pid_type() == PidState::kPidTextPes) &&
416 pair.second->IsEnabled()) {
418 if (pair.second->config())
419 all_stream_info.push_back(pair.second->config());
// Ready only when there is at least one stream and none lacks a config
// (|num_es| presumably counts the PIDs matched above -- confirm).
422 if (num_es && (all_stream_info.size() == num_es)) {
425 init_cb_(all_stream_info);
426 DVLOG(1) <<
"Mpeg2TS stream parser initialization done";
427 is_initialized_ =
true;
// Callback bound into the audio/video ES parsers (see RegisterPes): queues
// |new_sample| on the state of |pes_pid|. The sample is delivered later by
// EmitRemainingSamples().
432 void Mp2tMediaParser::OnEmitMediaSample(
434 std::shared_ptr<MediaSample> new_sample) {
436 DVLOG(LOG_LEVEL_ES) <<
"OnEmitMediaSample: "
437 <<
" pid=" << pes_pid
438 <<
" size=" << new_sample->data_size()
439 <<
" dts=" << new_sample->dts()
440 <<
" pts=" << new_sample->pts();
// A sample for an unregistered PID is reported as an error.
443 auto pid_state = pids_.find(pes_pid);
444 if (pid_state == pids_.end()) {
445 LOG(ERROR) <<
"PID State for new sample not found (pid = " << pes_pid
// Ownership of the sample moves into the per-PID queue.
449 pid_state->second->media_sample_queue_.push_back(std::move(new_sample));
// Callback bound into the subtitle ES parsers (see RegisterPes): queues
// |new_sample| on the state of |pes_pid|. The sample is delivered later by
// EmitRemainingSamples().
452 void Mp2tMediaParser::OnEmitTextSample(uint32_t pes_pid,
453 std::shared_ptr<TextSample> new_sample) {
455 DVLOG(LOG_LEVEL_ES) <<
"OnEmitTextSample: "
456 <<
" pid=" << pes_pid
457 <<
" start=" << new_sample->start_time();
// A sample for an unregistered PID is reported as an error.
460 auto pid_state = pids_.find(pes_pid);
461 if (pid_state == pids_.end()) {
462 LOG(ERROR) <<
"PID State for new sample not found (pid = "
// Ownership of the sample moves into the per-PID queue.
466 pid_state->second->text_sample_queue_.push_back(std::move(new_sample));
// Hands every queued media and text sample to the corresponding sample
// callback, PID by PID, and clears each queue afterwards.
469 bool Mp2tMediaParser::EmitRemainingSamples() {
470 DVLOG(LOG_LEVEL_ES) <<
"Mp2tMediaParser::EmitRemainingBuffers";
// Guard for a parser that is not initialized yet (its action is on a line
// not visible in this listing).
473 if (!is_initialized_)
// The callbacks receive the PID (map key) together with the sample.
// NOTE(review): `auto sample` copies each shared_ptr per iteration;
// `const auto&` would avoid the extra reference-count traffic.
477 for (
const auto& pid_pair : pids_) {
478 for (
auto sample : pid_pair.second->media_sample_queue_) {
479 RCHECK(new_media_sample_cb_(pid_pair.first, sample));
481 pid_pair.second->media_sample_queue_.clear();
483 for (
auto sample : pid_pair.second->text_sample_queue_) {
484 RCHECK(new_text_sample_cb_(pid_pair.first, sample));
486 pid_pair.second->text_sample_queue_.clear();
All the methods that are virtual are virtual for mocking.