// Declaration of Push: takes the element by const reference together with a
// timeout in milliseconds and returns a Status.
// Judging by the definition fragments further down in this file, a negative
// timeout_ms selects an untimed wait, and the failure statuses visible there
// are error::STOPPED and error::TIME_OUT.
55 Status
Push(
const T& element, int64_t timeout_ms);
// Declaration of Pop: takes an output pointer `element` and a timeout in
// milliseconds and returns a Status.
// The definition fragments further down assign q_.front() through `element`;
// a negative timeout_ms selects an untimed wait there.
64 Status
Pop(T* element, int64_t timeout_ms);
// Declaration of Peek: takes a position `pos`, an output pointer `element`
// and a timeout in milliseconds, and returns a Status.
// In the definition fragments further down, `pos` is interpreted relative to
// head_pos_ (the element read is q_[pos - head_pos_]) and a `pos` smaller
// than head_pos_ yields error::INVALID_ARGUMENT.
78 Status
Peek(
size_t pos, T* element, int64_t timeout_ms);
// Fragment of a stop routine (the enclosing signature is not visible here).
// Takes mutex_ for the rest of the scope before touching guarded state.
84 absl::MutexLock lock(mutex_);
// Record that a stop was requested.
// NOTE(review): presumably this flag is what the error::STOPPED returns in
// Push/Pop/Peek test — their conditions are not visible; confirm.
85 stop_requested_ =
true;
// Wake every thread waiting on any of the three condition variables.
86 not_empty_cv_.SignalAll();
87 not_full_cv_.SignalAll();
88 new_element_cv_.SignalAll();
// Three separate scoped acquisitions of mutex_.
// NOTE(review): the leading numbers look like line numbers from the original
// file, so these appear to be the first statements of three different
// methods whose signatures and bodies are not visible in this excerpt.
93 absl::MutexLock lock(mutex_);
99 absl::MutexLock lock(mutex_);
106 absl::MutexLock lock(mutex_);
// Fragment of an accessor (signature not visible): while holding mutex_,
// returns head_pos_ + q_.size() - 1, i.e. the position of the last queued
// element expressed in the same coordinates as head_pos_.
// NOTE(review): head_pos_ is a size_t, so with an empty q_ and head_pos_ == 0
// this expression would wrap around — confirm how callers and the (unseen)
// return type deal with the empty case.
113 absl::MutexLock lock(mutex_);
114 return head_pos_ + q_.size() - 1;
// Fragment of an accessor (signature not visible): reads stop_requested_
// while holding mutex_ and returns its current value.
120 absl::MutexLock lock(mutex_);
121 return stop_requested_;
// Declaration of SlideHeadOnCenter: takes a position and returns nothing.
// The Peek fragments in this file call it with the requested position while
// mutex_ is held.
// NOTE(review): the name suggests it moves the head so that `pos` ends up
// near the middle of the queue; the body is not visible — confirm.
126 void SlideHeadOnCenter(
size_t pos);
// Immutable capacity; the Push fragment waits while q_.size() == capacity_.
128 const size_t capacity_;
// Mutex protecting the guarded members below. Declared mutable, presumably
// so that const accessors can lock it — confirm against the accessors.
130 mutable absl::Mutex mutex_;
// Position of the current head; Peek compares requested positions against it.
131 size_t head_pos_ ABSL_GUARDED_BY(mutex_);
// NOTE(review): the declarator that this annotation belongs to is not
// visible in this excerpt (possibly q_ — confirm).
133 ABSL_GUARDED_BY(mutex_);
// Waited on by the Pop fragment; signalled by the Push fragment.
134 absl::CondVar not_empty_cv_ ABSL_GUARDED_BY(mutex_);
// Waited on by the Push fragment; signalled by the Pop fragment.
135 absl::CondVar not_full_cv_ ABSL_GUARDED_BY(mutex_);
// Waited on by the Peek fragment; signalled by the Push fragment.
136 absl::CondVar new_element_cv_ ABSL_GUARDED_BY(mutex_);
// NOTE(review): the declarator that this annotation belongs to is not
// visible in this excerpt (possibly stop_requested_ — confirm).
138 ABSL_GUARDED_BY(mutex_);
// Fragments of what appears to be the Push definition (the signature, some
// conditions and the closing braces are not visible in this excerpt).
// Hold mutex_ for the rest of the scope.
158 absl::MutexLock lock(mutex_);
// Early error::STOPPED return; its guarding condition is not visible here.
163 return Status(error::STOPPED,
"");
// Remember when waiting began so repeated waits share one timeout_ms budget.
165 auto start = std::chrono::steady_clock::now();
166 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
// Keep waiting for as long as the queue is at capacity.
169 while (q_.size() == capacity_) {
// Negative timeout: wait on not_full_cv_ with no deadline.
170 if (timeout_ms < 0) {
172 not_full_cv_.Wait(&mutex_);
// Otherwise wait only for whatever is left of the timeout budget.
174 auto elapsed = std::chrono::steady_clock::now() - start;
175 if (elapsed < timeout_delta) {
177 not_full_cv_.WaitWithTimeout(
178 &mutex_, absl::FromChrono(timeout_delta - elapsed));
// Budget used up (presumably the else branch of the check above).
181 return Status(error::TIME_OUT,
"Time out on pushing.");
// Second error::STOPPED return; its guarding condition is not visible here.
186 return Status(error::STOPPED,
"");
// Sanity check that there is room for one more element.
190 DCHECK_LT(q_.size(), capacity_);
// Notify one waiter on each of not_empty_cv_ and new_element_cv_.
// NOTE(review): these may be guarded by a condition on a line not shown.
195 not_empty_cv_.Signal();
196 new_element_cv_.Signal();
// Append a copy of the element to the queue.
198 q_.push_back(element);
// `woken` is set on a line not shown (presumably when this call had to
// wait). If it is set and there is still room, notify another thread
// waiting on not_full_cv_.
201 if (woken && q_.size() != capacity_)
202 not_full_cv_.Signal();
// Fragments of what appears to be the Pop definition (the signature, the
// wait-loop condition and the closing braces are not visible here).
// Hold mutex_ for the rest of the scope.
208 absl::MutexLock lock(mutex_);
// Remember when waiting began so repeated waits share one timeout_ms budget.
211 auto start = std::chrono::steady_clock::now();
212 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
// error::STOPPED return; its guarding condition is not visible here.
216 return Status(error::STOPPED,
"");
// Negative timeout: wait on not_empty_cv_ with no deadline.
218 if (timeout_ms < 0) {
220 not_empty_cv_.Wait(&mutex_);
// Otherwise wait only for whatever is left of the timeout budget.
222 auto elapsed = std::chrono::steady_clock::now() - start;
223 if (elapsed < timeout_delta) {
225 not_empty_cv_.WaitWithTimeout(
226 &mutex_, absl::FromChrono(timeout_delta - elapsed));
// Budget used up (presumably the else branch of the check above).
229 return Status(error::TIME_OUT,
"Time out on popping.");
// The queue is full at this point, so notify one thread waiting on
// not_full_cv_ (the removal itself happens on a line not shown).
236 if (q_.size() == capacity_)
237 not_full_cv_.Signal();
// Copy the front element out through the caller's pointer.
239 *element = q_.front();
// `woken` is set on a line not shown (presumably when this call had to
// wait). If it is set and elements remain, notify another thread waiting
// on not_empty_cv_.
244 if (woken && !q_.empty())
245 not_empty_cv_.Signal();
// Tail of what appears to be the Peek signature followed by fragments of
// its body (the start of the signature, the StrFormat arguments, some
// conditions and the closing braces are not visible here).
252 int64_t timeout_ms) {
// Hold mutex_ for the rest of the scope.
253 absl::MutexLock lock(mutex_);
// Reject positions that lie before the current head.
254 if (pos < head_pos_) {
255 return Status(error::INVALID_ARGUMENT,
256 absl::StrFormat(
"pos (%zu) is too small; head is at %zu.",
// Remember when waiting began so repeated waits share one timeout_ms budget.
262 auto start = std::chrono::steady_clock::now();
263 auto timeout_delta = std::chrono::milliseconds(timeout_ms);
// Called with the requested position before waiting; see the declaration of
// SlideHeadOnCenter (its body is not visible in this excerpt).
266 SlideHeadOnCenter(pos);
// Loop while `pos` is at or beyond the end of what is currently queued.
268 while (pos >= head_pos_ + q_.size()) {
// error::STOPPED return; its guarding condition is not visible here.
270 return Status(error::STOPPED,
"");
// Negative timeout: wait on new_element_cv_ with no deadline.
272 if (timeout_ms < 0) {
274 new_element_cv_.Wait(&mutex_);
// Otherwise wait only for whatever is left of the timeout budget.
276 auto elapsed = std::chrono::steady_clock::now() - start;
277 if (elapsed < timeout_delta) {
279 new_element_cv_.WaitWithTimeout(
280 &mutex_, absl::FromChrono(timeout_delta - elapsed));
// Budget used up (presumably the else branch of the check above).
283 return Status(error::TIME_OUT,
"Time out on peeking.");
// Called again with the same position (presumably after each wake-up,
// inside the loop — the braces are not visible; confirm).
287 SlideHeadOnCenter(pos);
// Copy the element at offset (pos - head_pos_) out through the caller's
// pointer; the element stays in the queue on this line.
291 *element = q_[pos - head_pos_];
// `woken` is set on a line not shown (presumably when this call had to
// wait). If it is set and the queue is non-empty, notify another thread
// waiting on new_element_cv_.
294 if (woken && !q_.empty())
295 new_element_cv_.Signal();