1
0

stream.c 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884
  /*++

  Copyright (c) 2013 Minoca Corp. All Rights Reserved

  Module Name:

      stream.c

  Abstract:

      This module implements support for I/O streams.

  Author:

      Evan Green 15-Feb-2013

  Environment:

      Kernel

  --*/
  12. //
  13. // ------------------------------------------------------------------- Includes
  14. //
  15. #include <minoca/kernel/kernel.h>
  16. #include "iop.h"
  17. //
  18. // ---------------------------------------------------------------- Definitions
  19. //
  //
  // Define the internal buffer size, in bytes, used when a caller asks for a
  // stream buffer without specifying a size.
  //

  #define DEFAULT_STREAM_BUFFER_SIZE (8 * 1024)
  21. //
  22. // ------------------------------------------------------ Data Type Definitions
  23. //
  24. /*++
  25. Structure Description:
  26. This structure describes characteristics about a data stream buffer.
  27. Members:
  28. Flags - Stores a bitfield of flags governing the state of the stream buffer.
  29. See STREAM_BUFFER_FLAG_* definitions.
  30. Size - Stores the size of the buffer, in bytes.
  31. Buffer - Stores a pointer to the actual stream buffer.
  32. NextReadOffset - Stores the offset from the beginning of the buffer where
  33. the next read should occur (points to the first unread byte).
  34. NextWriteOffset - Stores the offset from the beginning of the buffer where
  35. the next write should occur (points to the first unused offset).
  36. AtomicWriteSize - Stores the number of bytes that can always be written
  37. to the stream atomically (without interleaving).
  38. Lock - Stores a pointer to a lock ensuring only one party is accessing the
  39. buffer at once.
  40. IoState - Stores a pointer to the I/O object state.
  41. --*/
  42. struct _STREAM_BUFFER {
  43. ULONG Flags;
  44. ULONG Size;
  45. PVOID Buffer;
  46. ULONG NextReadOffset;
  47. ULONG NextWriteOffset;
  48. ULONG AtomicWriteSize;
  49. PQUEUED_LOCK Lock;
  50. PIO_OBJECT_STATE IoState;
  51. };
  52. //
  53. // ----------------------------------------------- Internal Function Prototypes
  54. //
  55. //
  56. // -------------------------------------------------------------------- Globals
  57. //
  58. //
  59. // ------------------------------------------------------------------ Functions
  60. //
  61. PSTREAM_BUFFER
  62. IoCreateStreamBuffer (
  63. PIO_OBJECT_STATE IoState,
  64. ULONG Flags,
  65. ULONG BufferSize,
  66. ULONG AtomicWriteSize
  67. )
  68. /*++
  69. Routine Description:
  70. This routine allocates and initializes a new stream buffer.
  71. Arguments:
  72. IoState - Supplies an optional pointer to the I/O state structure to use
  73. for this stream buffer.
  74. Flags - Supplies a bitfield of flags governing the behavior of the stream
  75. buffer. See STREAM_BUFFER_FLAG_* definitions.
  76. BufferSize - Supplies the size of the buffer. Supply zero to use a default
  77. system value.
  78. AtomicWriteSize - Supplies the number of bytes that can always be written
  79. to the stream atomically (without interleaving).
  80. Return Value:
  81. Returns a pointer to the buffer on success.
  82. NULL on invalid parameter or allocation failure.
  83. --*/
  84. {
  85. KSTATUS Status;
  86. PSTREAM_BUFFER StreamBuffer;
  87. if (AtomicWriteSize == 0) {
  88. AtomicWriteSize = 1;
  89. }
  90. if (BufferSize == 0) {
  91. BufferSize = DEFAULT_STREAM_BUFFER_SIZE;
  92. //
  93. // Bump up the internal buffer size since one byte of the buffer is always
  94. // wasted.
  95. //
  96. } else {
  97. BufferSize += 1;
  98. }
  99. if (BufferSize < AtomicWriteSize) {
  100. BufferSize = AtomicWriteSize + 1;
  101. }
  102. //
  103. // Create the stream buffer structure.
  104. //
  105. StreamBuffer = MmAllocatePagedPool(sizeof(STREAM_BUFFER),
  106. FI_ALLOCATION_TAG);
  107. if (StreamBuffer == NULL) {
  108. Status = STATUS_INSUFFICIENT_RESOURCES;
  109. goto CreateStreamBufferEnd;
  110. }
  111. RtlZeroMemory(StreamBuffer, sizeof(STREAM_BUFFER));
  112. StreamBuffer->Size = BufferSize;
  113. StreamBuffer->AtomicWriteSize = AtomicWriteSize;
  114. StreamBuffer->Lock = KeCreateQueuedLock();
  115. if (StreamBuffer->Lock == NULL) {
  116. Status = STATUS_INSUFFICIENT_RESOURCES;
  117. goto CreateStreamBufferEnd;
  118. }
  119. //
  120. // Create the buffer itself.
  121. //
  122. StreamBuffer->Buffer = MmAllocatePagedPool(BufferSize,
  123. FI_ALLOCATION_TAG);
  124. if (StreamBuffer->Buffer == NULL) {
  125. Status = STATUS_INSUFFICIENT_RESOURCES;
  126. goto CreateStreamBufferEnd;
  127. }
  128. //
  129. // Use the given I/O object state or create one.
  130. //
  131. ASSERT(IoState != NULL);
  132. StreamBuffer->IoState = IoState;
  133. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, TRUE);
  134. StreamBuffer->Flags = Flags;
  135. Status = STATUS_SUCCESS;
  136. CreateStreamBufferEnd:
  137. if (!KSUCCESS(Status)) {
  138. if (StreamBuffer != NULL) {
  139. if (StreamBuffer->Lock != NULL) {
  140. KeDestroyQueuedLock(StreamBuffer->Lock);
  141. }
  142. if (StreamBuffer->Buffer != NULL) {
  143. MmFreePagedPool(StreamBuffer->Buffer);
  144. }
  145. MmFreePagedPool(StreamBuffer);
  146. StreamBuffer = NULL;
  147. }
  148. }
  149. return StreamBuffer;
  150. }
  151. VOID
  152. IoDestroyStreamBuffer (
  153. PSTREAM_BUFFER StreamBuffer
  154. )
  155. /*++
  156. Routine Description:
  157. This routine destroys an allocated stream buffer. It assumes there are no
  158. waiters on the events.
  159. Arguments:
  160. StreamBuffer - Supplies a pointer to the stream buffer to tear down.
  161. Return Value:
  162. None.
  163. --*/
  164. {
  165. if (StreamBuffer->Lock != NULL) {
  166. KeDestroyQueuedLock(StreamBuffer->Lock);
  167. }
  168. StreamBuffer->IoState = NULL;
  169. if (StreamBuffer->Buffer != NULL) {
  170. MmFreePagedPool(StreamBuffer->Buffer);
  171. StreamBuffer->Buffer = NULL;
  172. }
  173. MmFreePagedPool(StreamBuffer);
  174. return;
  175. }
  176. KSTATUS
  177. IoReadStreamBuffer (
  178. PSTREAM_BUFFER StreamBuffer,
  179. PIO_BUFFER IoBuffer,
  180. UINTN ByteCount,
  181. ULONG TimeoutInMilliseconds,
  182. BOOL NonBlocking,
  183. PUINTN BytesRead
  184. )
  185. /*++
  186. Routine Description:
  187. This routine reads from a stream buffer. This routine must be called at low
  188. level, unless the stream was set up to be read at dispatch.
  189. Arguments:
  190. StreamBuffer - Supplies a pointer to the stream buffer to read from.
  191. IoBuffer - Supplies a pointer to the I/O buffer where the read data will be
  192. returned on success.
  193. ByteCount - Supplies the number of bytes to read.
  194. TimeoutInMilliseconds - Supplies the number of milliseconds that the I/O
  195. operation should be waited on before timing out. Use
  196. WAIT_TIME_INDEFINITE to wait forever on the I/O.
  197. NonBlocking - Supplies a boolean indicating if this read should avoid
  198. blocking.
  199. BytesRead - Supplies a pointer where the number of bytes actually read will
  200. be returned.
  201. Return Value:
  202. Status code. If a failing status code is returned, then check the number of
  203. bytes read to see if any valid data was returned.
  204. --*/
  205. {
  206. ULONG BytesAvailable;
  207. ULONG BytesReadHere;
  208. ULONG BytesToRead;
  209. ULONG EventsMask;
  210. ULONG NextWriteOffset;
  211. ULONG ReturnedEvents;
  212. KSTATUS Status;
  213. *BytesRead = 0;
  214. BytesReadHere = 0;
  215. EventsMask = POLL_EVENT_IN | POLL_ERROR_EVENTS;
  216. ASSERT(KeGetRunLevel() == RunLevelLow);
  217. //
  218. // Loop until at least a byte was read.
  219. //
  220. Status = STATUS_SUCCESS;
  221. while (BytesReadHere == 0) {
  222. //
  223. // Unless in non-blocking mode, wait for either the read or error
  224. // events to be set.
  225. //
  226. if (NonBlocking == FALSE) {
  227. Status = IoWaitForIoObjectState(StreamBuffer->IoState,
  228. EventsMask,
  229. TRUE,
  230. TimeoutInMilliseconds,
  231. &ReturnedEvents);
  232. if (!KSUCCESS(Status)) {
  233. break;
  234. }
  235. } else {
  236. ReturnedEvents = StreamBuffer->IoState->Events & EventsMask;
  237. }
  238. //
  239. // Multiple threads might have come out of waiting. Acquire the lock.
  240. //
  241. KeAcquireQueuedLock(StreamBuffer->Lock);
  242. //
  243. // Start over if there's nothing to read.
  244. //
  245. if (StreamBuffer->NextReadOffset == StreamBuffer->NextWriteOffset) {
  246. //
  247. // If the IN flag is set, then that would mean this routine is
  248. // busy spinning. Poor form.
  249. //
  250. ASSERT((NonBlocking != FALSE) ||
  251. ((StreamBuffer->IoState->Events & POLL_ERROR_EVENTS) != 0) ||
  252. ((StreamBuffer->IoState->Events & POLL_EVENT_IN) == 0));
  253. KeReleaseQueuedLock(StreamBuffer->Lock);
  254. //
  255. // If the error event is set, error out.
  256. //
  257. if ((ReturnedEvents & POLL_ERROR_EVENTS) != 0) {
  258. Status = STATUS_END_OF_FILE;
  259. break;
  260. }
  261. //
  262. // Blocking reads loop back to wait on the event, non-blocking
  263. // reads exit now.
  264. //
  265. if (NonBlocking == FALSE) {
  266. continue;
  267. } else {
  268. if (*BytesRead == 0) {
  269. Status = STATUS_NO_DATA_AVAILABLE;
  270. }
  271. break;
  272. }
  273. }
  274. //
  275. // Now read the buffer, at least going to the end of the buffer.
  276. // Wraparounds will be handled later on.
  277. //
  278. NextWriteOffset = StreamBuffer->NextWriteOffset;
  279. ASSERT(NextWriteOffset < StreamBuffer->Size);
  280. if (NextWriteOffset > StreamBuffer->NextReadOffset) {
  281. BytesAvailable = NextWriteOffset - StreamBuffer->NextReadOffset;
  282. } else {
  283. BytesAvailable = StreamBuffer->Size - StreamBuffer->NextReadOffset;
  284. }
  285. BytesToRead = BytesAvailable;
  286. if (ByteCount < BytesToRead) {
  287. BytesToRead = ByteCount;
  288. }
  289. Status = MmCopyIoBufferData(
  290. IoBuffer,
  291. StreamBuffer->Buffer + StreamBuffer->NextReadOffset,
  292. *BytesRead,
  293. BytesToRead,
  294. TRUE);
  295. if (!KSUCCESS(Status)) {
  296. KeReleaseQueuedLock(StreamBuffer->Lock);
  297. return Status;
  298. }
  299. //
  300. // Update the read offset so that it always contains a valid value.
  301. //
  302. if (StreamBuffer->NextReadOffset + BytesToRead == StreamBuffer->Size) {
  303. StreamBuffer->NextReadOffset = 0;
  304. } else {
  305. StreamBuffer->NextReadOffset += BytesToRead;
  306. }
  307. ASSERT(StreamBuffer->NextReadOffset < StreamBuffer->Size);
  308. *BytesRead += BytesToRead;
  309. BytesReadHere += BytesToRead;
  310. ByteCount -= BytesToRead;
  311. //
  312. // The first copy is done, but it's possible that the eligible read
  313. // content wraps around. Grab the rest of that data if so.
  314. //
  315. if ((ByteCount != 0) &&
  316. (StreamBuffer->NextReadOffset != NextWriteOffset)) {
  317. ASSERT(StreamBuffer->NextReadOffset == 0);
  318. ASSERT(NextWriteOffset > StreamBuffer->NextReadOffset);
  319. BytesAvailable = NextWriteOffset - StreamBuffer->NextReadOffset;
  320. BytesToRead = BytesAvailable;
  321. if (ByteCount < BytesToRead) {
  322. BytesToRead = ByteCount;
  323. }
  324. //
  325. // Don't break out of the loop on failure right away, as the
  326. // I/O state events need to be adjusted for the successful first
  327. // copy that happened.
  328. //
  329. Status = MmCopyIoBufferData(
  330. IoBuffer,
  331. StreamBuffer->Buffer + StreamBuffer->NextReadOffset,
  332. *BytesRead,
  333. BytesToRead,
  334. TRUE);
  335. if (KSUCCESS(Status)) {
  336. StreamBuffer->NextReadOffset += BytesToRead;
  337. ASSERT(StreamBuffer->NextReadOffset < StreamBuffer->Size);
  338. *BytesRead += BytesToRead;
  339. BytesReadHere += BytesToRead;
  340. ByteCount -= BytesToRead;
  341. }
  342. }
  343. //
  344. // Signal the write event (since more space was just made), and signal
  345. // the read event if there is still data left to be read. Don't do
  346. // this if the error events are set, as this is probably a disconnected
  347. // pipe with some data left in it.
  348. //
  349. if ((ReturnedEvents & POLL_ERROR_EVENTS) == 0) {
  350. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, TRUE);
  351. if (StreamBuffer->NextReadOffset != StreamBuffer->NextWriteOffset) {
  352. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_IN, TRUE);
  353. } else {
  354. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_IN, FALSE);
  355. }
  356. }
  357. KeReleaseQueuedLock(StreamBuffer->Lock);
  358. //
  359. // If that second copy failed, now's the time to break out.
  360. //
  361. if (!KSUCCESS(Status)) {
  362. return Status;
  363. }
  364. }
  365. return Status;
  366. }
  367. KSTATUS
  368. IoWriteStreamBuffer (
  369. PSTREAM_BUFFER StreamBuffer,
  370. PIO_BUFFER IoBuffer,
  371. UINTN ByteCount,
  372. ULONG TimeoutInMilliseconds,
  373. BOOL NonBlocking,
  374. PUINTN BytesWritten
  375. )
  376. /*++
  377. Routine Description:
  378. This routine writes to a stream buffer. This routine must be called at low
  379. level, unless the stream was set up to be written at dispatch.
  380. Arguments:
  381. StreamBuffer - Supplies a pointer to the stream buffer to write to.
  382. IoBuffer - Supplies a pointer to the I/O buffer containing the data to
  383. write to the stream buffer.
  384. ByteCount - Supplies the number of bytes to writes.
  385. TimeoutInMilliseconds - Supplies the number of milliseconds that the I/O
  386. operation should be waited on before timing out. Use
  387. WAIT_TIME_INDEFINITE to wait forever on the I/O.
  388. NonBlocking - Supplies a boolean indicating if this write should avoid
  389. blocking.
  390. BytesWritten - Supplies a pointer where the number of bytes actually written
  391. will be returned.
  392. Return Value:
  393. Status code. If a failing status code is returned, then check the number of
  394. bytes read to see if any valid data was written.
  395. --*/
  396. {
  397. ULONG BytesAvailable;
  398. ULONG BytesToWrite;
  399. ULONG EventsMask;
  400. ULONG NextReadOffset;
  401. ULONG ReturnedEvents;
  402. KSTATUS Status;
  403. ULONG TotalBytesAvailable;
  404. *BytesWritten = 0;
  405. EventsMask = POLL_EVENT_OUT | POLL_ERROR_EVENTS;
  406. ASSERT(KeGetRunLevel() == RunLevelLow);
  407. Status = STATUS_SUCCESS;
  408. while (ByteCount != 0) {
  409. if (NonBlocking == FALSE) {
  410. Status = IoWaitForIoObjectState(StreamBuffer->IoState,
  411. EventsMask,
  412. TRUE,
  413. TimeoutInMilliseconds,
  414. &ReturnedEvents);
  415. if (!KSUCCESS(Status)) {
  416. break;
  417. }
  418. if (ReturnedEvents != POLL_EVENT_OUT) {
  419. Status = STATUS_BROKEN_PIPE;
  420. break;
  421. }
  422. }
  423. //
  424. // Multiple threads might have come out of waiting since read and
  425. // write aren't synchronized.
  426. //
  427. KeAcquireQueuedLock(StreamBuffer->Lock);
  428. //
  429. // Figure out how much room there is.
  430. //
  431. NextReadOffset = StreamBuffer->NextReadOffset;
  432. ASSERT(NextReadOffset < StreamBuffer->Size);
  433. if (NextReadOffset <= StreamBuffer->NextWriteOffset) {
  434. //
  435. // The total available is the entire buffer (minus one) minus the
  436. // distance between the read and write pointers.
  437. //
  438. TotalBytesAvailable = (StreamBuffer->Size - 1) -
  439. (StreamBuffer->NextWriteOffset -
  440. NextReadOffset);
  441. //
  442. // The first copy goes from the next write offset to the end, but
  443. // if the read offset is right at zero then the padding byte is at
  444. // the end there.
  445. //
  446. BytesAvailable = StreamBuffer->Size - StreamBuffer->NextWriteOffset;
  447. if (NextReadOffset == 0) {
  448. BytesAvailable -= 1;
  449. }
  450. } else {
  451. //
  452. // The total available space is the distance between the write
  453. // catching up to the read, minus the one buffer byte.
  454. //
  455. BytesAvailable = NextReadOffset - StreamBuffer->NextWriteOffset - 1;
  456. TotalBytesAvailable = BytesAvailable;
  457. }
  458. //
  459. // Start over if the buffer is full. The stream stipulates that it will
  460. // always be able to write at least the atomic size without
  461. // interleaving.
  462. //
  463. if ((TotalBytesAvailable < ByteCount) &&
  464. (TotalBytesAvailable < StreamBuffer->AtomicWriteSize)) {
  465. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, FALSE);
  466. KeReleaseQueuedLock(StreamBuffer->Lock);
  467. if (NonBlocking == FALSE) {
  468. continue;
  469. } else {
  470. if (*BytesWritten == 0) {
  471. Status = STATUS_TRY_AGAIN;
  472. }
  473. break;
  474. }
  475. }
  476. //
  477. // Now write to the buffer, at least going to the end of the buffer.
  478. // Wraparounds will be handled later on.
  479. //
  480. ASSERT(BytesAvailable != 0);
  481. BytesToWrite = BytesAvailable;
  482. if (ByteCount < BytesToWrite) {
  483. BytesToWrite = ByteCount;
  484. }
  485. Status = MmCopyIoBufferData(
  486. IoBuffer,
  487. StreamBuffer->Buffer + StreamBuffer->NextWriteOffset,
  488. *BytesWritten,
  489. BytesToWrite,
  490. FALSE);
  491. if (!KSUCCESS(Status)) {
  492. KeReleaseQueuedLock(StreamBuffer->Lock);
  493. return Status;
  494. }
  495. //
  496. // Update the next write pointer in a manner that ensures its value is
  497. // always valid.
  498. //
  499. if (StreamBuffer->NextWriteOffset + BytesToWrite ==
  500. StreamBuffer->Size) {
  501. StreamBuffer->NextWriteOffset = 0;
  502. } else {
  503. StreamBuffer->NextWriteOffset += BytesToWrite;
  504. }
  505. *BytesWritten += BytesToWrite;
  506. ByteCount -= BytesToWrite;
  507. TotalBytesAvailable -= BytesToWrite;
  508. //
  509. // The first copy is done, but it's possible that the eligible space
  510. // wraps around. Write the remainder if so.
  511. //
  512. if ((ByteCount != 0) &&
  513. (((StreamBuffer->NextWriteOffset + 1) % StreamBuffer->Size) !=
  514. NextReadOffset)) {
  515. ASSERT(StreamBuffer->NextWriteOffset == 0);
  516. ASSERT(NextReadOffset > StreamBuffer->NextWriteOffset + 1);
  517. BytesAvailable = NextReadOffset - StreamBuffer->NextWriteOffset - 1;
  518. BytesToWrite = BytesAvailable;
  519. if (ByteCount < BytesToWrite) {
  520. BytesToWrite = ByteCount;
  521. }
  522. //
  523. // Don't break out of the loop on failure right away, as the
  524. // I/O state events need to be adjusted for the successful first
  525. // copy that happened.
  526. //
  527. Status = MmCopyIoBufferData(
  528. IoBuffer,
  529. StreamBuffer->Buffer + StreamBuffer->NextWriteOffset,
  530. *BytesWritten,
  531. BytesToWrite,
  532. FALSE);
  533. if (KSUCCESS(Status)) {
  534. StreamBuffer->NextWriteOffset += BytesToWrite;
  535. ASSERT(StreamBuffer->NextWriteOffset < StreamBuffer->Size);
  536. *BytesWritten += BytesToWrite;
  537. ByteCount -= BytesToWrite;
  538. TotalBytesAvailable -= BytesToWrite;
  539. }
  540. }
  541. //
  542. // Signal the read event (since there's now stuff to read), and signal
  543. // the write event if there is still space left.
  544. //
  545. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_IN, TRUE);
  546. ASSERT(TotalBytesAvailable < StreamBuffer->Size);
  547. if (TotalBytesAvailable >= StreamBuffer->AtomicWriteSize) {
  548. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, TRUE);
  549. } else {
  550. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, FALSE);
  551. }
  552. KeReleaseQueuedLock(StreamBuffer->Lock);
  553. //
  554. // If that second copy failed, now is the time to exit.
  555. //
  556. if (!KSUCCESS(Status)) {
  557. return Status;
  558. }
  559. }
  560. return Status;
  561. }
  562. KSTATUS
  563. IoStreamBufferConnect (
  564. PSTREAM_BUFFER StreamBuffer
  565. )
  566. /*++
  567. Routine Description:
  568. This routine resets the I/O object state when someone connects to a stream
  569. buffer.
  570. Arguments:
  571. StreamBuffer - Supplies a pointer to the stream buffer.
  572. Return Value:
  573. Status code.
  574. --*/
  575. {
  576. ULONG TotalBytesAvailable;
  577. KeAcquireQueuedLock(StreamBuffer->Lock);
  578. //
  579. // Figure out how much space there is.
  580. //
  581. if (StreamBuffer->NextReadOffset <= StreamBuffer->NextWriteOffset) {
  582. //
  583. // The total available is the entire buffer (minus one) minus the
  584. // distance between the read and write pointers.
  585. //
  586. TotalBytesAvailable = (StreamBuffer->Size - 1) -
  587. (StreamBuffer->NextWriteOffset -
  588. StreamBuffer->NextReadOffset);
  589. } else {
  590. //
  591. // The total available space is the distance between the write
  592. // catching up to the read, minus the one buffer byte.
  593. //
  594. TotalBytesAvailable = StreamBuffer->NextReadOffset -
  595. StreamBuffer->NextWriteOffset - 1;
  596. }
  597. //
  598. // Signal the write event if there's space to be written.
  599. //
  600. if (TotalBytesAvailable >= StreamBuffer->AtomicWriteSize) {
  601. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, TRUE);
  602. } else {
  603. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_OUT, FALSE);
  604. }
  605. //
  606. // Signal the read event if there's data in there.
  607. //
  608. if (TotalBytesAvailable != StreamBuffer->Size - 1) {
  609. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_IN, TRUE);
  610. } else {
  611. IoSetIoObjectState(StreamBuffer->IoState, POLL_EVENT_IN, FALSE);
  612. }
  613. KeReleaseQueuedLock(StreamBuffer->Lock);
  614. return STATUS_SUCCESS;
  615. }
  616. PIO_OBJECT_STATE
  617. IoStreamBufferGetIoObjectState (
  618. PSTREAM_BUFFER StreamBuffer
  619. )
  620. /*++
  621. Routine Description:
  622. This routine returns the I/O state for a stream buffer.
  623. Arguments:
  624. StreamBuffer - Supplies a pointer to the stream buffer.
  625. Return Value:
  626. Returns a pointer to the stream buffer's I/O object state.
  627. --*/
  628. {
  629. return StreamBuffer->IoState;
  630. }
  631. //
  632. // --------------------------------------------------------- Internal Functions
  633. //