Connection.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. import socket
  2. import time
  3. import gevent
  4. import msgpack
  5. from Config import config
  6. from Debug import Debug
  7. from util import StreamingMsgpack
  8. from Crypt import CryptConnection
  9. class Connection(object):
  10. __slots__ = (
  11. "sock", "sock_wrapped", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id",
  12. "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time",
  13. "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent",
  14. "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests", "waiting_streams"
  15. )
  16. def __init__(self, server, ip, port, sock=None):
  17. self.sock = sock
  18. self.ip = ip
  19. self.port = port
  20. self.peer_id = None # Bittorrent style peer id (not used yet)
  21. self.id = server.last_connection_id
  22. server.last_connection_id += 1
  23. self.protocol = "?"
  24. self.type = "?"
  25. self.server = server
  26. self.unpacker = None # Stream incoming socket messages here
  27. self.req_id = 0 # Last request id
  28. self.handshake = {} # Handshake info got from peer
  29. self.crypt = None # Connection encryption method
  30. self.sock_wrapped = False # Socket wrapped to encryption
  31. self.connected = False
  32. self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
  33. self.closed = False
  34. # Stats
  35. self.start_time = time.time()
  36. self.last_recv_time = 0
  37. self.last_message_time = 0
  38. self.last_send_time = 0
  39. self.last_sent_time = 0
  40. self.incomplete_buff_recv = 0
  41. self.bytes_recv = 0
  42. self.bytes_sent = 0
  43. self.last_ping_delay = None
  44. self.last_req_time = 0
  45. self.last_cmd = None
  46. self.name = None
  47. self.updateName()
  48. self.waiting_requests = {} # Waiting sent requests
  49. self.waiting_streams = {} # Waiting response file streams
  50. def updateName(self):
  51. self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
  52. def __str__(self):
  53. return self.name
  54. def __repr__(self):
  55. return "<%s>" % self.__str__()
  56. def log(self, text):
  57. self.server.log.debug("%s > %s" % (self.name, text))
  58. # Open connection to peer and wait for handshake
  59. def connect(self):
  60. self.log("Connecting...")
  61. self.type = "out"
  62. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  63. self.sock.connect((self.ip, int(self.port)))
  64. # Implicit SSL in the future
  65. # self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa")
  66. # self.sock.do_handshake()
  67. # self.crypt = "tls-rsa"
  68. # self.sock_wrapped = True
  69. # Detect protocol
  70. self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
  71. gevent.spawn(self.messageLoop)
  72. return self.event_connected.get() # Wait for handshake
  73. # Handle incoming connection
  74. def handleIncomingConnection(self, sock):
  75. self.log("Incoming connection...")
  76. self.type = "in"
  77. try:
  78. if sock.recv(1, gevent.socket.MSG_PEEK) == "\x16":
  79. self.log("Crypt in connection using implicit SSL")
  80. self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", True)
  81. self.sock_wrapped = True
  82. self.crypt = "tls-rsa"
  83. except Exception, err:
  84. self.log("Socket peek error: %s" % Debug.formatException(err))
  85. self.messageLoop()
  86. # Message loop for connection
  87. def messageLoop(self):
  88. if not self.sock:
  89. self.log("Socket error: No socket found")
  90. return False
  91. self.protocol = "v2"
  92. self.updateName()
  93. self.connected = True
  94. self.unpacker = msgpack.Unpacker()
  95. try:
  96. while True:
  97. buff = self.sock.recv(16 * 1024)
  98. if not buff:
  99. break # Connection closed
  100. # Statistics
  101. self.last_recv_time = time.time()
  102. self.incomplete_buff_recv += 1
  103. self.bytes_recv += len(buff)
  104. self.server.bytes_recv += len(buff)
  105. if not self.unpacker:
  106. self.unpacker = msgpack.Unpacker()
  107. self.unpacker.feed(buff)
  108. buff = None
  109. for message in self.unpacker:
  110. self.incomplete_buff_recv = 0
  111. if "stream_bytes" in message:
  112. self.handleStream(message)
  113. else:
  114. self.handleMessage(message)
  115. message = None
  116. except Exception, err:
  117. if not self.closed:
  118. self.log("Socket error: %s" % Debug.formatException(err))
  119. self.close() # MessageLoop ended, close connection
  120. # My handshake info
  121. def handshakeInfo(self):
  122. return {
  123. "version": config.version,
  124. "protocol": "v2",
  125. "peer_id": self.server.peer_id,
  126. "fileserver_port": self.server.port,
  127. "port_opened": self.server.port_opened,
  128. "rev": config.rev,
  129. "crypt_supported": CryptConnection.manager.crypt_supported,
  130. "crypt": self.crypt
  131. }
  132. def setHandshake(self, handshake):
  133. self.handshake = handshake
  134. if handshake.get("port_opened", None) is False: # Not connectable
  135. self.port = 0
  136. else:
  137. self.port = handshake["fileserver_port"] # Set peer fileserver port
  138. # Check if we can encrypt the connection
  139. if handshake.get("crypt_supported"):
  140. if handshake.get("crypt"): # Recommended crypt by server
  141. crypt = handshake["crypt"]
  142. else: # Select the best supported on both sides
  143. crypt = CryptConnection.manager.selectCrypt(handshake["crypt_supported"])
  144. if crypt:
  145. self.crypt = crypt
  146. self.event_connected.set(True) # Mark handshake as done
  147. # Handle incoming message
  148. def handleMessage(self, message):
  149. self.last_message_time = time.time()
  150. if message.get("cmd") == "response": # New style response
  151. if message["to"] in self.waiting_requests:
  152. if self.last_send_time:
  153. ping = time.time() - self.last_send_time
  154. self.last_ping_delay = ping
  155. self.waiting_requests[message["to"]].set(message) # Set the response to event
  156. del self.waiting_requests[message["to"]]
  157. elif message["to"] == 0: # Other peers handshake
  158. ping = time.time() - self.start_time
  159. if config.debug_socket:
  160. self.log("Handshake response: %s, ping: %s" % (message, ping))
  161. self.last_ping_delay = ping
  162. # Server switched to crypt, lets do it also if not crypted already
  163. if message.get("crypt") and not self.sock_wrapped:
  164. self.crypt = message["crypt"]
  165. server = (self.type == "in")
  166. self.log("Crypt out connection using: %s (server side: %s)..." % (self.crypt, server))
  167. self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server)
  168. self.sock.do_handshake()
  169. self.setHandshake(message)
  170. else:
  171. self.log("Unknown response: %s" % message)
  172. elif message.get("cmd"): # Handhsake request
  173. if message["cmd"] == "handshake":
  174. if config.debug_socket:
  175. self.log("Handshake request: %s" % message)
  176. self.setHandshake(message["params"])
  177. data = self.handshakeInfo()
  178. data["cmd"] = "response"
  179. data["to"] = message["req_id"]
  180. self.send(data) # Send response to handshake
  181. # Sent crypt request to client
  182. if self.crypt and not self.sock_wrapped:
  183. server = (self.type == "in")
  184. self.log("Crypt in connection using: %s (server side: %s)..." % (self.crypt, server))
  185. self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server)
  186. self.sock_wrapped = True
  187. else:
  188. self.server.handleRequest(self, message)
  189. else: # Old style response, no req_id definied
  190. if config.debug_socket:
  191. self.log("Old style response, waiting: %s" % self.waiting_requests.keys())
  192. last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
  193. self.waiting_requests[last_req_id].set(message)
  194. del self.waiting_requests[last_req_id] # Remove from waiting request
  195. # Stream socket directly to a file
  196. def handleStream(self, message):
  197. if config.debug_socket:
  198. self.log("Starting stream %s: %s bytes" % (message["to"], message["stream_bytes"]))
  199. read_bytes = message["stream_bytes"] # Bytes left we have to read from socket
  200. try:
  201. buff = self.unpacker.read_bytes(min(16 * 1024, read_bytes)) # Check if the unpacker has something left in buffer
  202. except Exception, err:
  203. buff = ""
  204. file = self.waiting_streams[message["to"]]
  205. if buff:
  206. read_bytes -= len(buff)
  207. file.write(buff)
  208. try:
  209. while 1:
  210. if read_bytes <= 0:
  211. break
  212. buff = self.sock.recv(16 * 1024)
  213. if not buff:
  214. break
  215. buff_len = len(buff)
  216. read_bytes -= buff_len
  217. file.write(buff)
  218. # Statistics
  219. self.last_recv_time = time.time()
  220. self.incomplete_buff_recv += 1
  221. self.bytes_recv += buff_len
  222. self.server.bytes_recv += buff_len
  223. except Exception, err:
  224. self.log("Stream read error: %s" % Debug.formatException(err))
  225. if config.debug_socket:
  226. self.log("End stream %s" % message["to"])
  227. self.incomplete_buff_recv = 0
  228. self.waiting_requests[message["to"]].set(message) # Set the response to event
  229. del self.waiting_streams[message["to"]]
  230. del self.waiting_requests[message["to"]]
  231. # Send data to connection
  232. def send(self, message, streaming=False):
  233. if config.debug_socket:
  234. self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
  235. message.get("cmd"), message.get("to"), streaming,
  236. message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"),
  237. message.get("req_id"))
  238. )
  239. self.last_send_time = time.time()
  240. try:
  241. if streaming:
  242. bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
  243. message = None
  244. self.bytes_sent += bytes_sent
  245. self.server.bytes_sent += bytes_sent
  246. else:
  247. data = msgpack.packb(message)
  248. message = None
  249. self.bytes_sent += len(data)
  250. self.server.bytes_sent += len(data)
  251. self.sock.sendall(data)
  252. except Exception, err:
  253. self.log("Send errror: %s" % Debug.formatException(err))
  254. self.close()
  255. return False
  256. self.last_sent_time = time.time()
  257. return True
  258. # Stream raw file to connection
  259. def sendRawfile(self, file, read_bytes):
  260. buff = 64 * 1024
  261. bytes_left = read_bytes
  262. while True:
  263. self.last_send_time = time.time()
  264. self.sock.sendall(
  265. file.read(min(bytes_left, buff))
  266. )
  267. bytes_left -= buff
  268. if bytes_left <= 0:
  269. break
  270. self.bytes_sent += read_bytes
  271. self.server.bytes_sent += read_bytes
  272. return True
  273. # Create and send a request to peer
  274. def request(self, cmd, params={}, stream_to=None):
  275. # Last command sent more than 10 sec ago, timeout
  276. if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
  277. self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
  278. self.close()
  279. return False
  280. self.last_req_time = time.time()
  281. self.last_cmd = cmd
  282. self.req_id += 1
  283. data = {"cmd": cmd, "req_id": self.req_id, "params": params}
  284. event = gevent.event.AsyncResult() # Create new event for response
  285. self.waiting_requests[self.req_id] = event
  286. if stream_to:
  287. self.waiting_streams[self.req_id] = stream_to
  288. self.send(data) # Send request
  289. res = event.get() # Wait until event solves
  290. return res
  291. def ping(self):
  292. s = time.time()
  293. response = None
  294. with gevent.Timeout(10.0, False):
  295. try:
  296. response = self.request("ping")
  297. except Exception, err:
  298. self.log("Ping error: %s" % Debug.formatException(err))
  299. if response and "body" in response and response["body"] == "Pong!":
  300. self.last_ping_delay = time.time() - s
  301. return True
  302. else:
  303. return False
  304. # Close connection
  305. def close(self):
  306. if self.closed:
  307. return False # Already closed
  308. self.closed = True
  309. self.connected = False
  310. self.event_connected.set(False)
  311. if config.debug_socket:
  312. self.log(
  313. "Closing connection, waiting_requests: %s, buff: %s..." %
  314. (len(self.waiting_requests), self.incomplete_buff_recv)
  315. )
  316. for request in self.waiting_requests.values(): # Mark pending requests failed
  317. request.set(False)
  318. self.waiting_requests = {}
  319. self.waiting_streams = {}
  320. self.server.removeConnection(self) # Remove connection from server registry
  321. try:
  322. if self.sock:
  323. self.sock.shutdown(gevent.socket.SHUT_WR)
  324. self.sock.close()
  325. except Exception, err:
  326. if config.debug_socket:
  327. self.log("Close error: %s" % err)
  328. # Little cleanup
  329. self.sock = None
  330. self.unpacker = None