7#include <packager/media/chunking/text_chunker.h>
9#include <absl/log/check.h>
11#include <packager/macros/status.h>
// Stream index passed to every Dispatch* call in this file.
constexpr size_t kStreamIndex = 0;
19TextChunker::TextChunker(
double segment_duration_in_seconds,
20 int64_t start_segment_number)
21 : segment_duration_in_seconds_(segment_duration_in_seconds),
22 segment_number_(start_segment_number),
23 ts_ttx_heartbeat_shift_(kDefaultTtxHeartbeatShift),
24 use_segment_coordinator_(false) {};
26TextChunker::TextChunker(
double segment_duration_in_seconds,
27 int64_t start_segment_number,
28 int64_t ts_ttx_heartbeat_shift)
29 : segment_duration_in_seconds_(segment_duration_in_seconds),
30 segment_number_(start_segment_number),
31 ts_ttx_heartbeat_shift_(ts_ttx_heartbeat_shift),
32 use_segment_coordinator_(false) {};
34TextChunker::TextChunker(
double segment_duration_in_seconds,
35 int64_t start_segment_number,
36 int64_t ts_ttx_heartbeat_shift,
37 bool use_segment_coordinator)
38 : segment_duration_in_seconds_(segment_duration_in_seconds),
39 segment_number_(start_segment_number),
40 ts_ttx_heartbeat_shift_(ts_ttx_heartbeat_shift),
41 use_segment_coordinator_(use_segment_coordinator) {};
43Status TextChunker::Process(std::unique_ptr<StreamData> data) {
44 switch (data->stream_data_type) {
45 case StreamDataType::kStreamInfo:
46 return OnStreamInfo(std::move(data->stream_info));
47 case StreamDataType::kTextSample:
48 return OnTextSample(data->text_sample);
49 case StreamDataType::kCueEvent:
50 return OnCueEvent(data->cue_event);
51 case StreamDataType::kSegmentInfo:
52 if (use_segment_coordinator_) {
53 return OnSegmentInfo(std::move(data->segment_info));
56 return DispatchSegmentInfo(kStreamIndex, std::move(data->segment_info));
59 return Status(error::INTERNAL_ERROR,
60 "Invalid stream data type for this handler");
// Flush handler: keeps dispatching segments for as long as
// samples_in_current_segment_ is non-empty, then forwards the flush through
// FlushAllDownstreams(). The size_t parameter is unnamed and not used by the
// visible code.
// NOTE(review): this block appears to have lost lines during extraction
// (leading numbers fused onto the code, no closing braces, and the body of the
// `segment_start_ < 0` branch is not visible) -- restore it from the original
// file before relying on it.
64Status TextChunker::OnFlushRequest(
size_t ) {
// Each pass dispatches one full-length segment; DispatchSegment advances
// segment_start_ and drops samples that end at or before the new start.
72 while (samples_in_current_segment_.size()) {
// segment_start_ is negative until a first segment start has been chosen.
// NOTE(review): what happens in this branch cannot be seen from here.
73 if (segment_start_ < 0) {
77 int64_t segment_end = segment_start_ + segment_duration_;
// Crop still-open cues (samples_without_end_) into the segment being closed.
78 AddOngoingCuesToCurrentSegment(segment_end);
79 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
82 return FlushAllDownstreams();
85Status TextChunker::OnStreamInfo(std::shared_ptr<const StreamInfo> info) {
86 time_scale_ = info->time_scale();
87 segment_duration_ = ScaleTime(segment_duration_in_seconds_);
89 return DispatchStreamInfo(kStreamIndex, std::move(info));
92Status TextChunker::OnCueEvent(std::shared_ptr<const CueEvent> event) {
101 const int64_t event_time = ScaleTime(event->time_in_seconds);
103 while (segment_start_ + segment_duration_ < event_time) {
104 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
107 const int64_t shorten_duration = event_time - segment_start_;
108 RETURN_IF_ERROR(DispatchSegment(shorten_duration));
109 return DispatchCueEvent(kStreamIndex, std::move(event));
// Handles one incoming text sample according to its role (cue, cue start,
// cue end, text heartbeat, media heartbeat) and, when segment-coordinator mode
// is off, dispatches every segment that ends at or before the sample's start.
// NOTE(review): this block appears to have lost many lines during extraction
// (leading numbers fused onto the code, both `switch` headers, every `break`,
// closing braces, the declaration of `nS`, and the final return are not
// visible) -- restore it from the original file before relying on it.
112Status TextChunker::OnTextSample(std::shared_ptr<const TextSample> sample) {
// Mutable copy: the media-heartbeat case below shifts it.
117 int64_t sample_start = sample->start_time();
118 const auto role = sample->role();
119 DVLOG(2) <<
"OnTextSample: role=" <<
static_cast<int>(role)
120 <<
" pts=" << sample_start <<
" end=" << sample->EndTime()
121 <<
" is_empty=" << sample->is_empty()
122 <<
" sub_stream_index=" << sample->sub_stream_index();
// First sample without a coordinator: start the first segment on a multiple of
// segment_duration_ (integer division truncates).
127 if (segment_start_ < 0 && !use_segment_coordinator_) {
131 segment_start_ = (sample_start / segment_duration_) * segment_duration_;
132 DVLOG(1) <<
"first segment start=" << segment_start_;
// Per-role handling. NOTE(review): the enclosing `switch` is not visible.
136 case TextSampleRole::kCue: {
137 DVLOG(2) <<
"PTS=" << sample_start <<
" cue with end "
138 << sample->EndTime();
141 case TextSampleRole::kCueStart: {
142 DVLOG(2) <<
"PTS=" << sample_start <<
" cue start wo end";
// A cue end closes every pending open cue: each one becomes a kCue sample that
// starts no earlier than segment_start_ and ends at this sample's EndTime().
145 case TextSampleRole::kCueEnd: {
146 DVLOG(2) <<
"PTS=" << sample_start <<
" cue end";
148 auto end_time = sample->EndTime();
// NOTE(review): `auto s` copies the shared_ptr on every iteration.
149 for (
auto s : samples_without_end_) {
150 int64_t cue_start = s->start_time();
// NOTE(review): plain `<` here, whereas AddOngoingCuesToCurrentSegment uses
// PtsIsBefore for the same clamp -- confirm which is intended.
151 if (cue_start < segment_start_) {
152 cue_start = segment_start_;
// NOTE(review): presumably assigned to `nS`, which is pushed below; the
// declaration is not visible here.
155 std::make_shared<TextSample>(
"", cue_start, end_time, s->settings(),
156 s->body(), TextSampleRole::kCue);
157 DVLOG(3) <<
"cue shortened. startTime=" << s->start_time()
158 <<
" endTime=" << end_time;
159 samples_in_current_segment_.push_back(nS);
// All pending open cues have been closed.
161 samples_without_end_.clear();
164 case TextSampleRole::kTextHeartBeat: {
// Media heartbeats are shifted back by ts_ttx_heartbeat_shift_ and the shifted
// time is remembered as the latest media time.
167 case TextSampleRole::kMediaHeartBeat: {
168 sample_start -= ts_ttx_heartbeat_shift_;
169 latest_media_heartbeat_time_ = sample_start;
170 DVLOG(3) <<
"PTS=" << sample_start <<
" media heartbeat";
// Anything other than a media heartbeat that starts before the latest media
// heartbeat is reported with a warning.
178 if (role != TextSampleRole::kMediaHeartBeat) {
180 if (PtsIsBefore(sample_start, latest_media_heartbeat_time_)) {
181 LOG(WARNING) <<
"Potentially bad text segment: text pts=" << sample_start
182 <<
" before latest media pts="
183 << latest_media_heartbeat_time_;
// Without a coordinator, close segments until sample_start lies before the end
// of the current one; open cues are cropped into each closed segment.
206 if (!use_segment_coordinator_ && segment_start_ >= 0) {
207 int64_t segment_end = segment_start_ + segment_duration_;
208 while (!PtsIsBefore(sample_start, segment_end)) {
210 AddOngoingCuesToCurrentSegment(segment_end);
212 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
// DispatchSegment advanced segment_start_, so recompute the end.
213 segment_end = segment_start_ + segment_duration_;
// Store the sample: complete cues join the current segment, open cues wait in
// samples_without_end_. NOTE(review): the enclosing `switch` is not visible.
218 case TextSampleRole::kCue: {
219 samples_in_current_segment_.push_back(std::move(sample));
222 case TextSampleRole::kCueStart: {
223 samples_without_end_.push_back(std::move(sample));
// Coordinator-mode handler for an incoming SegmentInfo: dispatches text
// segments up to the end boundary (start_timestamp + duration) of |info| and
// then moves segment_start_ to that boundary.
// NOTE(review): this block appears to have lost lines during extraction
// (leading numbers fused onto the code, closing braces, and what look like
// several early returns and an `else` are not visible) -- restore it from the
// original file before relying on it.
234Status TextChunker::OnSegmentInfo(std::shared_ptr<const SegmentInfo> info) {
235 DCHECK(use_segment_coordinator_)
236 <<
"OnSegmentInfo should only be called when coordinator mode is enabled";
// Subsegment infos are only logged. NOTE(review): presumably followed by an
// early return that is not visible here.
239 if (info->is_subsegment) {
240 DVLOG(3) <<
"TextChunker: Skipping subsegment SegmentInfo";
248 int64_t segment_end_boundary = info->start_timestamp + info->duration;
250 DVLOG(2) <<
"TextChunker received SegmentInfo: start="
251 << info->start_timestamp <<
" duration=" << info->duration
252 <<
" end_boundary=" << segment_end_boundary
253 <<
" (current segment_start_=" << segment_start_ <<
")";
// No segment start chosen yet: take it from this SegmentInfo.
256 if (segment_start_ < 0) {
257 segment_start_ = info->start_timestamp;
258 DVLOG(2) <<
"TextChunker: Initialized segment_start_ from SegmentInfo: "
// 4294967296 == 2^32 ticks. A backwards jump larger than this is treated as a
// PTS wrap-around rather than a stale SegmentInfo.
267 const int64_t kPtsWrapThreshold = 4294967296LL;
// The boundary lies before the current segment start.
269 if (segment_end_boundary < segment_start_) {
270 int64_t diff = segment_start_ - segment_end_boundary;
271 if (diff > kPtsWrapThreshold) {
274 DVLOG(2) <<
"TextChunker: Detected PTS wrap-around. End boundary "
275 << segment_end_boundary
276 <<
" appears earlier than segment_start_ " << segment_start_
277 <<
" by " << diff <<
" ticks, but is likely "
278 <<
"later due to wrap-around.";
// Wrap-around: close the current segment at full length, then restart from
// the SegmentInfo's start timestamp.
281 int64_t segment_end = segment_start_ + segment_duration_;
282 AddOngoingCuesToCurrentSegment(segment_end);
283 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
284 segment_start_ = info->start_timestamp;
// NOTE(review): presumably the non-wrap branch; the control flow between the
// wrap handling above and this warning is not visible here.
288 LOG(WARNING) <<
"TextChunker: Received SegmentInfo end boundary "
289 << segment_end_boundary <<
" that is earlier than current "
290 <<
"segment_start_ " << segment_start_ <<
" (diff: " << diff
291 <<
"). Skipping this SegmentInfo.";
// Dispatch segments until segment_start_ reaches the boundary.
297 while (segment_start_ < segment_end_boundary) {
298 int64_t segment_end = segment_start_ + segment_duration_;
// A full-length segment would overshoot the boundary: dispatch a shortened one
// that ends exactly on it.
302 if (segment_end > segment_end_boundary) {
303 int64_t adjusted_duration = segment_end_boundary - segment_start_;
304 if (adjusted_duration > 0) {
305 DVLOG(3) <<
"TextChunker: Dispatching adjusted segment to align with "
306 <<
"end boundary. Duration: " << adjusted_duration
307 <<
" (normal: " << segment_duration_ <<
")";
309 AddOngoingCuesToCurrentSegment(segment_end_boundary);
310 RETURN_IF_ERROR(DispatchSegment(adjusted_duration));
// Full-length segment that fits within the boundary.
316 DVLOG(3) <<
"TextChunker: Dispatching full segment aligned to end boundary";
318 AddOngoingCuesToCurrentSegment(segment_end);
319 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
// Pin the next segment start to the boundary.
323 segment_start_ = segment_end_boundary;
328Status TextChunker::DispatchSegment(int64_t duration) {
329 DCHECK_GT(duration, 0) <<
"Segment duration should always be positive";
331 int64_t segment_end = segment_start_ + duration;
335 DVLOG(1) <<
"DispatchSegment, start=" << segment_start_
336 <<
" end=" << segment_end;
337 for (
const auto& sample : samples_in_current_segment_) {
339 if (PtsIsBefore(sample->start_time(), segment_end)) {
340 DVLOG(2) <<
"DispatchTextSample, pts=" << sample->start_time()
341 <<
" end=" << sample->EndTime();
342 RETURN_IF_ERROR(DispatchTextSample(kStreamIndex, sample));
344 DVLOG(2) <<
"Skipping sample pts=" << sample->start_time()
345 <<
" (after segment_end=" << segment_end <<
")";
350 std::shared_ptr<SegmentInfo> info = std::make_shared<SegmentInfo>();
351 info->start_timestamp = segment_start_;
352 info->duration = duration;
353 info->segment_number = segment_number_++;
355 RETURN_IF_ERROR(DispatchSegmentInfo(kStreamIndex, std::move(info)));
358 const int64_t new_segment_start = segment_start_ + duration;
359 segment_start_ = new_segment_start;
363 samples_in_current_segment_.remove_if(
364 [new_segment_start](
const std::shared_ptr<const TextSample>& sample) {
366 return PtsIsBeforeOrEqual(sample->EndTime(), new_segment_start);
372int64_t TextChunker::ScaleTime(
double seconds)
const {
373 DCHECK_GT(time_scale_, 0) <<
"Need positive time scale to scale time.";
374 return static_cast<int64_t
>(seconds * time_scale_);
// For every still-open cue (samples_without_end_) that starts before
// |segment_end|, appends a cropped copy to samples_in_current_segment_. The
// copy starts no earlier than segment_start_ and ends at |segment_end|; the
// open cue itself stays in samples_without_end_.
// NOTE(review): this block appears to have lost lines during extraction
// (leading numbers fused onto the code, no closing braces) and its end is not
// visible -- restore it from the original file before relying on it.
377void TextChunker::AddOngoingCuesToCurrentSegment(int64_t segment_end) {
380 for (
const auto& s : samples_without_end_) {
381 if (s->role() == TextSampleRole::kCueStart) {
// Cues that start at or after the segment end do not belong to this segment.
383 if (PtsIsBefore(s->start_time(), segment_end)) {
// Clamp the start so the cropped cue never begins before the segment does.
385 auto cue_start = s->start_time();
386 if (PtsIsBefore(cue_start, segment_start_)) {
387 cue_start = segment_start_;
// NOTE(review): no role argument is passed here, unlike the kCueEnd path in
// OnTextSample which passes TextSampleRole::kCue -- presumably the
// constructor's default role is kCue; confirm.
389 auto cropped_cue = std::make_shared<TextSample>(
390 "", cue_start, segment_end, s->settings(), s->body());
391 DVLOG(3) <<
"AddOngoingCuesToCurrentSegment: cropped cue start="
392 << cue_start <<
" end=" << segment_end;
393 samples_in_current_segment_.push_back(std::move(cropped_cue));
// All methods that are virtual are virtual only so that they can be mocked.