Shaka Packager SDK
Loading...
Searching...
No Matches
producer_consumer_queue.h
1// Copyright 2014 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8#define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
9
10#include <chrono>
11#include <deque>
12
13#include <absl/log/check.h>
14#include <absl/log/log.h>
15#include <absl/strings/str_format.h>
16#include <absl/synchronization/mutex.h>
17#include <absl/time/time.h>
18
19#include <packager/macros/classes.h>
20#include <packager/status.h>
21
22namespace shaka {
23namespace media {
24
/// Capacity value meaning the queue places no upper bound on its size.
static constexpr size_t kUnlimitedCapacity = 0u;
/// Timeout value (in milliseconds) meaning "wait without a time limit";
/// Push, Pop and Peek treat any negative timeout this way.
static constexpr int64_t kInfiniteTimeout = -1;
27
31template <class T>
33 public:
37 explicit ProducerConsumerQueue(size_t capacity);
38
43 ProducerConsumerQueue(size_t capacity, size_t starting_pos);
44
46
55 Status Push(const T& element, int64_t timeout_ms);
56
64 Status Pop(T* element, int64_t timeout_ms);
65
78 Status Peek(size_t pos, T* element, int64_t timeout_ms);
79
83 void Stop() {
84 absl::MutexLock lock(&mutex_);
85 stop_requested_ = true;
86 not_empty_cv_.SignalAll();
87 not_full_cv_.SignalAll();
88 new_element_cv_.SignalAll();
89 }
90
92 bool Empty() const {
93 absl::MutexLock lock(&mutex_);
94 return q_.empty();
95 }
96
98 size_t Size() const {
99 absl::MutexLock lock(&mutex_);
100 return q_.size();
101 }
102
105 size_t HeadPos() const {
106 absl::MutexLock lock(&mutex_);
107 return head_pos_;
108 }
109
112 size_t TailPos() const {
113 absl::MutexLock lock(&mutex_);
114 return head_pos_ + q_.size() - 1;
115 }
116
119 bool Stopped() const {
120 absl::MutexLock lock(&mutex_);
121 return stop_requested_;
122 }
123
124 private:
125 // Move head_pos_ to center on pos.
126 void SlideHeadOnCenter(size_t pos);
127
128 const size_t capacity_; // Maximum number of elements; zero means unlimited.
129
130 mutable absl::Mutex mutex_;
131 size_t head_pos_ ABSL_GUARDED_BY(mutex_); // Head position.
132 std::deque<T> q_
133 ABSL_GUARDED_BY(mutex_); // Internal queue holding the elements.
134 absl::CondVar not_empty_cv_ ABSL_GUARDED_BY(mutex_);
135 absl::CondVar not_full_cv_ ABSL_GUARDED_BY(mutex_);
136 absl::CondVar new_element_cv_ ABSL_GUARDED_BY(mutex_);
137 bool stop_requested_
138 ABSL_GUARDED_BY(mutex_); // True after Stop has been called.
139
140 DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
141};
142
143// Implementations of non-inline functions.
144template <class T>
146 : capacity_(capacity),
147 head_pos_(0),
148 stop_requested_(false) {}
149
150template <class T>
152 size_t starting_pos)
153 : capacity_(capacity),
154 head_pos_(starting_pos),
155 stop_requested_(false) {
156}
157
158template <class T>
160
161template <class T>
162Status ProducerConsumerQueue<T>::Push(const T& element, int64_t timeout_ms) {
163 absl::MutexLock lock(&mutex_);
164 bool woken = false;
165
166 // Check for queue shutdown.
167 if (stop_requested_)
168 return Status(error::STOPPED, "");
169
170 auto start = std::chrono::steady_clock::now();
171 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
172
173 if (capacity_) {
174 while (q_.size() == capacity_) {
175 if (timeout_ms < 0) {
176 // Wait forever, or until Stop.
177 not_full_cv_.Wait(&mutex_);
178 } else {
179 auto elapsed = std::chrono::steady_clock::now() - start;
180 if (elapsed < timeout_delta) {
181 // Wait with timeout, or until Stop.
182 not_full_cv_.WaitWithTimeout(
183 &mutex_, absl::FromChrono(timeout_delta - elapsed));
184 } else {
185 // We're through waiting.
186 return Status(error::TIME_OUT, "Time out on pushing.");
187 }
188 }
189 // Re-check for queue shutdown after waking from Wait.
190 if (stop_requested_)
191 return Status(error::STOPPED, "");
192
193 woken = true;
194 }
195 DCHECK_LT(q_.size(), capacity_);
196 }
197
198 // Signal consumer to proceed if we are going to create some elements.
199 if (q_.empty())
200 not_empty_cv_.Signal();
201 new_element_cv_.Signal();
202
203 q_.push_back(element);
204
205 // Signal other producers if we just acquired more capacity.
206 if (woken && q_.size() != capacity_)
207 not_full_cv_.Signal();
208 return Status::OK;
209}
210
211template <class T>
212Status ProducerConsumerQueue<T>::Pop(T* element, int64_t timeout_ms) {
213 absl::MutexLock lock(&mutex_);
214 bool woken = false;
215
216 auto start = std::chrono::steady_clock::now();
217 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
218
219 while (q_.empty()) {
220 if (stop_requested_)
221 return Status(error::STOPPED, "");
222
223 if (timeout_ms < 0) {
224 // Wait forever, or until Stop.
225 not_empty_cv_.Wait(&mutex_);
226 } else {
227 auto elapsed = std::chrono::steady_clock::now() - start;
228 if (elapsed < timeout_delta) {
229 // Wait with timeout, or until Stop.
230 not_empty_cv_.WaitWithTimeout(
231 &mutex_, absl::FromChrono(timeout_delta - elapsed));
232 } else {
233 // We're through waiting.
234 return Status(error::TIME_OUT, "Time out on popping.");
235 }
236 }
237 woken = true;
238 }
239
240 // Signal producer to proceed if we are going to create some capacity.
241 if (q_.size() == capacity_)
242 not_full_cv_.Signal();
243
244 *element = q_.front();
245 q_.pop_front();
246 ++head_pos_;
247
248 // Signal other consumers if we have more elements.
249 if (woken && !q_.empty())
250 not_empty_cv_.Signal();
251 return Status::OK;
252}
253
254template <class T>
256 T* element,
257 int64_t timeout_ms) {
258 absl::MutexLock lock(&mutex_);
259 if (pos < head_pos_) {
260 return Status(error::INVALID_ARGUMENT,
261 absl::StrFormat("pos (%zu) is too small; head is at %zu.",
262 pos, head_pos_));
263 }
264
265 bool woken = false;
266
267 auto start = std::chrono::steady_clock::now();
268 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
269
270 // Move head to create some space (move the sliding window centered @ pos).
271 SlideHeadOnCenter(pos);
272
273 while (pos >= head_pos_ + q_.size()) {
274 if (stop_requested_)
275 return Status(error::STOPPED, "");
276
277 if (timeout_ms < 0) {
278 // Wait forever, or until Stop.
279 new_element_cv_.Wait(&mutex_);
280 } else {
281 auto elapsed = std::chrono::steady_clock::now() - start;
282 if (elapsed < timeout_delta) {
283 // Wait with timeout, or until Stop.
284 new_element_cv_.WaitWithTimeout(
285 &mutex_, absl::FromChrono(timeout_delta - elapsed));
286 } else {
287 // We're through waiting.
288 return Status(error::TIME_OUT, "Time out on peeking.");
289 }
290 }
291 // Move head to create some space (move the sliding window centered @ pos).
292 SlideHeadOnCenter(pos);
293 woken = true;
294 }
295
296 *element = q_[pos - head_pos_];
297
298 // Signal other consumers if we have more elements.
299 if (woken && !q_.empty())
300 new_element_cv_.Signal();
301 return Status::OK;
302}
303
304template <class T>
306 mutex_.AssertHeld();
307
308 if (capacity_) {
309 // Signal producer to proceed if we are going to create some capacity.
310 if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
311 not_full_cv_.Signal();
312
313 while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
314 ++head_pos_;
315 q_.pop_front();
316 }
317 }
318}
319
320} // namespace media
321} // namespace shaka
322
323#endif // PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
Status Push(const T &element, int64_t timeout_ms)
Status Peek(size_t pos, T *element, int64_t timeout_ms)
Status Pop(T *element, int64_t timeout_ms)
All the methods that are virtual are virtual for mocking.