FramingIface.c 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "interface/FramingIface.h"
  16. #include "interface/Iface.h"
  17. #include "memory/Allocator.h"
  18. #include "util/Identity.h"
  19. #include "wire/Error.h"
  20. #include "util/Assert.h"
  21. #define REQUIRED_PADDING 512
  22. struct MessageList;
  23. struct MessageList {
  24. Message_t* msg;
  25. struct MessageList* next;
  26. };
  27. struct FramingIface_pvt {
  28. struct Iface messageIf;
  29. struct Iface streamIf;
  30. const uint32_t maxMessageSize;
  31. struct Allocator* alloc;
  32. // fields specific to this frame.
  33. uint32_t bytesRemaining;
  34. struct Allocator* frameAlloc;
  35. struct MessageList* frameParts;
  36. union {
  37. uint32_t length_be;
  38. uint8_t bytes[4];
  39. } header;
  40. uint32_t headerIndex;
  41. Identity
  42. };
  43. static Message_t* mergeMessage(struct FramingIface_pvt* fi, Message_t* last)
  44. {
  45. int length = Message_getLength(last);
  46. // The only accurate way to get the full length because this last might contain
  47. // the beginning of the next frame.
  48. struct MessageList* part = fi->frameParts;
  49. for (part = fi->frameParts; part; part = part->next) {
  50. length += Message_getLength(part->msg);
  51. }
  52. Message_t* out = Message_new(0, length + REQUIRED_PADDING, fi->frameAlloc);
  53. Er_assert(Message_epush(out, Message_bytes(last), Message_getLength(last)));
  54. int fd = Message_getAssociatedFd(last);
  55. for (part = fi->frameParts; part; part = part->next) {
  56. Er_assert(Message_epush(out, Message_bytes(part->msg), Message_getLength(part->msg)));
  57. if (fd == -1) {
  58. fd = Message_getAssociatedFd(part->msg);
  59. }
  60. }
  61. if (fd > -1) {
  62. Message_setAssociatedFd(out, fd);
  63. }
  64. Assert_true(length <= Message_getLength(out));
  65. return out;
  66. }
  67. static Iface_DEFUN receiveMessage(Message_t* msg, struct Iface* streamIf)
  68. {
  69. struct FramingIface_pvt* fi = Identity_containerOf(streamIf, struct FramingIface_pvt, streamIf);
  70. if (fi->bytesRemaining > fi->maxMessageSize) {
  71. // Oversize message
  72. Assert_ifTesting(0);
  73. return Error(msg, "OVERSIZE_MESSAGE");
  74. }
  75. if (fi->frameParts) {
  76. if (fi->bytesRemaining <= (uint32_t)Message_getLength(msg)) {
  77. Message_t* wholeMessage = mergeMessage(fi, msg);
  78. fi->bytesRemaining = 0;
  79. fi->frameParts = NULL;
  80. Assert_true(fi->headerIndex == 0);
  81. struct Allocator* frameAlloc = fi->frameAlloc;
  82. fi->frameAlloc = NULL;
  83. // Run the message through again since it's almost certainly not perfect size.
  84. Iface_CALL(receiveMessage, wholeMessage, streamIf);
  85. Allocator_free(frameAlloc);
  86. return NULL;
  87. }
  88. fi->bytesRemaining -= Message_getLength(msg);
  89. Allocator_adopt(fi->frameAlloc, Message_getAlloc(msg));
  90. struct MessageList* parts = Allocator_calloc(fi->frameAlloc, sizeof(struct MessageList), 1);
  91. parts->msg = msg;
  92. parts->next = fi->frameParts;
  93. fi->frameParts = parts;
  94. return NULL;
  95. }
  96. for (;;) {
  97. while (fi->headerIndex < 4) {
  98. if (!Message_getLength(msg)) {
  99. return NULL;
  100. }
  101. fi->header.bytes[fi->headerIndex] = Message_bytes(msg)[0];
  102. Er_assert(Message_eshift(msg, -1));
  103. fi->headerIndex++;
  104. }
  105. fi->headerIndex = 0;
  106. fi->bytesRemaining = Endian_bigEndianToHost32(fi->header.length_be);
  107. if (fi->bytesRemaining > fi->maxMessageSize) {
  108. // oversize
  109. Assert_ifTesting(0);
  110. return Error(msg, "OVERSIZE_MESSAGE");
  111. }
  112. if (fi->bytesRemaining == (uint32_t)Message_getLength(msg)) {
  113. fi->bytesRemaining = 0;
  114. return Iface_next(&fi->messageIf, msg);
  115. } else if (fi->bytesRemaining < (uint32_t)Message_getLength(msg)) {
  116. struct Allocator* alloc = Allocator_child(Message_getAlloc(msg));
  117. Message_t* m = Message_new(fi->bytesRemaining, REQUIRED_PADDING, alloc);
  118. Message_setAssociatedFd(m, Message_getAssociatedFd(msg));
  119. Bits_memcpy(Message_bytes(m), Message_bytes(msg), fi->bytesRemaining);
  120. Er_assert(Message_eshift(msg, -fi->bytesRemaining));
  121. fi->bytesRemaining = 0;
  122. Iface_send(&fi->messageIf, m);
  123. Allocator_free(alloc);
  124. continue;
  125. } else {
  126. fi->frameAlloc = Allocator_child(fi->alloc);
  127. Message_t* m = Message_new(0, Message_getLength(msg) + 4, fi->frameAlloc);
  128. Message_setAssociatedFd(m, Message_getAssociatedFd(msg));
  129. Er_assert(Message_epush(m, Message_bytes(msg), Message_getLength(msg)));
  130. Er_assert(Message_epush(m, fi->header.bytes, 4));
  131. fi->bytesRemaining -= Message_getLength(msg);
  132. fi->frameParts = Allocator_calloc(fi->frameAlloc, sizeof(struct MessageList), 1);
  133. fi->frameParts->msg = m;
  134. fi->frameParts->next = NULL;
  135. }
  136. return NULL;
  137. }
  138. }
  139. static Iface_DEFUN sendMessage(Message_t* msg, struct Iface* messageIf)
  140. {
  141. struct FramingIface_pvt* fi =
  142. Identity_containerOf(messageIf, struct FramingIface_pvt, messageIf);
  143. Er_assert(Message_epush32be(msg, Message_getLength(msg)));
  144. return Iface_next(&fi->streamIf, msg);
  145. }
  146. struct Iface* FramingIface_new(uint32_t maxMsgSize, struct Iface* toWrap, struct Allocator* alloc)
  147. {
  148. struct FramingIface_pvt* context =
  149. Allocator_clone(alloc, (&(struct FramingIface_pvt) {
  150. .maxMessageSize = maxMsgSize,
  151. .alloc = alloc,
  152. .streamIf = { .send = receiveMessage },
  153. .messageIf = { .send = sendMessage }
  154. }));
  155. Identity_set(context);
  156. Iface_plumb(toWrap, &context->streamIf);
  157. return &context->messageIf;
  158. }