Shaka Packager SDK
Loading...
Searching...
No Matches
text_chunker.cc
// Copyright 2017 Google LLC. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include <packager/media/chunking/text_chunker.h>

#include <cmath>

#include <absl/log/check.h>

#include <packager/macros/status.h>
12
13namespace shaka {
14namespace media {
namespace {
// Output stream index targeted by every Dispatch* call in this file.
constexpr size_t kStreamIndex = 0;
}  // namespace
18
19TextChunker::TextChunker(double segment_duration_in_seconds,
20 int64_t start_segment_number)
21 : segment_duration_in_seconds_(segment_duration_in_seconds),
22 segment_number_(start_segment_number),
23 ts_ttx_heartbeat_shift_(kDefaultTtxHeartbeatShift),
24 use_segment_coordinator_(false) {};
25
26TextChunker::TextChunker(double segment_duration_in_seconds,
27 int64_t start_segment_number,
28 int64_t ts_ttx_heartbeat_shift)
29 : segment_duration_in_seconds_(segment_duration_in_seconds),
30 segment_number_(start_segment_number),
31 ts_ttx_heartbeat_shift_(ts_ttx_heartbeat_shift),
32 use_segment_coordinator_(false) {};
33
34TextChunker::TextChunker(double segment_duration_in_seconds,
35 int64_t start_segment_number,
36 int64_t ts_ttx_heartbeat_shift,
37 bool use_segment_coordinator)
38 : segment_duration_in_seconds_(segment_duration_in_seconds),
39 segment_number_(start_segment_number),
40 ts_ttx_heartbeat_shift_(ts_ttx_heartbeat_shift),
41 use_segment_coordinator_(use_segment_coordinator) {};
42
43Status TextChunker::Process(std::unique_ptr<StreamData> data) {
44 switch (data->stream_data_type) {
45 case StreamDataType::kStreamInfo:
46 return OnStreamInfo(std::move(data->stream_info));
47 case StreamDataType::kTextSample:
48 return OnTextSample(data->text_sample);
49 case StreamDataType::kCueEvent:
50 return OnCueEvent(data->cue_event);
51 case StreamDataType::kSegmentInfo:
52 if (use_segment_coordinator_) {
53 return OnSegmentInfo(std::move(data->segment_info));
54 } else {
55 // Pass through for non-teletext streams
56 return DispatchSegmentInfo(kStreamIndex, std::move(data->segment_info));
57 }
58 default:
59 return Status(error::INTERNAL_ERROR,
60 "Invalid stream data type for this handler");
61 }
62}
63
64Status TextChunker::OnFlushRequest(size_t /*input_stream_index*/) {
65 // Keep outputting segments until all the samples leave the system. Calling
66 // |DispatchSegment| will remove samples over time.
67 //
68 // In coordinator mode, the final SegmentInfo from video/audio should have
69 // already triggered the last segment dispatch with the correct duration.
70 // This loop handles any remaining samples (edge cases or non-coordinator
71 // mode).
72 while (samples_in_current_segment_.size()) {
73 if (segment_start_ < 0) {
74 // No segments were ever started - nothing to flush
75 break;
76 }
77 int64_t segment_end = segment_start_ + segment_duration_;
78 AddOngoingCuesToCurrentSegment(segment_end);
79 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
80 }
81
82 return FlushAllDownstreams();
83}
84
85Status TextChunker::OnStreamInfo(std::shared_ptr<const StreamInfo> info) {
86 time_scale_ = info->time_scale();
87 segment_duration_ = ScaleTime(segment_duration_in_seconds_);
88
89 return DispatchStreamInfo(kStreamIndex, std::move(info));
90}
91
92Status TextChunker::OnCueEvent(std::shared_ptr<const CueEvent> event) {
93 // We are going to end the current segment prematurely using the cue event's
94 // time as the new segment end.
95
96 // Because the cue should have been inserted into the stream such that no
97 // later sample could start before it does, we know that there should
98 // be no later samples starting before the cue event.
99
100 // Convert the event's time to be scaled to the time of each sample.
101 const int64_t event_time = ScaleTime(event->time_in_seconds);
102 // Output all full segments before the segment that the cue event interrupts.
103 while (segment_start_ + segment_duration_ < event_time) {
104 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
105 }
106
107 const int64_t shorten_duration = event_time - segment_start_;
108 RETURN_IF_ERROR(DispatchSegment(shorten_duration));
109 return DispatchCueEvent(kStreamIndex, std::move(event));
110}
111
112Status TextChunker::OnTextSample(std::shared_ptr<const TextSample> sample) {
113 // Output all segments that come before our new sample start_time.
114 // However, if role is MediaHeartBeat, remove 2s to avoid premature segment
115 // generation.
116
117 int64_t sample_start = sample->start_time();
118 const auto role = sample->role();
119 DVLOG(2) << "OnTextSample: role=" << static_cast<int>(role)
120 << " pts=" << sample_start << " end=" << sample->EndTime()
121 << " is_empty=" << sample->is_empty()
122 << " sub_stream_index=" << sample->sub_stream_index();
123
124 // If we have not seen a sample yet, base all segments off the first sample's
125 // start time. In coordinator mode, we wait for SegmentInfo to initialize
126 // segment_start_ so that we align with video/audio boundaries.
127 if (segment_start_ < 0 && !use_segment_coordinator_) {
128 // Force the first segment to start at the segment that would have started
129 // before the sample. This should allow segments from different streams to
130 // align.
131 segment_start_ = (sample_start / segment_duration_) * segment_duration_;
132 DVLOG(1) << "first segment start=" << segment_start_;
133 }
134
135 switch (role) {
136 case TextSampleRole::kCue: {
137 DVLOG(2) << "PTS=" << sample_start << " cue with end "
138 << sample->EndTime();
139 break;
140 }
141 case TextSampleRole::kCueStart: {
142 DVLOG(2) << "PTS=" << sample_start << " cue start wo end";
143 break;
144 }
145 case TextSampleRole::kCueEnd: {
146 DVLOG(2) << "PTS=" << sample_start << " cue end";
147 // Convert any cues without end to full cues (but only once)
148 auto end_time = sample->EndTime();
149 for (auto s : samples_without_end_) {
150 int64_t cue_start = s->start_time();
151 if (cue_start < segment_start_) {
152 cue_start = segment_start_;
153 }
154 auto nS =
155 std::make_shared<TextSample>("", cue_start, end_time, s->settings(),
156 s->body(), TextSampleRole::kCue);
157 DVLOG(3) << "cue shortened. startTime=" << s->start_time()
158 << " endTime=" << end_time;
159 samples_in_current_segment_.push_back(nS);
160 }
161 samples_without_end_.clear();
162 break;
163 }
164 case TextSampleRole::kTextHeartBeat: {
165 break;
166 }
167 case TextSampleRole::kMediaHeartBeat: {
168 sample_start -= ts_ttx_heartbeat_shift_;
169 latest_media_heartbeat_time_ = sample_start;
170 DVLOG(3) << "PTS=" << sample_start << " media heartbeat";
171 break;
172 }
173 default: {
174 // LOG(ERROR) << "Unknown role encountered. pts=" << sample_start;
175 }
176 }
177
178 if (role != TextSampleRole::kMediaHeartBeat) {
179 // Use SignedPtsDiff for wrap-safe comparison
180 if (PtsIsBefore(sample_start, latest_media_heartbeat_time_)) {
181 LOG(WARNING) << "Potentially bad text segment: text pts=" << sample_start
182 << " before latest media pts="
183 << latest_media_heartbeat_time_;
184 }
185 }
186
187 // To avoid waiting for live teletext cues to get an end time/duration
188 // they are triggered with a long fixed duration.
189 // Here we should detect such cues and put them in a special list.
190 // Once an end cue event with duration comes, we should change the duration
191 // to the correct value. If an end of a segment duration is triggered
192 // before that, we should split the segment so that the first copy ends
193 // at the segment boundary, and the second copy starts at the segment
194 // boundary. We could keep the long duration of the second part and
195 // use the long duration as an indication that it is a cue which has
196 // not yet received its proper end time.
197
198 // We need to write all the segments that would have ended before the new
199 // sample started. For segment without end, we check if they have started
200 // and if so, make cropped copy that goes to the end.
201 // We also crop such a cue at the start if needed.
202 //
203 // In coordinator mode, we skip this entirely - segment dispatch is driven
204 // by OnSegmentInfo which receives actual video/audio segment boundaries.
205 // This ensures text segments align perfectly with video/audio segments.
206 if (!use_segment_coordinator_ && segment_start_ >= 0) {
207 int64_t segment_end = segment_start_ + segment_duration_;
208 while (!PtsIsBefore(sample_start, segment_end)) {
209 // Add cropped copies of ongoing cues before dispatching
210 AddOngoingCuesToCurrentSegment(segment_end);
211 // |DispatchSegment| will advance |segment_start_|.
212 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
213 segment_end = segment_start_ + segment_duration_;
214 }
215 }
216
217 switch (role) {
218 case TextSampleRole::kCue: {
219 samples_in_current_segment_.push_back(std::move(sample));
220 break;
221 }
222 case TextSampleRole::kCueStart: {
223 samples_without_end_.push_back(std::move(sample));
224 break;
225 }
226 default: {
227 // Do nothing
228 }
229 }
230
231 return Status::OK;
232}
233
234Status TextChunker::OnSegmentInfo(std::shared_ptr<const SegmentInfo> info) {
235 DCHECK(use_segment_coordinator_)
236 << "OnSegmentInfo should only be called when coordinator mode is enabled";
237
238 // Skip subsegments - only align on full segments
239 if (info->is_subsegment) {
240 DVLOG(3) << "TextChunker: Skipping subsegment SegmentInfo";
241 return Status::OK;
242 }
243
244 // Use start_timestamp + duration as the end boundary. This ensures we
245 // dispatch the segment that just completed, not wait for the next
246 // SegmentInfo. Without this, the final segment would never be dispatched
247 // since there's no subsequent SegmentInfo to trigger it.
248 int64_t segment_end_boundary = info->start_timestamp + info->duration;
249
250 DVLOG(2) << "TextChunker received SegmentInfo: start="
251 << info->start_timestamp << " duration=" << info->duration
252 << " end_boundary=" << segment_end_boundary
253 << " (current segment_start_=" << segment_start_ << ")";
254
255 // If this is the first segment info, initialize segment_start_
256 if (segment_start_ < 0) {
257 segment_start_ = info->start_timestamp;
258 DVLOG(2) << "TextChunker: Initialized segment_start_ from SegmentInfo: "
259 << segment_start_;
260 }
261
262 // Handle PTS wrap-around: if segment_end_boundary appears to be earlier than
263 // segment_start_ by more than half the 33-bit PTS range, it's likely
264 // wrapped around and is actually later.
265 // 33-bit PTS wraps at 2^33 = 8,589,934,592 ticks (~26.5 hours @ 90kHz)
266 // Half range = ~13 hours = 4,294,967,296 ticks
267 const int64_t kPtsWrapThreshold = 4294967296LL; // Half of 2^33
268
269 if (segment_end_boundary < segment_start_) {
270 int64_t diff = segment_start_ - segment_end_boundary;
271 if (diff > kPtsWrapThreshold) {
272 // This looks like a wrap-around - the boundary has wrapped but our
273 // segment_start_ hasn't yet. Treat this boundary as being later.
274 DVLOG(2) << "TextChunker: Detected PTS wrap-around. End boundary "
275 << segment_end_boundary
276 << " appears earlier than segment_start_ " << segment_start_
277 << " by " << diff << " ticks, but is likely "
278 << "later due to wrap-around.";
279
280 // Dispatch one final segment before the wrap and align to the boundary
281 int64_t segment_end = segment_start_ + segment_duration_;
282 AddOngoingCuesToCurrentSegment(segment_end);
283 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
284 segment_start_ = info->start_timestamp;
285 } else {
286 // End boundary is genuinely earlier - this shouldn't happen in normal
287 // flow but we'll log a warning and skip this boundary
288 LOG(WARNING) << "TextChunker: Received SegmentInfo end boundary "
289 << segment_end_boundary << " that is earlier than current "
290 << "segment_start_ " << segment_start_ << " (diff: " << diff
291 << "). Skipping this SegmentInfo.";
292 return Status::OK;
293 }
294 }
295
296 // Dispatch all pending segments up to the end boundary
297 while (segment_start_ < segment_end_boundary) {
298 int64_t segment_end = segment_start_ + segment_duration_;
299
300 // If the next calculated segment would go past the end boundary,
301 // dispatch a shorter segment to align with the actual boundary
302 if (segment_end > segment_end_boundary) {
303 int64_t adjusted_duration = segment_end_boundary - segment_start_;
304 if (adjusted_duration > 0) {
305 DVLOG(3) << "TextChunker: Dispatching adjusted segment to align with "
306 << "end boundary. Duration: " << adjusted_duration
307 << " (normal: " << segment_duration_ << ")";
308 // Add ongoing cues before dispatching (use end boundary as end)
309 AddOngoingCuesToCurrentSegment(segment_end_boundary);
310 RETURN_IF_ERROR(DispatchSegment(adjusted_duration));
311 }
312 break;
313 }
314
315 // Dispatch a full-duration segment
316 DVLOG(3) << "TextChunker: Dispatching full segment aligned to end boundary";
317 // Add ongoing cues before dispatching
318 AddOngoingCuesToCurrentSegment(segment_end);
319 RETURN_IF_ERROR(DispatchSegment(segment_duration_));
320 }
321
322 // Align next segment start to end boundary (start of next segment)
323 segment_start_ = segment_end_boundary;
324
325 return Status::OK;
326}
327
328Status TextChunker::DispatchSegment(int64_t duration) {
329 DCHECK_GT(duration, 0) << "Segment duration should always be positive";
330
331 int64_t segment_end = segment_start_ + duration;
332
333 // Output only the samples that actually belong in this segment.
334 // Use wrap-safe comparison since samples may have wrapped PTS.
335 DVLOG(1) << "DispatchSegment, start=" << segment_start_
336 << " end=" << segment_end;
337 for (const auto& sample : samples_in_current_segment_) {
338 // Only dispatch if sample starts before segment end
339 if (PtsIsBefore(sample->start_time(), segment_end)) {
340 DVLOG(2) << "DispatchTextSample, pts=" << sample->start_time()
341 << " end=" << sample->EndTime();
342 RETURN_IF_ERROR(DispatchTextSample(kStreamIndex, sample));
343 } else {
344 DVLOG(2) << "Skipping sample pts=" << sample->start_time()
345 << " (after segment_end=" << segment_end << ")";
346 }
347 }
348
349 // Output the segment info.
350 std::shared_ptr<SegmentInfo> info = std::make_shared<SegmentInfo>();
351 info->start_timestamp = segment_start_;
352 info->duration = duration;
353 info->segment_number = segment_number_++;
354
355 RETURN_IF_ERROR(DispatchSegmentInfo(kStreamIndex, std::move(info)));
356
357 // Move onto the next segment.
358 const int64_t new_segment_start = segment_start_ + duration;
359 segment_start_ = new_segment_start;
360
361 // Remove all samples that end before the (new) current segment started.
362 // Use wrap-safe comparison.
363 samples_in_current_segment_.remove_if(
364 [new_segment_start](const std::shared_ptr<const TextSample>& sample) {
365 // Remove if sample ends before or at new segment start (wrap-safe)
366 return PtsIsBeforeOrEqual(sample->EndTime(), new_segment_start);
367 });
368
369 return Status::OK;
370}
371
372int64_t TextChunker::ScaleTime(double seconds) const {
373 DCHECK_GT(time_scale_, 0) << "Need positive time scale to scale time.";
374 return static_cast<int64_t>(seconds * time_scale_);
375}
376
377void TextChunker::AddOngoingCuesToCurrentSegment(int64_t segment_end) {
378 // For each ongoing cue (started but no end time yet), create a cropped
379 // copy that ends at the segment boundary and add to current segment.
380 for (const auto& s : samples_without_end_) {
381 if (s->role() == TextSampleRole::kCueStart) {
382 // Only include if the cue started before this segment ends
383 if (PtsIsBefore(s->start_time(), segment_end)) {
384 // Crop the start time to segment_start_ if needed
385 auto cue_start = s->start_time();
386 if (PtsIsBefore(cue_start, segment_start_)) {
387 cue_start = segment_start_;
388 }
389 auto cropped_cue = std::make_shared<TextSample>(
390 "", cue_start, segment_end, s->settings(), s->body());
391 DVLOG(3) << "AddOngoingCuesToCurrentSegment: cropped cue start="
392 << cue_start << " end=" << segment_end;
393 samples_in_current_segment_.push_back(std::move(cropped_cue));
394 }
395 }
396 }
397}
398} // namespace media
399} // namespace shaka
All the methods that are virtual are virtual for mocking.