#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Modified by Linus Nielsen Feltzing for inclusion in the libcurl test
# framework
#
import SocketServer
import argparse
import os
import pprint
import re
import select
import socket
import time
  27. INFO_MESSAGE = '''
  28. This is a test server to test the libcurl pipelining functionality.
  29. It is a modified version if Google's HTTP pipelining test server. More
  30. information can be found here:
  31. http://dev.chromium.org/developers/design-documents/network-stack/http-pipelining
  32. Source code can be found here:
  33. http://code.google.com/p/http-pipelining-test/
  34. '''
  35. MAX_REQUEST_SIZE = 1024 # bytes
  36. MIN_POLL_TIME = 0.01 # seconds. Minimum time to poll, in order to prevent
  37. # excessive looping because Python refuses to poll for
  38. # small timeouts.
  39. SEND_BUFFER_TIME = 0.5 # seconds
  40. TIMEOUT = 30 # seconds
  41. class Error(Exception):
  42. pass
  43. class RequestTooLargeError(Error):
  44. pass
  45. class ServeIndexError(Error):
  46. pass
  47. class UnexpectedMethodError(Error):
  48. pass
  49. class RequestParser(object):
  50. """Parses an input buffer looking for HTTP GET requests."""
  51. global logfile
  52. LOOKING_FOR_GET = 1
  53. READING_HEADERS = 2
  54. HEADER_RE = re.compile('([^:]+):(.*)\n')
  55. REQUEST_RE = re.compile('([^ ]+) ([^ ]+) HTTP/(\d+)\.(\d+)\n')
  56. def __init__(self):
  57. """Initializer."""
  58. self._buffer = ""
  59. self._pending_headers = {}
  60. self._pending_request = ""
  61. self._state = self.LOOKING_FOR_GET
  62. self._were_all_requests_http_1_1 = True
  63. self._valid_requests = []
  64. def ParseAdditionalData(self, data):
  65. """Finds HTTP requests in |data|.
  66. Args:
  67. data: (String) Newly received input data from the socket.
  68. Returns:
  69. (List of Tuples)
  70. (String) The request path.
  71. (Map of String to String) The header name and value.
  72. Raises:
  73. RequestTooLargeError: If the request exceeds MAX_REQUEST_SIZE.
  74. UnexpectedMethodError: On a non-GET method.
  75. Error: On a programming error.
  76. """
  77. logfile = open('log/server.input', 'a')
  78. logfile.write(data)
  79. logfile.close()
  80. self._buffer += data.replace('\r', '')
  81. should_continue_parsing = True
  82. while should_continue_parsing:
  83. if self._state == self.LOOKING_FOR_GET:
  84. should_continue_parsing = self._DoLookForGet()
  85. elif self._state == self.READING_HEADERS:
  86. should_continue_parsing = self._DoReadHeader()
  87. else:
  88. raise Error('Unexpected state: ' + self._state)
  89. if len(self._buffer) > MAX_REQUEST_SIZE:
  90. raise RequestTooLargeError(
  91. 'Request is at least %d bytes' % len(self._buffer))
  92. valid_requests = self._valid_requests
  93. self._valid_requests = []
  94. return valid_requests
  95. @property
  96. def were_all_requests_http_1_1(self):
  97. return self._were_all_requests_http_1_1
  98. def _DoLookForGet(self):
  99. """Tries to parse an HTTTP request line.
  100. Returns:
  101. (Boolean) True if a request was found.
  102. Raises:
  103. UnexpectedMethodError: On a non-GET method.
  104. """
  105. m = self.REQUEST_RE.match(self._buffer)
  106. if not m:
  107. return False
  108. method, path, http_major, http_minor = m.groups()
  109. if method != 'GET':
  110. raise UnexpectedMethodError('Unexpected method: ' + method)
  111. if path in ['/', '/index.htm', '/index.html']:
  112. raise ServeIndexError()
  113. if http_major != '1' or http_minor != '1':
  114. self._were_all_requests_http_1_1 = False
  115. # print method, path
  116. self._pending_request = path
  117. self._buffer = self._buffer[m.end():]
  118. self._state = self.READING_HEADERS
  119. return True
  120. def _DoReadHeader(self):
  121. """Tries to parse a HTTP header.
  122. Returns:
  123. (Boolean) True if it found the end of the request or a HTTP header.
  124. """
  125. if self._buffer.startswith('\n'):
  126. self._buffer = self._buffer[1:]
  127. self._state = self.LOOKING_FOR_GET
  128. self._valid_requests.append((self._pending_request,
  129. self._pending_headers))
  130. self._pending_headers = {}
  131. self._pending_request = ""
  132. return True
  133. m = self.HEADER_RE.match(self._buffer)
  134. if not m:
  135. return False
  136. header = m.group(1).lower()
  137. value = m.group(2).strip().lower()
  138. if header not in self._pending_headers:
  139. self._pending_headers[header] = value
  140. self._buffer = self._buffer[m.end():]
  141. return True
  142. class ResponseBuilder(object):
  143. """Builds HTTP responses for a list of accumulated requests."""
  144. def __init__(self):
  145. """Initializer."""
  146. self._max_pipeline_depth = 0
  147. self._requested_paths = []
  148. self._processed_end = False
  149. self._were_all_requests_http_1_1 = True
  150. def QueueRequests(self, requested_paths, were_all_requests_http_1_1):
  151. """Adds requests to the queue of requests.
  152. Args:
  153. requested_paths: (List of Strings) Requested paths.
  154. """
  155. self._requested_paths.extend(requested_paths)
  156. self._were_all_requests_http_1_1 = were_all_requests_http_1_1
  157. def Chunkify(self, data, chunksize):
  158. """ Divides a string into chunks
  159. """
  160. return [hex(chunksize)[2:] + "\r\n" + data[i:i+chunksize] + "\r\n" for i in range(0, len(data), chunksize)]
  161. def BuildResponses(self):
  162. """Converts the queue of requests into responses.
  163. Returns:
  164. (String) Buffer containing all of the responses.
  165. """
  166. result = ""
  167. self._max_pipeline_depth = max(self._max_pipeline_depth,
  168. len(self._requested_paths))
  169. for path, headers in self._requested_paths:
  170. if path == '/verifiedserver':
  171. body = "WE ROOLZ: {}\r\n".format(os.getpid());
  172. result += self._BuildResponse(
  173. '200 OK', ['Server: Apache',
  174. 'Content-Length: {}'.format(len(body)),
  175. 'Cache-Control: no-store'], body)
  176. elif path == '/alphabet.txt':
  177. body = 'abcdefghijklmnopqrstuvwxyz'
  178. result += self._BuildResponse(
  179. '200 OK', ['Server: Apache',
  180. 'Content-Length: 26',
  181. 'Cache-Control: no-store'], body)
  182. elif path == '/reverse.txt':
  183. body = 'zyxwvutsrqponmlkjihgfedcba'
  184. result += self._BuildResponse(
  185. '200 OK', ['Content-Length: 26', 'Cache-Control: no-store'], body)
  186. elif path == '/chunked.txt':
  187. body = ('7\r\nchunked\r\n'
  188. '8\r\nencoding\r\n'
  189. '2\r\nis\r\n'
  190. '3\r\nfun\r\n'
  191. '0\r\n\r\n')
  192. result += self._BuildResponse(
  193. '200 OK', ['Transfer-Encoding: chunked', 'Cache-Control: no-store'],
  194. body)
  195. elif path == '/cached.txt':
  196. body = 'azbycxdwevfugthsirjqkplomn'
  197. result += self._BuildResponse(
  198. '200 OK', ['Content-Length: 26', 'Cache-Control: max-age=60'], body)
  199. elif path == '/connection_close.txt':
  200. body = 'azbycxdwevfugthsirjqkplomn'
  201. result += self._BuildResponse(
  202. '200 OK', ['Content-Length: 26', 'Cache-Control: max-age=60', 'Connection: close'], body)
  203. self._processed_end = True
  204. elif path == '/1k.txt':
  205. str = '0123456789abcdef'
  206. body = ''.join([str for num in xrange(64)])
  207. result += self._BuildResponse(
  208. '200 OK', ['Server: Apache',
  209. 'Content-Length: 1024',
  210. 'Cache-Control: max-age=60'], body)
  211. elif path == '/10k.txt':
  212. str = '0123456789abcdef'
  213. body = ''.join([str for num in xrange(640)])
  214. result += self._BuildResponse(
  215. '200 OK', ['Server: Apache',
  216. 'Content-Length: 10240',
  217. 'Cache-Control: max-age=60'], body)
  218. elif path == '/100k.txt':
  219. str = '0123456789abcdef'
  220. body = ''.join([str for num in xrange(6400)])
  221. result += self._BuildResponse(
  222. '200 OK',
  223. ['Server: Apache',
  224. 'Content-Length: 102400',
  225. 'Cache-Control: max-age=60'],
  226. body)
  227. elif path == '/100k_chunked.txt':
  228. str = '0123456789abcdef'
  229. moo = ''.join([str for num in xrange(6400)])
  230. body = self.Chunkify(moo, 20480)
  231. body.append('0\r\n\r\n')
  232. body = ''.join(body)
  233. result += self._BuildResponse(
  234. '200 OK', ['Transfer-Encoding: chunked', 'Cache-Control: no-store'], body)
  235. elif path == '/stats.txt':
  236. results = {
  237. 'max_pipeline_depth': self._max_pipeline_depth,
  238. 'were_all_requests_http_1_1': int(self._were_all_requests_http_1_1),
  239. }
  240. body = ','.join(['%s:%s' % (k, v) for k, v in results.items()])
  241. result += self._BuildResponse(
  242. '200 OK',
  243. ['Content-Length: %s' % len(body), 'Cache-Control: no-store'], body)
  244. self._processed_end = True
  245. else:
  246. result += self._BuildResponse('404 Not Found', ['Content-Length: 7'], 'Go away')
  247. if self._processed_end:
  248. break
  249. self._requested_paths = []
  250. return result
  251. def WriteError(self, status, error):
  252. """Returns an HTTP response for the specified error.
  253. Args:
  254. status: (String) Response code and descrtion (e.g. "404 Not Found")
  255. Returns:
  256. (String) Text of HTTP response.
  257. """
  258. return self._BuildResponse(
  259. status, ['Connection: close', 'Content-Type: text/plain'], error)
  260. @property
  261. def processed_end(self):
  262. return self._processed_end
  263. def _BuildResponse(self, status, headers, body):
  264. """Builds an HTTP response.
  265. Args:
  266. status: (String) Response code and descrtion (e.g. "200 OK")
  267. headers: (List of Strings) Headers (e.g. "Connection: close")
  268. body: (String) Response body.
  269. Returns:
  270. (String) Text of HTTP response.
  271. """
  272. return ('HTTP/1.1 %s\r\n'
  273. '%s\r\n'
  274. '\r\n'
  275. '%s' % (status, '\r\n'.join(headers), body))
  276. class PipelineRequestHandler(SocketServer.BaseRequestHandler):
  277. """Called on an incoming TCP connection."""
  278. def _GetTimeUntilTimeout(self):
  279. return self._start_time + TIMEOUT - time.time()
  280. def _GetTimeUntilNextSend(self):
  281. if not self._last_queued_time:
  282. return TIMEOUT
  283. return self._last_queued_time + SEND_BUFFER_TIME - time.time()
  284. def handle(self):
  285. self._request_parser = RequestParser()
  286. self._response_builder = ResponseBuilder()
  287. self._last_queued_time = 0
  288. self._num_queued = 0
  289. self._num_written = 0
  290. self._send_buffer = ""
  291. self._start_time = time.time()
  292. try:
  293. poller = select.epoll(sizehint=1)
  294. poller.register(self.request.fileno(), select.EPOLLIN)
  295. while not self._response_builder.processed_end or self._send_buffer:
  296. time_left = self._GetTimeUntilTimeout()
  297. time_until_next_send = self._GetTimeUntilNextSend()
  298. max_poll_time = min(time_left, time_until_next_send) + MIN_POLL_TIME
  299. events = None
  300. if max_poll_time > 0:
  301. if self._send_buffer:
  302. poller.modify(self.request.fileno(),
  303. select.EPOLLIN | select.EPOLLOUT)
  304. else:
  305. poller.modify(self.request.fileno(), select.EPOLLIN)
  306. events = poller.poll(timeout=max_poll_time)
  307. if self._GetTimeUntilTimeout() <= 0:
  308. return
  309. if self._GetTimeUntilNextSend() <= 0:
  310. self._send_buffer += self._response_builder.BuildResponses()
  311. self._num_written = self._num_queued
  312. self._last_queued_time = 0
  313. for fd, mode in events:
  314. if mode & select.EPOLLIN:
  315. new_data = self.request.recv(MAX_REQUEST_SIZE, socket.MSG_DONTWAIT)
  316. if not new_data:
  317. return
  318. new_requests = self._request_parser.ParseAdditionalData(new_data)
  319. self._response_builder.QueueRequests(
  320. new_requests, self._request_parser.were_all_requests_http_1_1)
  321. self._num_queued += len(new_requests)
  322. self._last_queued_time = time.time()
  323. elif mode & select.EPOLLOUT:
  324. num_bytes_sent = self.request.send(self._send_buffer[0:4096])
  325. self._send_buffer = self._send_buffer[num_bytes_sent:]
  326. time.sleep(0.05)
  327. else:
  328. return
  329. except RequestTooLargeError as e:
  330. self.request.send(self._response_builder.WriteError(
  331. '413 Request Entity Too Large', e))
  332. raise
  333. except UnexpectedMethodError as e:
  334. self.request.send(self._response_builder.WriteError(
  335. '405 Method Not Allowed', e))
  336. raise
  337. except ServeIndexError:
  338. self.request.send(self._response_builder.WriteError(
  339. '200 OK', INFO_MESSAGE))
  340. except Exception as e:
  341. print e
  342. self.request.close()
  343. class PipelineServer(SocketServer.ForkingMixIn, SocketServer.TCPServer):
  344. pass
  345. parser = argparse.ArgumentParser()
  346. parser.add_argument("--port", action="store", default=0,
  347. type=int, help="port to listen on")
  348. parser.add_argument("--verbose", action="store", default=0,
  349. type=int, help="verbose output")
  350. parser.add_argument("--pidfile", action="store", default=0,
  351. help="file name for the PID")
  352. parser.add_argument("--logfile", action="store", default=0,
  353. help="file name for the log")
  354. parser.add_argument("--srcdir", action="store", default=0,
  355. help="test directory")
  356. parser.add_argument("--id", action="store", default=0,
  357. help="server ID")
  358. parser.add_argument("--ipv4", action="store_true", default=0,
  359. help="IPv4 flag")
  360. args = parser.parse_args()
  361. if args.pidfile:
  362. pid = os.getpid()
  363. f = open(args.pidfile, 'w')
  364. f.write('{}'.format(pid))
  365. f.close()
  366. server = PipelineServer(('0.0.0.0', args.port), PipelineRequestHandler)
  367. server.allow_reuse_address = True
  368. server.serve_forever()