Shaka Player Embedded
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
network_thread.cc
Go to the documentation of this file.
1 // Copyright 2018 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <curl/curl.h>
18 #include <sys/select.h>
19 
20 #include <algorithm>
21 #include <cerrno>
22 
24 #include "src/util/utils.h"
25 
26 namespace shaka {
27 
28 namespace {
29 
30 constexpr const long kSmallDelayMs = 100; // NOLINT
31 constexpr const long kMaxDelayMs = 500; // NOLINT
32 
33 } // namespace
34 
36  : mutex_("NetworkThread"),
37  cond_("Networking new request"),
38  multi_handle_(curl_multi_init()),
39  shutdown_(false),
40  thread_("Networking", std::bind(&NetworkThread::ThreadMain, this)) {
41  CHECK(multi_handle_);
42 }
43 
45  CHECK(!thread_.joinable()) << "Need to call Stop() before destroying";
46  DCHECK(requests_.empty());
47  curl_multi_cleanup(multi_handle_);
48 }
49 
51  shutdown_.store(true, std::memory_order_release);
52  cond_.SignalAllIfNotSet();
53  thread_.join();
54 }
55 
57  std::unique_lock<Mutex> lock(mutex_);
58  return util::contains(requests_, request);
59 }
60 
62  std::unique_lock<Mutex> lock(mutex_);
63  DCHECK(!shutdown_.load(std::memory_order_acquire));
64  DCHECK(!util::contains(requests_, request));
65  requests_.push_back(request);
66  CHECK_EQ(curl_multi_add_handle(multi_handle_, request->curl_), CURLM_OK);
67  cond_.SignalAllIfNotSet();
68 }
69 
71  std::unique_lock<Mutex> lock(mutex_);
72  for (auto it = requests_.begin(); it != requests_.end(); it++) {
73  if (*it == request) {
74  CHECK_EQ(curl_multi_remove_handle(multi_handle_, request->curl_),
75  CURLM_OK);
76  requests_.erase(it);
77  break;
78  }
79  }
80 }
81 
82 void NetworkThread::ThreadMain() {
83  while (!shutdown_.load(std::memory_order_acquire)) {
84  fd_set fdread;
85  fd_set fdwrite;
86  fd_set fdexc;
87  long timeout_ms = -1; // NOLINT
88  int maxfd = -1;
89  bool no_handles;
90  {
91  std::unique_lock<Mutex> lock(mutex_);
92  // This will still return success if there are no requests or if there is
93  // an error in one request.
94  int handles = 0;
95  CHECK_EQ(curl_multi_perform(multi_handle_, &handles), CURLM_OK);
96  no_handles = handles == 0;
97 
98  // Get any pending messages and complete any requests that are done.
99  int msg_count;
100  while (CURLMsg* msg = curl_multi_info_read(multi_handle_, &msg_count)) {
101  if (msg->msg == CURLMSG_DONE) {
102  for (auto it = requests_.begin(); it != requests_.end(); it++) {
103  if ((*it)->curl_ == msg->easy_handle) {
104  (*it)->OnRequestComplete(msg->data.result); // NOLINT
105  requests_.erase(it);
106  break;
107  }
108  }
109  CHECK_EQ(curl_multi_remove_handle(multi_handle_, msg->easy_handle),
110  CURLM_OK);
111  } else {
112  // There are currently no other message types.
113  LOG(DFATAL) << "Unknown message type: " << msg->msg;
114  }
115  }
116 
117  if (curl_multi_fdset(multi_handle_, &fdread, &fdwrite, &fdexc, &maxfd) !=
118  CURLM_OK) {
119  LOG(ERROR) << "Error getting file descriptors from CURL";
120  }
121  if (curl_multi_timeout(multi_handle_, &timeout_ms) != CURLM_OK) {
122  LOG(ERROR) << "Error getting timeout from CURL";
123  }
124  if (timeout_ms < 0)
125  // If we failed to query CURL, use a default.
126  timeout_ms = kSmallDelayMs;
127  else
128  timeout_ms = std::min(timeout_ms, kMaxDelayMs);
129  }
130 
131  // Wait until we have something to do.
132  if (no_handles) {
133  std::unique_lock<Mutex> lock(mutex_);
134  cond_.ResetAndWaitWhileUnlocked(lock);
135  } else {
136  timeval timeout = {.tv_sec = timeout_ms / 1000,
137  .tv_usec = (timeout_ms % 1000) * 1000};
138  if (select(maxfd + 1, &fdread, &fdwrite, &fdexc, &timeout) < 0) {
139  if (errno == EBADF) {
140  // If another thread aborts the request, it will close the file
141  // descriptor, causing an error here, so just ignore it.
142  } else {
143  PLOG(ERROR) << "Error waiting for network handles";
144  }
145  }
146  }
147  }
148 }
149 
150 } // namespace shaka
bool ContainsRequest(RefPtr< js::XMLHttpRequest > request) const
bool contains(const std::vector< T > &vec, U &&elem)
Definition: utils.h:87
void join()
Definition: thread.h:56
void AbortRequest(RefPtr< js::XMLHttpRequest > request)
T ResetAndWaitWhileUnlocked(std::unique_lock< _Mutex > &lock)
Definition: thread_event.h:106
void AddRequest(RefPtr< js::XMLHttpRequest > request)
bool joinable() const
Definition: thread.h:37