Shaka Packager SDK
Loading...
Searching...
No Matches
producer_consumer_queue.h
1// Copyright 2014 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8#define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
9
10#include <chrono>
11#include <deque>
12
13#include <absl/log/check.h>
14#include <absl/log/log.h>
15#include <absl/strings/str_format.h>
16#include <absl/synchronization/mutex.h>
17#include <absl/time/time.h>
18
19#include <packager/macros/classes.h>
20#include <packager/status.h>
21
22namespace shaka {
23namespace media {
24
25static const size_t kUnlimitedCapacity = 0u;
26static const int64_t kInfiniteTimeout = -1;
27
31template <class T>
33 public:
37 explicit ProducerConsumerQueue(size_t capacity);
38
43 ProducerConsumerQueue(size_t capacity, size_t starting_pos);
44
46
55 Status Push(const T& element, int64_t timeout_ms);
56
64 Status Pop(T* element, int64_t timeout_ms);
65
78 Status Peek(size_t pos, T* element, int64_t timeout_ms);
79
83 void Stop() {
84 absl::MutexLock lock(mutex_);
85 stop_requested_ = true;
86 not_empty_cv_.SignalAll();
87 not_full_cv_.SignalAll();
88 new_element_cv_.SignalAll();
89 }
90
92 bool Empty() const {
93 absl::MutexLock lock(mutex_);
94 return q_.empty();
95 }
96
98 size_t Size() const {
99 absl::MutexLock lock(mutex_);
100 return q_.size();
101 }
102
105 size_t HeadPos() const {
106 absl::MutexLock lock(mutex_);
107 return head_pos_;
108 }
109
112 size_t TailPos() const {
113 absl::MutexLock lock(mutex_);
114 return head_pos_ + q_.size() - 1;
115 }
116
119 bool Stopped() const {
120 absl::MutexLock lock(mutex_);
121 return stop_requested_;
122 }
123
124 private:
125 // Move head_pos_ to center on pos.
126 void SlideHeadOnCenter(size_t pos);
127
128 const size_t capacity_; // Maximum number of elements; zero means unlimited.
129
130 mutable absl::Mutex mutex_;
131 size_t head_pos_ ABSL_GUARDED_BY(mutex_); // Head position.
132 std::deque<T> q_
133 ABSL_GUARDED_BY(mutex_); // Internal queue holding the elements.
134 absl::CondVar not_empty_cv_ ABSL_GUARDED_BY(mutex_);
135 absl::CondVar not_full_cv_ ABSL_GUARDED_BY(mutex_);
136 absl::CondVar new_element_cv_ ABSL_GUARDED_BY(mutex_);
137 bool stop_requested_
138 ABSL_GUARDED_BY(mutex_); // True after Stop has been called.
139
140 DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
141};
142
143// Implementations of non-inline functions.
144template <class T>
146 : capacity_(capacity), head_pos_(0), stop_requested_(false) {}
147
148template <class T>
150 size_t starting_pos)
151 : capacity_(capacity), head_pos_(starting_pos), stop_requested_(false) {}
152
153template <class T>
155
156template <class T>
157Status ProducerConsumerQueue<T>::Push(const T& element, int64_t timeout_ms) {
158 absl::MutexLock lock(mutex_);
159 bool woken = false;
160
161 // Check for queue shutdown.
162 if (stop_requested_)
163 return Status(error::STOPPED, "");
164
165 auto start = std::chrono::steady_clock::now();
166 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
167
168 if (capacity_) {
169 while (q_.size() == capacity_) {
170 if (timeout_ms < 0) {
171 // Wait forever, or until Stop.
172 not_full_cv_.Wait(&mutex_);
173 } else {
174 auto elapsed = std::chrono::steady_clock::now() - start;
175 if (elapsed < timeout_delta) {
176 // Wait with timeout, or until Stop.
177 not_full_cv_.WaitWithTimeout(
178 &mutex_, absl::FromChrono(timeout_delta - elapsed));
179 } else {
180 // We're through waiting.
181 return Status(error::TIME_OUT, "Time out on pushing.");
182 }
183 }
184 // Re-check for queue shutdown after waking from Wait.
185 if (stop_requested_)
186 return Status(error::STOPPED, "");
187
188 woken = true;
189 }
190 DCHECK_LT(q_.size(), capacity_);
191 }
192
193 // Signal consumer to proceed if we are going to create some elements.
194 if (q_.empty())
195 not_empty_cv_.Signal();
196 new_element_cv_.Signal();
197
198 q_.push_back(element);
199
200 // Signal other producers if we just acquired more capacity.
201 if (woken && q_.size() != capacity_)
202 not_full_cv_.Signal();
203 return Status::OK;
204}
205
206template <class T>
207Status ProducerConsumerQueue<T>::Pop(T* element, int64_t timeout_ms) {
208 absl::MutexLock lock(mutex_);
209 bool woken = false;
210
211 auto start = std::chrono::steady_clock::now();
212 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
213
214 while (q_.empty()) {
215 if (stop_requested_)
216 return Status(error::STOPPED, "");
217
218 if (timeout_ms < 0) {
219 // Wait forever, or until Stop.
220 not_empty_cv_.Wait(&mutex_);
221 } else {
222 auto elapsed = std::chrono::steady_clock::now() - start;
223 if (elapsed < timeout_delta) {
224 // Wait with timeout, or until Stop.
225 not_empty_cv_.WaitWithTimeout(
226 &mutex_, absl::FromChrono(timeout_delta - elapsed));
227 } else {
228 // We're through waiting.
229 return Status(error::TIME_OUT, "Time out on popping.");
230 }
231 }
232 woken = true;
233 }
234
235 // Signal producer to proceed if we are going to create some capacity.
236 if (q_.size() == capacity_)
237 not_full_cv_.Signal();
238
239 *element = q_.front();
240 q_.pop_front();
241 ++head_pos_;
242
243 // Signal other consumers if we have more elements.
244 if (woken && !q_.empty())
245 not_empty_cv_.Signal();
246 return Status::OK;
247}
248
249template <class T>
251 T* element,
252 int64_t timeout_ms) {
253 absl::MutexLock lock(mutex_);
254 if (pos < head_pos_) {
255 return Status(error::INVALID_ARGUMENT,
256 absl::StrFormat("pos (%zu) is too small; head is at %zu.",
257 pos, head_pos_));
258 }
259
260 bool woken = false;
261
262 auto start = std::chrono::steady_clock::now();
263 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
264
265 // Move head to create some space (move the sliding window centered @ pos).
266 SlideHeadOnCenter(pos);
267
268 while (pos >= head_pos_ + q_.size()) {
269 if (stop_requested_)
270 return Status(error::STOPPED, "");
271
272 if (timeout_ms < 0) {
273 // Wait forever, or until Stop.
274 new_element_cv_.Wait(&mutex_);
275 } else {
276 auto elapsed = std::chrono::steady_clock::now() - start;
277 if (elapsed < timeout_delta) {
278 // Wait with timeout, or until Stop.
279 new_element_cv_.WaitWithTimeout(
280 &mutex_, absl::FromChrono(timeout_delta - elapsed));
281 } else {
282 // We're through waiting.
283 return Status(error::TIME_OUT, "Time out on peeking.");
284 }
285 }
286 // Move head to create some space (move the sliding window centered @ pos).
287 SlideHeadOnCenter(pos);
288 woken = true;
289 }
290
291 *element = q_[pos - head_pos_];
292
293 // Signal other consumers if we have more elements.
294 if (woken && !q_.empty())
295 new_element_cv_.Signal();
296 return Status::OK;
297}
298
299template <class T>
301 mutex_.AssertHeld();
302
303 if (capacity_) {
304 // Signal producer to proceed if we are going to create some capacity.
305 if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
306 not_full_cv_.Signal();
307
308 while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
309 ++head_pos_;
310 q_.pop_front();
311 }
312 }
313}
314
315} // namespace media
316} // namespace shaka
317
318#endif // PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
Status Push(const T &element, int64_t timeout_ms)
Status Peek(size_t pos, T *element, int64_t timeout_ms)
Status Pop(T *element, int64_t timeout_ms)
All the methods that are virtual are virtual for mocking.