1
0

SeekableHttpStream.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. <?php
  2. /**
  3. * @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de)
  4. *
  5. * @author Lukas Stabe <lukas@stabe.de>
  6. * @author Robin Appelman <robin@icewind.nl>
  7. *
  8. * @license GNU AGPL version 3 or any later version
  9. *
  10. * This program is free software: you can redistribute it and/or modify
  11. * it under the terms of the GNU Affero General Public License as
  12. * published by the Free Software Foundation, either version 3 of the
  13. * License, or (at your option) any later version.
  14. *
  15. * This program is distributed in the hope that it will be useful,
  16. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. * GNU Affero General Public License for more details.
  19. *
  20. * You should have received a copy of the GNU Affero General Public License
  21. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  22. *
  23. */
  24. namespace OC\Files\Stream;
  25. use Icewind\Streams\File;
  26. use Icewind\Streams\Wrapper;
  27. /**
  28. * A stream wrapper that uses http range requests to provide a seekable stream for http reading
  29. */
  30. class SeekableHttpStream implements File {
  31. private const PROTOCOL = 'httpseek';
  32. private static bool $registered = false;
  33. /**
  34. * Registers the stream wrapper using the `httpseek://` url scheme
  35. * $return void
  36. */
  37. private static function registerIfNeeded() {
  38. if (!self::$registered) {
  39. stream_wrapper_register(
  40. self::PROTOCOL,
  41. self::class
  42. );
  43. self::$registered = true;
  44. }
  45. }
  46. /**
  47. * Open a readonly-seekable http stream
  48. *
  49. * The provided callback will be called with byte range and should return an http stream for the requested range
  50. *
  51. * @param callable $callback
  52. * @return false|resource
  53. */
  54. public static function open(callable $callback) {
  55. $context = stream_context_create([
  56. SeekableHttpStream::PROTOCOL => [
  57. 'callback' => $callback
  58. ],
  59. ]);
  60. SeekableHttpStream::registerIfNeeded();
  61. return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
  62. }
  63. /** @var resource */
  64. public $context;
  65. /** @var callable */
  66. private $openCallback;
  67. /** @var ?resource|closed-resource */
  68. private $current;
  69. /** @var int $offset offset of the current chunk */
  70. private int $offset = 0;
  71. /** @var int $length length of the current chunk */
  72. private int $length = 0;
  73. /** @var int $totalSize size of the full stream */
  74. private int $totalSize = 0;
  75. private bool $needReconnect = false;
  76. private function reconnect(int $start): bool {
  77. $this->needReconnect = false;
  78. $range = $start . '-';
  79. if ($this->hasOpenStream()) {
  80. fclose($this->current);
  81. }
  82. $stream = ($this->openCallback)($range);
  83. if ($stream === false) {
  84. $this->current = null;
  85. return false;
  86. }
  87. $this->current = $stream;
  88. $responseHead = stream_get_meta_data($this->current)['wrapper_data'];
  89. while ($responseHead instanceof Wrapper) {
  90. $wrapperOptions = stream_context_get_options($responseHead->context);
  91. foreach ($wrapperOptions as $options) {
  92. if (isset($options['source']) && is_resource($options['source'])) {
  93. $responseHead = stream_get_meta_data($options['source'])['wrapper_data'];
  94. continue 2;
  95. }
  96. }
  97. throw new \Exception("Failed to get source stream from stream wrapper of " . get_class($responseHead));
  98. }
  99. $rangeHeaders = array_values(array_filter($responseHead, function ($v) {
  100. return preg_match('#^content-range:#i', $v) === 1;
  101. }));
  102. if (!$rangeHeaders) {
  103. $this->current = null;
  104. return false;
  105. }
  106. $contentRange = $rangeHeaders[0];
  107. $content = trim(explode(':', $contentRange)[1]);
  108. $range = trim(explode(' ', $content)[1]);
  109. $begin = intval(explode('-', $range)[0]);
  110. $length = intval(explode('/', $range)[1]);
  111. if ($begin !== $start) {
  112. $this->current = null;
  113. return false;
  114. }
  115. $this->offset = $begin;
  116. $this->length = $length;
  117. if ($start === 0) {
  118. $this->totalSize = $length;
  119. }
  120. return true;
  121. }
  122. /**
  123. * @return ?resource
  124. */
  125. private function getCurrent() {
  126. if ($this->needReconnect) {
  127. $this->reconnect($this->offset);
  128. }
  129. if (is_resource($this->current)) {
  130. return $this->current;
  131. } else {
  132. return null;
  133. }
  134. }
  135. /**
  136. * @return bool
  137. * @psalm-assert-if-true resource $this->current
  138. */
  139. private function hasOpenStream(): bool {
  140. return is_resource($this->current);
  141. }
  142. public function stream_open($path, $mode, $options, &$opened_path) {
  143. $options = stream_context_get_options($this->context)[self::PROTOCOL];
  144. $this->openCallback = $options['callback'];
  145. return $this->reconnect(0);
  146. }
  147. public function stream_read($count) {
  148. if (!$this->getCurrent()) {
  149. return false;
  150. }
  151. $ret = fread($this->getCurrent(), $count);
  152. $this->offset += strlen($ret);
  153. return $ret;
  154. }
  155. public function stream_seek($offset, $whence = SEEK_SET) {
  156. switch ($whence) {
  157. case SEEK_SET:
  158. if ($offset === $this->offset) {
  159. return true;
  160. } else {
  161. $this->offset = $offset;
  162. }
  163. break;
  164. case SEEK_CUR:
  165. if ($offset === 0) {
  166. return true;
  167. } else {
  168. $this->offset += $offset;
  169. }
  170. break;
  171. case SEEK_END:
  172. if ($this->length === 0) {
  173. return false;
  174. } elseif ($this->length + $offset === $this->offset) {
  175. return true;
  176. } else {
  177. $this->offset = $this->length + $offset;
  178. }
  179. break;
  180. }
  181. if ($this->hasOpenStream()) {
  182. fclose($this->current);
  183. }
  184. $this->current = null;
  185. $this->needReconnect = true;
  186. return true;
  187. }
  188. public function stream_tell() {
  189. return $this->offset;
  190. }
  191. public function stream_stat() {
  192. if ($this->getCurrent()) {
  193. $stat = fstat($this->getCurrent());
  194. if ($stat) {
  195. $stat['size'] = $this->totalSize;
  196. }
  197. return $stat;
  198. } else {
  199. return false;
  200. }
  201. }
  202. public function stream_eof() {
  203. if ($this->getCurrent()) {
  204. return feof($this->getCurrent());
  205. } else {
  206. return true;
  207. }
  208. }
  209. public function stream_close() {
  210. if ($this->hasOpenStream()) {
  211. fclose($this->current);
  212. }
  213. $this->current = null;
  214. }
  215. public function stream_write($data) {
  216. return false;
  217. }
  218. public function stream_set_option($option, $arg1, $arg2) {
  219. return false;
  220. }
  221. public function stream_truncate($size) {
  222. return false;
  223. }
  224. public function stream_lock($operation) {
  225. return false;
  226. }
  227. public function stream_flush() {
  228. return; //noop because readonly stream
  229. }
  230. }