Shaka Packager SDK
Loading...
Searching...
No Matches
io_cache.cc
1// Copyright 2015 Google LLC. All rights reserved.
2//
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file or at
5// https://developers.google.com/open-source/licenses/bsd
6
7#include <packager/file/io_cache.h>
8
9#include <algorithm>
10#include <cstring>
11
12#include <absl/log/check.h>
13#include <absl/log/log.h>
14
15#include <packager/macros/logging.h>
16
17namespace shaka {
18
19IoCache::IoCache(uint64_t cache_size)
20 : cache_size_(cache_size),
21 // Make the buffer one byte larger than the cache so that when the
22 // condition r_ptr == w_ptr is unambiguous (buffer empty).
23 circular_buffer_(cache_size + 1),
24 end_ptr_(&circular_buffer_[0] + cache_size + 1),
25 r_ptr_(circular_buffer_.data()),
26 w_ptr_(circular_buffer_.data()),
27 closed_(false) {}
28
29IoCache::~IoCache() {
30 Close();
31}
32
33uint64_t IoCache::Read(void* buffer, uint64_t size) {
34 DCHECK(buffer);
35
36 absl::MutexLock lock(&mutex_);
37 while (!closed_ && (BytesCachedInternal() == 0)) {
38 write_event_.Wait(&mutex_);
39 }
40
41 size = std::min(size, BytesCachedInternal());
42 uint64_t first_chunk_size(
43 std::min(size, static_cast<uint64_t>(end_ptr_ - r_ptr_)));
44 memcpy(buffer, r_ptr_, first_chunk_size);
45 r_ptr_ += first_chunk_size;
46 DCHECK_GE(end_ptr_, r_ptr_);
47 if (r_ptr_ == end_ptr_)
48 r_ptr_ = &circular_buffer_[0];
49 uint64_t second_chunk_size(size - first_chunk_size);
50 if (second_chunk_size) {
51 memcpy(static_cast<uint8_t*>(buffer) + first_chunk_size, r_ptr_,
52 second_chunk_size);
53 r_ptr_ += second_chunk_size;
54 DCHECK_GT(end_ptr_, r_ptr_);
55 }
56 read_event_.Signal();
57 return size;
58}
59
60uint64_t IoCache::Write(const void* buffer, uint64_t size) {
61 DCHECK(buffer);
62
63 const uint8_t* r_ptr(static_cast<const uint8_t*>(buffer));
64 uint64_t bytes_left(size);
65 while (bytes_left) {
66 absl::MutexLock lock(&mutex_);
67 while (!closed_ && (BytesFreeInternal() == 0)) {
68 VLOG(1) << "Circular buffer is full, which can happen if data arrives "
69 "faster than being consumed by packager. Ignore if it is not "
70 "live packaging. Otherwise, try increasing --io_cache_size.";
71 read_event_.Wait(&mutex_);
72 }
73 if (closed_)
74 return 0;
75
76 uint64_t write_size(std::min(bytes_left, BytesFreeInternal()));
77 uint64_t first_chunk_size(
78 std::min(write_size, static_cast<uint64_t>(end_ptr_ - w_ptr_)));
79 memcpy(w_ptr_, r_ptr, first_chunk_size);
80 w_ptr_ += first_chunk_size;
81 DCHECK_GE(end_ptr_, w_ptr_);
82 if (w_ptr_ == end_ptr_)
83 w_ptr_ = &circular_buffer_[0];
84 r_ptr += first_chunk_size;
85 uint64_t second_chunk_size(write_size - first_chunk_size);
86 if (second_chunk_size) {
87 memcpy(w_ptr_, r_ptr, second_chunk_size);
88 w_ptr_ += second_chunk_size;
89 DCHECK_GT(end_ptr_, w_ptr_);
90 r_ptr += second_chunk_size;
91 }
92 bytes_left -= write_size;
93 write_event_.Signal();
94 }
95 return size;
96}
97
98void IoCache::Clear() {
99 absl::MutexLock lock(&mutex_);
100 r_ptr_ = w_ptr_ = circular_buffer_.data();
101 // Let any writers know that there is room in the cache.
102 read_event_.Signal();
103}
104
105void IoCache::Close() {
106 absl::MutexLock lock(&mutex_);
107 closed_ = true;
108 read_event_.Signal();
109 write_event_.Signal();
110}
111
112void IoCache::Reopen() {
113 absl::MutexLock lock(&mutex_);
114 CHECK(closed_);
115 r_ptr_ = w_ptr_ = circular_buffer_.data();
116 closed_ = false;
117}
118
119uint64_t IoCache::BytesCached() {
120 absl::MutexLock lock(&mutex_);
121 return BytesCachedInternal();
122}
123
124uint64_t IoCache::BytesFree() {
125 absl::MutexLock lock(&mutex_);
126 return BytesFreeInternal();
127}
128
129uint64_t IoCache::BytesCachedInternal() {
130 return (r_ptr_ <= w_ptr_)
131 ? w_ptr_ - r_ptr_
132 : (end_ptr_ - r_ptr_) + (w_ptr_ - circular_buffer_.data());
133}
134
135uint64_t IoCache::BytesFreeInternal() {
136 return cache_size_ - BytesCachedInternal();
137}
138
139void IoCache::WaitUntilEmptyOrClosed() {
140 absl::MutexLock lock(&mutex_);
141 while (!closed_ && BytesCachedInternal()) {
142 read_event_.Wait(&mutex_);
143 }
144}
145
146} // namespace shaka
All the methods that are virtual are virtual for mocking.