7 #ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8 #define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
13 #include <absl/log/check.h>
14 #include <absl/log/log.h>
15 #include <absl/strings/str_format.h>
16 #include <absl/synchronization/mutex.h>
17 #include <absl/time/time.h>
19 #include <packager/macros/classes.h>
20 #include <packager/status.h>
25 static const size_t kUnlimitedCapacity = 0u;
26 static const int64_t kInfiniteTimeout = -1;
55 Status
Push(
const T& element, int64_t timeout_ms);
64 Status
Pop(T* element, int64_t timeout_ms);
78 Status
Peek(
size_t pos, T* element, int64_t timeout_ms);
84 absl::MutexLock lock(&mutex_);
85 stop_requested_ =
true;
86 not_empty_cv_.SignalAll();
87 not_full_cv_.SignalAll();
88 new_element_cv_.SignalAll();
93 absl::MutexLock lock(&mutex_);
99 absl::MutexLock lock(&mutex_);
106 absl::MutexLock lock(&mutex_);
113 absl::MutexLock lock(&mutex_);
114 return head_pos_ + q_.size() - 1;
120 absl::MutexLock lock(&mutex_);
121 return stop_requested_;
126 void SlideHeadOnCenter(
size_t pos);
128 const size_t capacity_;
130 mutable absl::Mutex mutex_;
131 size_t head_pos_ ABSL_GUARDED_BY(mutex_);
133 ABSL_GUARDED_BY(mutex_);
134 absl::CondVar not_empty_cv_ ABSL_GUARDED_BY(mutex_);
135 absl::CondVar not_full_cv_ ABSL_GUARDED_BY(mutex_);
136 absl::CondVar new_element_cv_ ABSL_GUARDED_BY(mutex_);
138 ABSL_GUARDED_BY(mutex_);
146 : capacity_(capacity),
148 stop_requested_(false) {}
153 : capacity_(capacity),
154 head_pos_(starting_pos),
155 stop_requested_(false) {
163 absl::MutexLock lock(&mutex_);
168 return Status(error::STOPPED,
"");
170 auto start = std::chrono::steady_clock::now();
171 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
174 while (q_.size() == capacity_) {
175 if (timeout_ms < 0) {
177 not_full_cv_.Wait(&mutex_);
179 auto elapsed = std::chrono::steady_clock::now() - start;
180 if (elapsed < timeout_delta) {
182 not_full_cv_.WaitWithTimeout(
183 &mutex_, absl::FromChrono(timeout_delta - elapsed));
186 return Status(error::TIME_OUT,
"Time out on pushing.");
191 return Status(error::STOPPED,
"");
195 DCHECK_LT(q_.size(), capacity_);
200 not_empty_cv_.Signal();
201 new_element_cv_.Signal();
203 q_.push_back(element);
206 if (woken && q_.size() != capacity_)
207 not_full_cv_.Signal();
213 absl::MutexLock lock(&mutex_);
216 auto start = std::chrono::steady_clock::now();
217 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
221 return Status(error::STOPPED,
"");
223 if (timeout_ms < 0) {
225 not_empty_cv_.Wait(&mutex_);
227 auto elapsed = std::chrono::steady_clock::now() - start;
228 if (elapsed < timeout_delta) {
230 not_empty_cv_.WaitWithTimeout(
231 &mutex_, absl::FromChrono(timeout_delta - elapsed));
234 return Status(error::TIME_OUT,
"Time out on popping.");
241 if (q_.size() == capacity_)
242 not_full_cv_.Signal();
244 *element = q_.front();
249 if (woken && !q_.empty())
250 not_empty_cv_.Signal();
257 int64_t timeout_ms) {
258 absl::MutexLock lock(&mutex_);
259 if (pos < head_pos_) {
260 return Status(error::INVALID_ARGUMENT,
261 absl::StrFormat(
"pos (%zu) is too small; head is at %zu.",
267 auto start = std::chrono::steady_clock::now();
268 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
271 SlideHeadOnCenter(pos);
273 while (pos >= head_pos_ + q_.size()) {
275 return Status(error::STOPPED,
"");
277 if (timeout_ms < 0) {
279 new_element_cv_.Wait(&mutex_);
281 auto elapsed = std::chrono::steady_clock::now() - start;
282 if (elapsed < timeout_delta) {
284 new_element_cv_.WaitWithTimeout(
285 &mutex_, absl::FromChrono(timeout_delta - elapsed));
288 return Status(error::TIME_OUT,
"Time out on peeking.");
292 SlideHeadOnCenter(pos);
296 *element = q_[pos - head_pos_];
299 if (woken && !q_.empty())
300 new_element_cv_.Signal();
310 if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
311 not_full_cv_.Signal();
313 while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
All the methods that are virtual are virtual for mocking.