7 #include <packager/file/io_cache.h>
12 #include <absl/log/check.h>
13 #include <absl/log/log.h>
15 #include <packager/macros/logging.h>
// Constructs a cache whose capacity is `cache_size` bytes.
// NOTE(review): the initializer list below ends with a trailing comma; any
// remaining initializers and the constructor body are not visible in this
// view.
19 IoCache::IoCache(uint64_t cache_size)
20 : cache_size_(cache_size),
// The backing buffer holds one element more than cache_size. Since
// BytesFreeInternal() is computed from cache_size_, one slot always stays
// unused -- presumably so that r_ptr_ == w_ptr_ can only mean "empty";
// confirm against BytesCachedInternal().
23 circular_buffer_(cache_size + 1),
// One past the last element of the backing buffer.
24 end_ptr_(&circular_buffer_[0] + cache_size + 1),
// Read and write cursors both start at the beginning of the buffer.
25 r_ptr_(circular_buffer_.data()),
26 w_ptr_(circular_buffer_.data()),
// Copies cached bytes from the circular buffer into `buffer`, holding mutex_
// for the duration. Blocks while the cache is open and empty. `size` is
// clamped to the number of bytes currently cached, so fewer bytes than
// requested may be copied.
// NOTE(review): the tail of this function (the length argument of the second
// memcpy, the return statement and the closing braces) is not visible in this
// view; presumably the clamped `size` is returned -- confirm.
33 uint64_t IoCache::Read(
void* buffer, uint64_t size) {
36 absl::MutexLock lock(&mutex_);
// Wait for data. write_event_ is signaled by Write() after it stores bytes
// and by Close().
37 while (!closed_ && (BytesCachedInternal() == 0)) {
38 write_event_.Wait(&mutex_);
// Never copy more than is currently cached.
41 size = std::min(size, BytesCachedInternal());
// First chunk: the contiguous run from r_ptr_ up to the end of the buffer.
42 uint64_t first_chunk_size(
43 std::min(size,
static_cast<uint64_t
>(end_ptr_ - r_ptr_)));
44 memcpy(buffer, r_ptr_, first_chunk_size);
45 r_ptr_ += first_chunk_size;
46 DCHECK_GE(end_ptr_, r_ptr_);
// Wrap the read cursor to the start once it reaches the end of the buffer.
47 if (r_ptr_ == end_ptr_)
48 r_ptr_ = &circular_buffer_[0];
// Second chunk: whatever is left of `size`. It is non-zero only when the
// first chunk ran to end_ptr_, in which case r_ptr_ has just been wrapped.
49 uint64_t second_chunk_size(size - first_chunk_size);
50 if (second_chunk_size) {
51 memcpy(
static_cast<uint8_t*
>(buffer) + first_chunk_size, r_ptr_,
53 r_ptr_ += second_chunk_size;
// After the wrapped copy the cursor must lie strictly before end_ptr_.
54 DCHECK_GT(end_ptr_, r_ptr_);
// Copies bytes from `buffer` into the circular buffer, holding mutex_ and
// blocking while the cache is open and has no free space.
// NOTE(review): `bytes_left -= write_size` suggests the copy is repeated
// until all `size` bytes are stored, but the enclosing loop header, the
// return statement and the closing braces are not visible in this view.
60 uint64_t IoCache::Write(
const void* buffer, uint64_t size) {
// Local read cursor into the caller's buffer. It is distinct from the member
// r_ptr_ (note the trailing underscore).
63 const uint8_t* r_ptr(
static_cast<const uint8_t*
>(buffer));
64 uint64_t bytes_left(size);
66 absl::MutexLock lock(&mutex_);
// Wait for free space. read_event_ is signaled by Clear() and Close() in the
// visible code.
67 while (!closed_ && (BytesFreeInternal() == 0)) {
68 VLOG(1) <<
"Circular buffer is full, which can happen if data arrives "
69 "faster than being consumed by packager. Ignore if it is not "
70 "live packaging. Otherwise, try increasing --io_cache_size.";
71 read_event_.Wait(&mutex_);
// Store as much as currently fits.
76 uint64_t write_size(std::min(bytes_left, BytesFreeInternal()));
// First chunk: the contiguous run from w_ptr_ up to the end of the buffer.
77 uint64_t first_chunk_size(
78 std::min(write_size,
static_cast<uint64_t
>(end_ptr_ - w_ptr_)));
79 memcpy(w_ptr_, r_ptr, first_chunk_size);
80 w_ptr_ += first_chunk_size;
81 DCHECK_GE(end_ptr_, w_ptr_);
// Wrap the write cursor to the start once it reaches the end of the buffer.
82 if (w_ptr_ == end_ptr_)
83 w_ptr_ = &circular_buffer_[0];
84 r_ptr += first_chunk_size;
// Second chunk: the remainder of write_size. It is non-zero only when the
// first chunk ran to end_ptr_, so it is copied to the start of the buffer.
85 uint64_t second_chunk_size(write_size - first_chunk_size);
86 if (second_chunk_size) {
87 memcpy(w_ptr_, r_ptr, second_chunk_size);
88 w_ptr_ += second_chunk_size;
89 DCHECK_GT(end_ptr_, w_ptr_);
90 r_ptr += second_chunk_size;
92 bytes_left -= write_size;
// Wake a reader blocked in Read() waiting for data.
93 write_event_.Signal();
// Discards the cached data by resetting both cursors to the start of the
// buffer, under mutex_.
98 void IoCache::Clear() {
99 absl::MutexLock lock(&mutex_);
100 r_ptr_ = w_ptr_ = circular_buffer_.data();
// Space has been freed, so wake a waiter on read_event_ (Write() waits on it
// when the buffer is full).
102 read_event_.Signal();
// Signals both events under mutex_ so that threads blocked in Read(),
// Write() or WaitUntilEmptyOrClosed() re-evaluate their `closed_` condition.
// NOTE(review): no assignment to closed_ is visible in this view -- confirm
// it is set here before the signals.
// NOTE(review): if these events are absl::CondVar, Signal() is only
// guaranteed to wake one waiter. Write() and WaitUntilEmptyOrClosed() both
// wait on read_event_; check whether they can block concurrently, in which
// case SignalAll() would be needed.
105 void IoCache::Close() {
106 absl::MutexLock lock(&mutex_);
108 read_event_.Signal();
109 write_event_.Signal();
// Resets both cursors to the start of the buffer under mutex_, leaving the
// cache empty.
// NOTE(review): given the name, this presumably also changes closed_, but
// no such statement is visible in this view -- confirm.
112 void IoCache::Reopen() {
113 absl::MutexLock lock(&mutex_);
115 r_ptr_ = w_ptr_ = circular_buffer_.data();
// Returns BytesCachedInternal(), computed while holding mutex_.
119 uint64_t IoCache::BytesCached() {
120 absl::MutexLock lock(&mutex_);
121 return BytesCachedInternal();
// Returns BytesFreeInternal(), computed while holding mutex_.
124 uint64_t IoCache::BytesFree() {
125 absl::MutexLock lock(&mutex_);
126 return BytesFreeInternal();
// Computes the number of bytes currently stored from the cursor positions.
// It does not lock mutex_ itself; every caller visible in this file already
// holds it.
// NOTE(review): the value for the non-wrapped case (r_ptr_ <= w_ptr_) is not
// visible in this view; presumably it is w_ptr_ - r_ptr_ -- confirm.
129 uint64_t IoCache::BytesCachedInternal() {
130 return (r_ptr_ <= w_ptr_)
// Wrapped case: bytes from r_ptr_ to the end of the buffer, plus bytes from
// the start of the buffer up to w_ptr_.
132 : (end_ptr_ - r_ptr_) + (w_ptr_ - circular_buffer_.data());
// Returns the remaining capacity: cache_size_ minus the bytes currently
// cached. It does not lock mutex_ itself; every caller visible in this file
// already holds it.
135 uint64_t IoCache::BytesFreeInternal() {
136 return cache_size_ - BytesCachedInternal();
// Blocks the caller, holding mutex_, until the cache is either closed or
// holds no cached bytes.
139 void IoCache::WaitUntilEmptyOrClosed() {
140 absl::MutexLock lock(&mutex_);
// Recheck after every wake-up. In the visible code read_event_ is signaled
// by Clear() and Close(); presumably Read() signals it too after consuming
// data, but that statement is not visible in this view.
141 while (!closed_ && BytesCachedInternal()) {
142 read_event_.Wait(&mutex_);
Methods that are declared virtual are virtual only so that they can be mocked.