Shaka Packager SDK
io_cache.cc
1 // Copyright 2015 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/file/io_cache.h>
8 
9 #include <algorithm>
10 #include <cstring>
11 
12 #include <absl/log/check.h>
13 #include <absl/log/log.h>
14 
15 #include <packager/macros/logging.h>
16 
17 namespace shaka {
18 
19 IoCache::IoCache(uint64_t cache_size)
20  : cache_size_(cache_size),
21  // Make the buffer one byte larger than the cache so that when the
22  // condition r_ptr == w_ptr is unambiguous (buffer empty).
23  circular_buffer_(cache_size + 1),
24  end_ptr_(&circular_buffer_[0] + cache_size + 1),
25  r_ptr_(circular_buffer_.data()),
26  w_ptr_(circular_buffer_.data()),
27  closed_(false) {}
28 
29 IoCache::~IoCache() {
30  Close();
31 }
32 
33 uint64_t IoCache::Read(void* buffer, uint64_t size) {
34  DCHECK(buffer);
35 
36  absl::MutexLock lock(&mutex_);
37  while (!closed_ && (BytesCachedInternal() == 0)) {
38  write_event_.Wait(&mutex_);
39  }
40 
41  size = std::min(size, BytesCachedInternal());
42  uint64_t first_chunk_size(
43  std::min(size, static_cast<uint64_t>(end_ptr_ - r_ptr_)));
44  memcpy(buffer, r_ptr_, first_chunk_size);
45  r_ptr_ += first_chunk_size;
46  DCHECK_GE(end_ptr_, r_ptr_);
47  if (r_ptr_ == end_ptr_)
48  r_ptr_ = &circular_buffer_[0];
49  uint64_t second_chunk_size(size - first_chunk_size);
50  if (second_chunk_size) {
51  memcpy(static_cast<uint8_t*>(buffer) + first_chunk_size, r_ptr_,
52  second_chunk_size);
53  r_ptr_ += second_chunk_size;
54  DCHECK_GT(end_ptr_, r_ptr_);
55  }
56  read_event_.Signal();
57  return size;
58 }
59 
60 uint64_t IoCache::Write(const void* buffer, uint64_t size) {
61  DCHECK(buffer);
62 
63  const uint8_t* r_ptr(static_cast<const uint8_t*>(buffer));
64  uint64_t bytes_left(size);
65  while (bytes_left) {
66  absl::MutexLock lock(&mutex_);
67  while (!closed_ && (BytesFreeInternal() == 0)) {
68  VLOG(1) << "Circular buffer is full, which can happen if data arrives "
69  "faster than being consumed by packager. Ignore if it is not "
70  "live packaging. Otherwise, try increasing --io_cache_size.";
71  read_event_.Wait(&mutex_);
72  }
73  if (closed_)
74  return 0;
75 
76  uint64_t write_size(std::min(bytes_left, BytesFreeInternal()));
77  uint64_t first_chunk_size(
78  std::min(write_size, static_cast<uint64_t>(end_ptr_ - w_ptr_)));
79  memcpy(w_ptr_, r_ptr, first_chunk_size);
80  w_ptr_ += first_chunk_size;
81  DCHECK_GE(end_ptr_, w_ptr_);
82  if (w_ptr_ == end_ptr_)
83  w_ptr_ = &circular_buffer_[0];
84  r_ptr += first_chunk_size;
85  uint64_t second_chunk_size(write_size - first_chunk_size);
86  if (second_chunk_size) {
87  memcpy(w_ptr_, r_ptr, second_chunk_size);
88  w_ptr_ += second_chunk_size;
89  DCHECK_GT(end_ptr_, w_ptr_);
90  r_ptr += second_chunk_size;
91  }
92  bytes_left -= write_size;
93  write_event_.Signal();
94  }
95  return size;
96 }
97 
98 void IoCache::Clear() {
99  absl::MutexLock lock(&mutex_);
100  r_ptr_ = w_ptr_ = circular_buffer_.data();
101  // Let any writers know that there is room in the cache.
102  read_event_.Signal();
103 }
104 
105 void IoCache::Close() {
106  absl::MutexLock lock(&mutex_);
107  closed_ = true;
108  read_event_.Signal();
109  write_event_.Signal();
110 }
111 
112 void IoCache::Reopen() {
113  absl::MutexLock lock(&mutex_);
114  CHECK(closed_);
115  r_ptr_ = w_ptr_ = circular_buffer_.data();
116  closed_ = false;
117 }
118 
119 uint64_t IoCache::BytesCached() {
120  absl::MutexLock lock(&mutex_);
121  return BytesCachedInternal();
122 }
123 
124 uint64_t IoCache::BytesFree() {
125  absl::MutexLock lock(&mutex_);
126  return BytesFreeInternal();
127 }
128 
129 uint64_t IoCache::BytesCachedInternal() {
130  return (r_ptr_ <= w_ptr_)
131  ? w_ptr_ - r_ptr_
132  : (end_ptr_ - r_ptr_) + (w_ptr_ - circular_buffer_.data());
133 }
134 
135 uint64_t IoCache::BytesFreeInternal() {
136  return cache_size_ - BytesCachedInternal();
137 }
138 
139 void IoCache::WaitUntilEmptyOrClosed() {
140  absl::MutexLock lock(&mutex_);
141  while (!closed_ && BytesCachedInternal()) {
142  read_event_.Wait(&mutex_);
143  }
144 }
145 
146 } // namespace shaka
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66