Shaka Packager SDK
producer_consumer_queue.h
1 // Copyright 2014 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8 #define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
9 
10 #include <chrono>
11 #include <deque>
12 
13 #include <absl/log/check.h>
14 #include <absl/log/log.h>
15 #include <absl/strings/str_format.h>
16 #include <absl/synchronization/mutex.h>
17 #include <absl/time/time.h>
18 
19 #include <packager/macros/classes.h>
20 #include <packager/status.h>
21 
22 namespace shaka {
23 namespace media {
24 
25 static const size_t kUnlimitedCapacity = 0u;
26 static const int64_t kInfiniteTimeout = -1;
27 
31 template <class T>
33  public:
37  explicit ProducerConsumerQueue(size_t capacity);
38 
43  ProducerConsumerQueue(size_t capacity, size_t starting_pos);
44 
46 
55  Status Push(const T& element, int64_t timeout_ms);
56 
64  Status Pop(T* element, int64_t timeout_ms);
65 
78  Status Peek(size_t pos, T* element, int64_t timeout_ms);
79 
83  void Stop() {
84  absl::MutexLock lock(&mutex_);
85  stop_requested_ = true;
86  not_empty_cv_.SignalAll();
87  not_full_cv_.SignalAll();
88  new_element_cv_.SignalAll();
89  }
90 
92  bool Empty() const {
93  absl::MutexLock lock(&mutex_);
94  return q_.empty();
95  }
96 
98  size_t Size() const {
99  absl::MutexLock lock(&mutex_);
100  return q_.size();
101  }
102 
105  size_t HeadPos() const {
106  absl::MutexLock lock(&mutex_);
107  return head_pos_;
108  }
109 
112  size_t TailPos() const {
113  absl::MutexLock lock(&mutex_);
114  return head_pos_ + q_.size() - 1;
115  }
116 
119  bool Stopped() const {
120  absl::MutexLock lock(&mutex_);
121  return stop_requested_;
122  }
123 
124  private:
125  // Move head_pos_ to center on pos.
126  void SlideHeadOnCenter(size_t pos);
127 
128  const size_t capacity_; // Maximum number of elements; zero means unlimited.
129 
130  mutable absl::Mutex mutex_;
131  size_t head_pos_ ABSL_GUARDED_BY(mutex_); // Head position.
132  std::deque<T> q_
133  ABSL_GUARDED_BY(mutex_); // Internal queue holding the elements.
134  absl::CondVar not_empty_cv_ ABSL_GUARDED_BY(mutex_);
135  absl::CondVar not_full_cv_ ABSL_GUARDED_BY(mutex_);
136  absl::CondVar new_element_cv_ ABSL_GUARDED_BY(mutex_);
137  bool stop_requested_
138  ABSL_GUARDED_BY(mutex_); // True after Stop has been called.
139 
140  DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
141 };
142 
143 // Implementations of non-inline functions.
144 template <class T>
146  : capacity_(capacity),
147  head_pos_(0),
148  stop_requested_(false) {}
149 
150 template <class T>
152  size_t starting_pos)
153  : capacity_(capacity),
154  head_pos_(starting_pos),
155  stop_requested_(false) {
156 }
157 
158 template <class T>
160 
161 template <class T>
162 Status ProducerConsumerQueue<T>::Push(const T& element, int64_t timeout_ms) {
163  absl::MutexLock lock(&mutex_);
164  bool woken = false;
165 
166  // Check for queue shutdown.
167  if (stop_requested_)
168  return Status(error::STOPPED, "");
169 
170  auto start = std::chrono::steady_clock::now();
171  auto timeout_delta = std::chrono::milliseconds(timeout_ms);
172 
173  if (capacity_) {
174  while (q_.size() == capacity_) {
175  if (timeout_ms < 0) {
176  // Wait forever, or until Stop.
177  not_full_cv_.Wait(&mutex_);
178  } else {
179  auto elapsed = std::chrono::steady_clock::now() - start;
180  if (elapsed < timeout_delta) {
181  // Wait with timeout, or until Stop.
182  not_full_cv_.WaitWithTimeout(
183  &mutex_, absl::FromChrono(timeout_delta - elapsed));
184  } else {
185  // We're through waiting.
186  return Status(error::TIME_OUT, "Time out on pushing.");
187  }
188  }
189  // Re-check for queue shutdown after waking from Wait.
190  if (stop_requested_)
191  return Status(error::STOPPED, "");
192 
193  woken = true;
194  }
195  DCHECK_LT(q_.size(), capacity_);
196  }
197 
198  // Signal consumer to proceed if we are going to create some elements.
199  if (q_.empty())
200  not_empty_cv_.Signal();
201  new_element_cv_.Signal();
202 
203  q_.push_back(element);
204 
205  // Signal other producers if we just acquired more capacity.
206  if (woken && q_.size() != capacity_)
207  not_full_cv_.Signal();
208  return Status::OK;
209 }
210 
211 template <class T>
212 Status ProducerConsumerQueue<T>::Pop(T* element, int64_t timeout_ms) {
213  absl::MutexLock lock(&mutex_);
214  bool woken = false;
215 
216  auto start = std::chrono::steady_clock::now();
217  auto timeout_delta = std::chrono::milliseconds(timeout_ms);
218 
219  while (q_.empty()) {
220  if (stop_requested_)
221  return Status(error::STOPPED, "");
222 
223  if (timeout_ms < 0) {
224  // Wait forever, or until Stop.
225  not_empty_cv_.Wait(&mutex_);
226  } else {
227  auto elapsed = std::chrono::steady_clock::now() - start;
228  if (elapsed < timeout_delta) {
229  // Wait with timeout, or until Stop.
230  not_empty_cv_.WaitWithTimeout(
231  &mutex_, absl::FromChrono(timeout_delta - elapsed));
232  } else {
233  // We're through waiting.
234  return Status(error::TIME_OUT, "Time out on popping.");
235  }
236  }
237  woken = true;
238  }
239 
240  // Signal producer to proceed if we are going to create some capacity.
241  if (q_.size() == capacity_)
242  not_full_cv_.Signal();
243 
244  *element = q_.front();
245  q_.pop_front();
246  ++head_pos_;
247 
248  // Signal other consumers if we have more elements.
249  if (woken && !q_.empty())
250  not_empty_cv_.Signal();
251  return Status::OK;
252 }
253 
254 template <class T>
256  T* element,
257  int64_t timeout_ms) {
258  absl::MutexLock lock(&mutex_);
259  if (pos < head_pos_) {
260  return Status(error::INVALID_ARGUMENT,
261  absl::StrFormat("pos (%zu) is too small; head is at %zu.",
262  pos, head_pos_));
263  }
264 
265  bool woken = false;
266 
267  auto start = std::chrono::steady_clock::now();
268  auto timeout_delta = std::chrono::milliseconds(timeout_ms);
269 
270  // Move head to create some space (move the sliding window centered @ pos).
271  SlideHeadOnCenter(pos);
272 
273  while (pos >= head_pos_ + q_.size()) {
274  if (stop_requested_)
275  return Status(error::STOPPED, "");
276 
277  if (timeout_ms < 0) {
278  // Wait forever, or until Stop.
279  new_element_cv_.Wait(&mutex_);
280  } else {
281  auto elapsed = std::chrono::steady_clock::now() - start;
282  if (elapsed < timeout_delta) {
283  // Wait with timeout, or until Stop.
284  new_element_cv_.WaitWithTimeout(
285  &mutex_, absl::FromChrono(timeout_delta - elapsed));
286  } else {
287  // We're through waiting.
288  return Status(error::TIME_OUT, "Time out on peeking.");
289  }
290  }
291  // Move head to create some space (move the sliding window centered @ pos).
292  SlideHeadOnCenter(pos);
293  woken = true;
294  }
295 
296  *element = q_[pos - head_pos_];
297 
298  // Signal other consumers if we have more elements.
299  if (woken && !q_.empty())
300  new_element_cv_.Signal();
301  return Status::OK;
302 }
303 
304 template <class T>
306  mutex_.AssertHeld();
307 
308  if (capacity_) {
309  // Signal producer to proceed if we are going to create some capacity.
310  if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
311  not_full_cv_.Signal();
312 
313  while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
314  ++head_pos_;
315  q_.pop_front();
316  }
317  }
318 }
319 
320 } // namespace media
321 } // namespace shaka
322 
323 #endif // PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
Status Push(const T &element, int64_t timeout_ms)
Status Peek(size_t pos, T *element, int64_t timeout_ms)
Status Pop(T *element, int64_t timeout_ms)
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66