SeekableHttpStream.php 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. <?php
  2. /**
  3. * SPDX-FileCopyrightText: 2020 Nextcloud GmbH and Nextcloud contributors
  4. * SPDX-License-Identifier: AGPL-3.0-or-later
  5. */
  6. namespace OC\Files\Stream;
  7. use Icewind\Streams\File;
  8. use Icewind\Streams\Wrapper;
  /**
   * A stream wrapper that uses HTTP range requests to provide a seekable, read-only stream over HTTP.
   *
   * The wrapper never talks HTTP itself: it is handed a callback (see {@see SeekableHttpStream::open()})
   * which is invoked with a range string of the form `<start>-` every time a new connection is needed.
   * Seeking closes the current response stream and lazily re-invokes the callback at the new offset on
   * the next read, stat or eof check.
   */
  class SeekableHttpStream implements File {
      private const PROTOCOL = 'httpseek';

      private static bool $registered = false;

      /**
       * Registers the stream wrapper using the `httpseek://` url scheme.
       *
       * Registration is attempted only once; later calls are no-ops.
       *
       * @return void
       */
      private static function registerIfNeeded(): void {
          if (self::$registered) {
              return;
          }

          stream_wrapper_register(self::PROTOCOL, self::class);
          self::$registered = true;
      }

      /**
       * Open a readonly-seekable http stream
       *
       * The provided callback will be called with a byte range (formatted as `<start>-`) and should
       * return an http stream for the requested range, or false on failure.
       *
       * @param callable $callback
       * @return false|resource
       */
      public static function open(callable $callback) {
          $context = stream_context_create([
              self::PROTOCOL => [
                  'callback' => $callback,
              ],
          ]);

          self::registerIfNeeded();
          return fopen(self::PROTOCOL . '://', 'r', false, $context);
      }

      /** @var resource stream context; stream_open() reads the callback from its options */
      public $context;

      /** @var callable callback that opens an http stream for a given `<start>-` range */
      private $openCallback;

      /** @var ?resource|closed-resource the response stream currently being read, if any */
      private $current;

      /** @var int $offset current read position within the full stream */
      private int $offset = 0;

      /** @var int $length size taken from the part after `/` of the most recent Content-Range header */
      private int $length = 0;

      /** @var int $totalSize size of the full stream, recorded when the stream is (re)opened at offset 0 */
      private int $totalSize = 0;

      /** @var bool set by stream_seek(); the next access through getCurrent() reconnects at $offset */
      private bool $needReconnect = false;

      /**
       * Closes the current response stream (if it is still open) and forgets it.
       */
      private function closeCurrent(): void {
          if ($this->hasOpenStream()) {
              fclose($this->current);
          }
          $this->current = null;
      }

      /**
       * Extracts the first byte position and the size from a Content-Range header line.
       *
       * For `Content-Range: bytes 100-999/1000` this yields `[100, 1000]`. Non-numeric parts
       * (such as `*`) evaluate to 0 through intval().
       *
       * @param string $header full header line, including the `Content-Range:` name
       * @return ?array{0: int, 1: int} `[begin, size]`, or null when the header is malformed
       */
      private function parseContentRange(string $header): ?array {
          $headerParts = explode(':', $header);
          if (!isset($headerParts[1])) {
              return null;
          }

          // value looks like "<unit> <first>-<last>/<size>"; the range is the second token
          $tokens = preg_split('/\s+/', trim($headerParts[1]));
          if ($tokens === false || !isset($tokens[1])) {
              return null;
          }
          $range = $tokens[1];

          $sizeParts = explode('/', $range);
          if (!isset($sizeParts[1])) {
              return null;
          }

          return [
              intval(explode('-', $range)[0]),
              intval($sizeParts[1]),
          ];
      }

      /**
       * Opens a new response stream starting at the given byte offset.
       *
       * Any previously open response stream is closed first. The new stream is only kept when its
       * response headers contain a Content-Range header whose first byte position equals $start;
       * otherwise it is closed again and the wrapper is left without a current stream.
       *
       * @param int $start byte offset to start reading from
       * @return bool true when a stream positioned at $start is now available
       * @throws \Exception when the response is wrapped in a stream wrapper that exposes no source stream
       */
      private function reconnect(int $start): bool {
          $this->needReconnect = false;
          $this->closeCurrent();

          $stream = ($this->openCallback)($start . '-');
          if (!is_resource($stream)) {
              return false;
          }
          $this->current = $stream;

          $responseHead = stream_get_meta_data($stream)['wrapper_data'] ?? null;

          // unwrap nested stream wrappers until the headers of the underlying source stream are reached
          while ($responseHead instanceof Wrapper) {
              $wrapperOptions = stream_context_get_options($responseHead->context);
              foreach ($wrapperOptions as $options) {
                  if (isset($options['source']) && is_resource($options['source'])) {
                      $responseHead = stream_get_meta_data($options['source'])['wrapper_data'] ?? null;
                      continue 2;
                  }
              }
              throw new \Exception("Failed to get source stream from stream wrapper of " . get_class($responseHead));
          }

          if (!is_array($responseHead)) {
              // no header list available, so the served range cannot be verified
              $this->closeCurrent();
              return false;
          }

          $rangeHeaders = array_values(array_filter($responseHead, function ($v) {
              return is_string($v) && preg_match('#^content-range:#i', $v) === 1;
          }));
          if (!$rangeHeaders) {
              // close the unusable response instead of leaking it
              $this->closeCurrent();
              return false;
          }

          $parsed = $this->parseContentRange($rangeHeaders[0]);
          if ($parsed === null || $parsed[0] !== $start) {
              // the response does not start at the requested offset
              $this->closeCurrent();
              return false;
          }
          [$begin, $length] = $parsed;

          $this->offset = $begin;
          $this->length = $length;
          if ($start === 0) {
              $this->totalSize = $length;
          }

          return true;
      }

      /**
       * Returns the current response stream, reconnecting first when a seek is pending.
       *
       * @return ?resource null when no open stream is available
       */
      private function getCurrent() {
          if ($this->needReconnect) {
              $this->reconnect($this->offset);
          }

          return is_resource($this->current) ? $this->current : null;
      }

      /**
       * @return bool
       * @psalm-assert-if-true resource $this->current
       */
      private function hasOpenStream(): bool {
          return is_resource($this->current);
      }

      /**
       * Reads the callback from the stream context and opens the stream at offset 0.
       *
       * @return bool false when the context carries no callable callback or the first connection fails
       */
      public function stream_open($path, $mode, $options, &$opened_path) {
          $contextOptions = is_resource($this->context) ? stream_context_get_options($this->context) : [];
          $callback = $contextOptions[self::PROTOCOL]['callback'] ?? null;
          if (!is_callable($callback)) {
              return false;
          }

          $this->openCallback = $callback;
          return $this->reconnect(0);
      }

      /**
       * Reads up to $count bytes from the current response stream and advances the offset accordingly.
       *
       * @return string|false false when no stream is available or the read fails
       */
      public function stream_read($count) {
          $stream = $this->getCurrent();
          if (!$stream) {
              return false;
          }

          $ret = fread($stream, $count);
          if ($ret === false) {
              return false;
          }

          $this->offset += strlen($ret);
          return $ret;
      }

      /**
       * Moves the read position.
       *
       * No request is made here: when the position actually changes, the current response stream is
       * closed and a reconnect at the new offset is scheduled for the next access.
       *
       * @return bool false for an unknown $whence, a negative target position, or SEEK_END while the
       *              size is unknown (0); true otherwise
       */
      public function stream_seek($offset, $whence = SEEK_SET) {
          switch ($whence) {
              case SEEK_SET:
                  $target = $offset;
                  break;
              case SEEK_CUR:
                  $target = $this->offset + $offset;
                  break;
              case SEEK_END:
                  if ($this->length === 0) {
                      return false;
                  }
                  $target = $this->length + $offset;
                  break;
              default:
                  // unknown whence: leave position and connection untouched
                  return false;
          }

          if ($target < 0) {
              // a negative position would produce an invalid "<start>-" range
              return false;
          }

          if ($target === $this->offset) {
              // already there, keep the current connection
              return true;
          }

          $this->offset = $target;
          $this->closeCurrent();
          $this->needReconnect = true;
          return true;
      }

      /**
       * @return int the current read position
       */
      public function stream_tell() {
          return $this->offset;
      }

      /**
       * Returns the stat data of the current response stream with `size` replaced by the full stream size.
       *
       * @return array|false false when no stream is available
       */
      public function stream_stat() {
          $stream = $this->getCurrent();
          if (!$stream) {
              return false;
          }

          $stat = fstat($stream);
          if ($stat) {
              $stat['size'] = $this->totalSize;
          }
          return $stat;
      }

      /**
       * @return bool true when the current response stream is at its end or no stream is available
       */
      public function stream_eof() {
          $stream = $this->getCurrent();
          if (!$stream) {
              return true;
          }

          return feof($stream);
      }

      /**
       * Closes the current response stream, if any.
       */
      public function stream_close() {
          $this->closeCurrent();
      }

      /**
       * Writing is not supported.
       */
      public function stream_write($data) {
          return false;
      }

      /**
       * No stream options are supported.
       */
      public function stream_set_option($option, $arg1, $arg2) {
          return false;
      }

      /**
       * Truncating is not supported.
       */
      public function stream_truncate($size) {
          return false;
      }

      /**
       * Locking is not supported.
       */
      public function stream_lock($operation) {
          return false;
      }

      public function stream_flush() {
          return; //noop because readonly stream
      }
  }