Shaka Player Embedded
decoder_thread.cc
Go to the documentation of this file.
1 // Copyright 2017 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <glog/logging.h>
18 
19 #include <algorithm>
20 #include <cmath>
21 #include <memory>
22 #include <utility>
23 #include <vector>
24 
25 #include "src/util/clock.h"
26 #include "src/util/utils.h"
27 
28 namespace shaka {
29 namespace media {
30 
31 namespace {
32 
// Size of the decoded-frame window kept around the current playback time.
// ThreadMain stops decoding once more than this much content is decoded ahead
// of the current time, and evicts decoded frames that lie more than this far
// behind it.  NOTE(review): presumably in seconds, matching the media times it
// is compared against -- confirm.
34 constexpr const double kDecodeBufferSize = 1;
35 
// Tolerance used when deciding that the last decoded frame is at the end of
// the content: when no further frame is available and the last frame time plus
// this value reaches the reported duration, the decoder is flushed.
// NOTE(review): presumably in seconds -- confirm.
37 constexpr const double kEndDelta = 0.1;
38 
39 double DecodedAheadOf(StreamBase* stream, double time) {
40  for (auto& range : stream->GetBufferedRanges()) {
41  if (range.end > time) {
42  if (range.start < time + StreamBase::kMaxGapSize) {
43  return range.end - std::max(time, range.start);
44  }
45 
46  // The ranges are sorted, so avoid looking at the remaining elements.
47  break;
48  }
49  }
50  return 0;
51 }
52 
53 } // namespace
54 
56  : mutex_("DecoderThread"),
57  signal_("DecoderChanged"),
58  client_(client),
59  input_(nullptr),
60  output_(output),
61  decoder_(nullptr),
62  cdm_(nullptr),
63  last_frame_time_(NAN),
64  shutdown_(false),
65  did_flush_(false),
66  raised_waiting_event_(false),
67  thread_("Decoder", std::bind(&DecoderThread::ThreadMain, this)) {}
68 
70  {
71  std::unique_lock<Mutex> lock(mutex_);
72  shutdown_ = true;
73  signal_.SignalAllIfNotSet();
74  }
75  thread_.join();
76 }
77 
79  std::unique_lock<Mutex> lock(mutex_);
80  input_ = input;
81  if (input && decoder_)
82  signal_.SignalAllIfNotSet();
83 }
84 
86  std::unique_lock<Mutex> lock(mutex_);
87  input_ = nullptr;
88 }
89 
91  std::unique_lock<Mutex> lock(mutex_);
92  last_frame_time_ = NAN;
93  did_flush_ = false;
94  // Remove all the existing frames. We'll decode them again anyway and this
95  // ensures we don't keep future frames forever when seeking backwards.
96  output_->Remove(0, INFINITY);
97 }
98 
100  std::unique_lock<Mutex> lock(mutex_);
101  cdm_ = cdm;
102 }
103 
105  std::unique_lock<Mutex> lock(mutex_);
106  decoder_ = decoder;
107  if (decoder && input_)
108  signal_.SignalAllIfNotSet();
109 }
110 
111 void DecoderThread::ThreadMain() {
112  std::unique_lock<Mutex> lock(mutex_);
113  while (!shutdown_) {
114  if (!input_ || !decoder_) {
115  if (input_)
116  LOG(DFATAL) << "No decoder provided and no default decoder exists";
117 
118  signal_.ResetAndWaitWhileUnlocked(lock);
119  continue;
120  }
121 
122  const double cur_time = client_->CurrentTime();
123  double last_time = last_frame_time_;
124 
125  if (DecodedAheadOf(output_, cur_time) > kDecodeBufferSize) {
126  util::Unlocker<Mutex> unlock(&lock);
128  continue;
129  }
130 
131  // Evict frames that are not near the current time. This ensures we don't
132  // keep frames buffered forever.
133  output_->Remove(0, cur_time - kDecodeBufferSize);
134 
135  std::shared_ptr<EncodedFrame> frame;
136  if (std::isnan(last_time)) {
137  decoder_->ResetDecoder();
138  frame = input_->GetFrame(cur_time, FrameLocation::KeyFrameBefore);
139  } else {
140  frame = input_->GetFrame(last_time, FrameLocation::After);
141  }
142 
143  if (!frame) {
144  if (!std::isnan(last_time) &&
145  last_time + kEndDelta >= client_->Duration() && !did_flush_) {
146  // If this is the last frame, pass the null to DecodeFrame, which will
147  // flush the decoder.
148  did_flush_ = true;
149  } else {
150  util::Unlocker<Mutex> unlock(&lock);
152  continue;
153  }
154  }
155 
156  std::string error;
157  std::vector<std::shared_ptr<DecodedFrame>> decoded;
158  const MediaStatus decode_status =
159  decoder_->Decode(frame, cdm_, &decoded, &error);
160  if (decode_status == MediaStatus::KeyNotFound) {
161  // If we don't have the required key, signal the <video> and wait.
162  if (!raised_waiting_event_) {
163  raised_waiting_event_ = true;
164  client_->OnWaitingForKey();
165  }
166  // TODO: Consider adding a signal for new keys so we can avoid polling and
167  // just wait on a condition variable.
168  util::Unlocker<Mutex> unlock(&lock);
170  continue;
171  }
172  if (decode_status != MediaStatus::Success) {
173  client_->OnError(error);
174  break;
175  }
176 
177  raised_waiting_event_ = false;
178  for (auto& decoded_frame : decoded) {
179  output_->AddFrame(decoded_frame);
180  }
181 
182  if (frame)
183  last_frame_time_ = frame->dts;
184  }
185 }
186 
187 } // namespace media
188 } // namespace shaka
static constexpr const double kMaxGapSize
Definition: streams.h:76
std::shared_ptr< shaka::media::DecodedFrame > frame
void SetDecoder(Decoder *decoder)
void join()
Definition: thread.h:56
virtual double CurrentTime() const =0
virtual void OnError(const std::string &error)=0
virtual MediaStatus Decode(std::shared_ptr< EncodedFrame > input, const eme::Implementation *eme, std::vector< std::shared_ptr< DecodedFrame >> *frames, std::string *extra_info)=0
T ResetAndWaitWhileUnlocked(std::unique_lock< _Mutex > &lock)
Definition: thread_event.h:106
void Attach(const ElementaryStream *input)
DecoderThread(Client *client, DecodedStream *output)
static const Clock Instance
Definition: clock.h:29
virtual void SleepSeconds(double seconds) const
Definition: clock.cc:39
void SetCdm(eme::Implementation *cdm)
void AddFrame(std::shared_ptr< T > frame)
Definition: streams.h:187
virtual void ResetDecoder()=0
void Remove(double start, double end)
Definition: streams.cc:270
std::shared_ptr< T > GetFrame(double time, FrameLocation kind=FrameLocation::After) const
Definition: streams.h:192
virtual double Duration() const =0