Shaka Packager SDK
threaded_io_file.cc
1 // Copyright 2015 Google LLC. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include <packager/file/threaded_io_file.h>
8 
9 #include <absl/log/check.h>
10 
11 #include <packager/file/thread_pool.h>
12 
13 namespace shaka {
14 
15 ThreadedIoFile::ThreadedIoFile(std::unique_ptr<File, FileCloser> internal_file,
16  Mode mode,
17  uint64_t io_cache_size,
18  uint64_t io_block_size)
19  : File(internal_file->file_name()),
20  internal_file_(std::move(internal_file)),
21  mode_(mode),
22  cache_(io_cache_size),
23  io_buffer_(io_block_size),
24  position_(0),
25  size_(0),
26  eof_(false),
27  internal_file_error_(0),
28  flushing_(false),
29  flush_complete_(false),
30  task_exited_(false) {
31  DCHECK(internal_file_);
32 }
33 
34 ThreadedIoFile::~ThreadedIoFile() {}
35 
36 bool ThreadedIoFile::Open() {
37  DCHECK(internal_file_);
38 
39  if (!internal_file_->Open())
40  return false;
41 
42  position_ = 0;
43  size_ = internal_file_->Size();
44 
45  ThreadPool::instance.PostTask(std::bind(&ThreadedIoFile::TaskHandler, this));
46  return true;
47 }
48 
49 bool ThreadedIoFile::Close() {
50  DCHECK(internal_file_);
51 
52  bool result = true;
53  if (mode_ == kOutputMode)
54  result = Flush();
55 
56  cache_.Close();
57  WaitForSignal(&task_exited_mutex_, &task_exited_);
58 
59  result &= internal_file_.release()->Close();
60  delete this;
61  return result;
62 }
63 
64 int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) {
65  DCHECK(internal_file_);
66  DCHECK_EQ(kInputMode, mode_);
67 
68  if (eof_.load(std::memory_order_relaxed) && !cache_.BytesCached())
69  return 0;
70 
71  if (internal_file_error_.load(std::memory_order_relaxed))
72  return internal_file_error_.load(std::memory_order_relaxed);
73 
74  uint64_t bytes_read = cache_.Read(buffer, length);
75  position_ += bytes_read;
76 
77  return bytes_read;
78 }
79 
80 int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
81  DCHECK(internal_file_);
82  DCHECK_EQ(kOutputMode, mode_);
83 
84  if (internal_file_error_.load(std::memory_order_relaxed))
85  return internal_file_error_.load(std::memory_order_relaxed);
86 
87  uint64_t bytes_written = cache_.Write(buffer, length);
88  position_ += bytes_written;
89  if (position_ > size_)
90  size_ = position_;
91 
92  return bytes_written;
93 }
94 
95 void ThreadedIoFile::CloseForWriting() {}
96 
97 int64_t ThreadedIoFile::Size() {
98  DCHECK(internal_file_);
99 
100  return size_;
101 }
102 
103 bool ThreadedIoFile::Flush() {
104  DCHECK(internal_file_);
105  DCHECK_EQ(kOutputMode, mode_);
106 
107  if (internal_file_error_.load(std::memory_order_relaxed))
108  return false;
109 
110  {
111  absl::MutexLock lock(&flush_mutex_);
112  flushing_ = true;
113  flush_complete_ = false;
114  }
115  cache_.Close();
116 
117  WaitForSignal(&flush_mutex_, &flush_complete_);
118 
119  return internal_file_->Flush();
120 }
121 
122 bool ThreadedIoFile::Seek(uint64_t position) {
123  if (mode_ == kOutputMode) {
124  // Writing. Just flush the cache and seek.
125  if (!Flush())
126  return false;
127  if (!internal_file_->Seek(position))
128  return false;
129  } else {
130  // Reading. Close cache, wait for thread task to exit, seek, and re-post
131  // the task.
132  cache_.Close();
133  WaitForSignal(&task_exited_mutex_, &task_exited_);
134 
135  bool result = internal_file_->Seek(position);
136  if (!result) {
137  // Seek failed. Seek to logical position instead.
138  if (!internal_file_->Seek(position_) && (position != position_)) {
139  LOG(WARNING) << "Seek failed. ThreadedIoFile left in invalid state.";
140  }
141  }
142  cache_.Reopen();
143  eof_ = false;
144 
145  ThreadPool::instance.PostTask(
146  std::bind(&ThreadedIoFile::TaskHandler, this));
147  if (!result)
148  return false;
149  }
150  position_ = position;
151  return true;
152 }
153 
154 bool ThreadedIoFile::Tell(uint64_t* position) {
155  DCHECK(position);
156 
157  *position = position_;
158  return true;
159 }
160 
161 void ThreadedIoFile::TaskHandler() {
162  {
163  absl::MutexLock lock(&task_exited_mutex_);
164  task_exited_ = false;
165  }
166 
167  if (mode_ == kInputMode)
168  RunInInputMode();
169  else
170  RunInOutputMode();
171 
172  {
173  absl::MutexLock lock(&task_exited_mutex_);
174  task_exited_ = true;
175  }
176 }
177 
178 void ThreadedIoFile::RunInInputMode() {
179  DCHECK(internal_file_);
180  DCHECK_EQ(kInputMode, mode_);
181 
182  while (true) {
183  int64_t read_result =
184  internal_file_->Read(&io_buffer_[0], io_buffer_.size());
185  if (read_result <= 0) {
186  eof_.store(read_result == 0, std::memory_order_relaxed);
187  internal_file_error_.store(read_result, std::memory_order_relaxed);
188  cache_.Close();
189  return;
190  }
191  if (cache_.Write(&io_buffer_[0], read_result) == 0) {
192  return;
193  }
194  }
195 }
196 
197 void ThreadedIoFile::RunInOutputMode() {
198  DCHECK(internal_file_);
199  DCHECK_EQ(kOutputMode, mode_);
200 
201  while (true) {
202  uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
203  if (write_bytes == 0) {
204  absl::MutexLock lock(&flush_mutex_);
205  if (flushing_) {
206  cache_.Reopen();
207  flushing_ = false;
208  flush_complete_ = true;
209  } else {
210  return;
211  }
212  } else {
213  uint64_t bytes_written(0);
214  while (bytes_written < write_bytes) {
215  int64_t write_result = internal_file_->Write(
216  &io_buffer_[bytes_written], write_bytes - bytes_written);
217  if (write_result < 0) {
218  internal_file_error_.store(write_result, std::memory_order_relaxed);
219  cache_.Close();
220 
221  absl::MutexLock lock(&flush_mutex_);
222  if (flushing_) {
223  flushing_ = false;
224  flush_complete_ = true;
225  }
226  return;
227  }
228  bytes_written += write_result;
229  }
230  }
231  }
232 }
233 
234 void ThreadedIoFile::WaitForSignal(absl::Mutex* mutex, bool* condition) {
235  // This waits until the boolean condition variable is true, then locks the
236  // mutex. The check is done every time the mutex is unlocked. As long as
237  // this mutex is held when the variable is modified, this wait will always
238  // wake up when the variable is changed to true.
239  mutex->LockWhen(absl::Condition(condition));
240 
241  // LockWhen leaves the mutex locked. Return after unlocking the mutex again.
242  mutex->Unlock();
243 }
244 
245 } // namespace shaka
All the methods that are virtual are virtual for mocking.
Definition: crypto_flags.cc:66