Shaka Player Embedded
demuxer_thread.cc
Go to the documentation of this file.
1 // Copyright 2017 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <cmath>
18 #include <memory>
19 #include <string>
20 #include <unordered_map>
21 #include <utility>
22 #include <vector>
23 
25 #include "src/media/media_utils.h"
26 
27 namespace shaka {
28 namespace media {
29 
30 namespace {
31 
32 std::string ShortContainerName(const std::string& mime) {
33  std::string subtype;
34  if (!ParseMimeType(mime, nullptr, &subtype, nullptr))
35  return "";
36 
37  DCHECK_LT(subtype.size(), 8u) << "Container needs a short name";
38  return subtype.substr(0, 8);
39 }
40 
41 } // namespace
42 
43 DemuxerThread::DemuxerThread(const std::string& mime, Demuxer::Client* client,
44  ElementaryStream* stream)
45  : mutex_("DemuxerThread"),
46  new_data_("New demuxed data"),
47  client_(client),
48  mime_(mime),
49  shutdown_(false),
50  data_(nullptr),
51  data_size_(0),
52  timestamp_offset_(0),
53  window_start_(0),
54  window_end_(HUGE_VAL),
55  need_key_frame_(true),
56  stream_(stream),
57  thread_(ShortContainerName(mime) + " demuxer",
58  std::bind(&DemuxerThread::ThreadMain, this)) {}
59 
61  if (thread_.joinable())
62  Stop();
63 }
64 
66  shutdown_ = true;
67  new_data_.SignalAllIfNotSet();
68  thread_.join();
69 }
70 
71 void DemuxerThread::AppendData(double timestamp_offset, double window_start,
72  double window_end, const uint8_t* data,
73  size_t data_size,
74  std::function<void(bool)> on_complete) {
75  DCHECK(data);
76  DCHECK_GT(data_size, 0u);
77 
78  std::unique_lock<Mutex> lock(mutex_);
79  data_ = data;
80  data_size_ = data_size;
81  timestamp_offset_ = timestamp_offset;
82  window_start_ = window_start;
83  window_end_ = window_end;
84  on_complete_ = std::move(on_complete);
85 
86  new_data_.SignalAll();
87 }
88 
89 void DemuxerThread::ThreadMain() {
90  using namespace std::placeholders; // NOLINT
91  auto* factory = DemuxerFactory::GetFactory();
92  if (factory)
93  demuxer_ = factory->Create(mime_, client_);
94  if (!demuxer_) {
95  // If we get an error before we append the first segment, then we won't have
96  // a callback yet, so we have nowhere to send the error to. Wait until we
97  // get the first segment.
98  std::unique_lock<Mutex> lock(mutex_);
99  if (!on_complete_ && !shutdown_) {
100  new_data_.ResetAndWaitWhileUnlocked(lock);
101  }
102  CallOnComplete(false);
103  return;
104  }
105 
106  while (!shutdown_) {
107  std::unique_lock<Mutex> lock(mutex_);
108  std::vector<std::shared_ptr<EncodedFrame>> frames;
109  if (!demuxer_->Demux(timestamp_offset_, data_, data_size_, &frames)) {
110  CallOnComplete(false);
111  break;
112  }
113 
114  for (auto& frame : frames) {
115  if (frame->pts < window_start_ ||
116  frame->pts + frame->duration > window_end_) {
117  need_key_frame_ = true;
118  VLOG(2) << "Dropping frame outside append window, pts=" << frame->pts;
119  continue;
120  }
121  if (need_key_frame_) {
122  if (frame->is_key_frame) {
123  need_key_frame_ = false;
124  } else {
125  VLOG(2) << "Dropping frame while looking for key frame, pts="
126  << frame->pts;
127  continue;
128  }
129  }
130  stream_->AddFrame(frame);
131  }
132  CallOnComplete(true);
133  new_data_.ResetAndWaitWhileUnlocked(lock);
134  }
135 }
136 
137 void DemuxerThread::CallOnComplete(bool success) {
138  if (on_complete_) {
139  // on_complete must be invoked on the event thread.
141  TaskPriority::Internal, "Append done",
142  std::bind(on_complete_, success));
143  on_complete_ = std::function<void(bool)>();
144  }
145 }
146 
147 } // namespace media
148 } // namespace shaka
bool ParseMimeType(const std::string &source, std::string *type, std::string *subtype, std::unordered_map< std::string, std::string > *params)
Definition: media_utils.cc:62
DemuxerThread(const std::string &mime, Demuxer::Client *client, ElementaryStream *stream)
std::shared_ptr< shaka::media::DecodedFrame > frame
void join()
Definition: thread.h:56
static const DemuxerFactory * GetFactory()
Definition: demuxer.cc:53
void AppendData(double timestamp_offset, double window_start, double window_end, const uint8_t *data, size_t data_size, std::function< void(bool)> on_complete)
std::shared_ptr< ThreadEvent< impl::RetOf< Func > > > AddInternalTask(TaskPriority priority, const std::string &name, Func &&callback)
Definition: task_runner.h:219
std::list< std::shared_ptr< BaseFrame > > frames
Definition: streams.cc:128
bool joinable() const
Definition: thread.h:37
TaskRunner * MainThread()