import time
from cStringIO import StringIO

import pytest
import msgpack
import mock

from Connection import ConnectionServer
from Content.ContentManager import VerifyError
from File import FileServer
from File import FileRequest
from Worker import WorkerManager
from Peer import Peer
from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked
from Test import Spy


@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestBigfile:
    privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"
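
    # Test data layout used by createBigfile() below: the payload is written in
    # 10,000-byte chunks, chunk i being ("Test%s" % i).ljust(10, "-") repeated
    # 1000 times, so the default 10-piece file is 10 * 1000 * 1000 bytes.
    # A rough way to derive the expected strings asserted throughout this class
    # (works because the 10,000-byte chunk size is a multiple of 10):
    #   chunk = offset // 10000   # which "Test<i>---..." chunk the offset falls in
    #   phase = offset % 10       # position inside the 10-byte "Test<i>---" pattern
    # e.g. offset 5 * 1024 * 1024 -> chunk 524, phase 0 -> data starts with "Test524";
    #      offset 9 * 1024 * 1024 -> chunk 943, phase 4 -> data starts with "943---T".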

    def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10):
        f = site.storage.open(inner_path, "w")
        for i in range(pieces * 100):
            f.write(("Test%s" % i).ljust(10, "-") * 1000)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)
        return inner_path

    def testPiecemapCreate(self, site):
        inner_path = self.createBigfile(site)
        content = site.storage.loadJson("content.json")
        assert "data/optional.any.iso" in content["files_optional"]

        file_node = content["files_optional"][inner_path]
        assert file_node["size"] == 10 * 1000 * 1000
        assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6"
        assert file_node["piecemap"] == inner_path + ".piecemap.msgpack"

        piecemap = msgpack.unpack(site.storage.open(file_node["piecemap"], "rb"))["optional.any.iso"]
        assert len(piecemap["sha512_pieces"]) == 10
        assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1]
        assert piecemap["sha512_pieces"][0].encode("hex") == "a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3"
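
    # As exercised above, the piecemap file is a msgpack map keyed by the bare
    # file name ("optional.any.iso"); its "sha512_pieces" entry is a list of raw
    # SHA-512 digests, one per 1 MB piece (10 digests for the 10 MB test file).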

    def testVerifyPiece(self, site):
        inner_path = self.createBigfile(site)

        # Verify all 10 pieces
        f = site.storage.open(inner_path, "rb")
        for i in range(10):
            piece = StringIO(f.read(1024 * 1024))
            piece.seek(0)
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        f.close()

        # Try to verify piece 0 data against piece 1's hash
        with pytest.raises(VerifyError) as err:
            i = 1
            f = site.storage.open(inner_path, "rb")
            piece = StringIO(f.read(1024 * 1024))
            f.close()
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        assert "Invalid hash" in str(err)

    def testSparseFile(self, site):
        inner_path = "sparsefile"

        # Create a 100 MB sparse file
        site.storage.createSparseFile(inner_path, 100 * 1024 * 1024)

        # Write to the beginning of the file
        s = time.time()
        f = site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), "hellostart" * 1024)
        time_write_start = time.time() - s

        # Write to the end of the file
        s = time.time()
        f = site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), "helloend" * 1024)
        time_write_end = time.time() - s

        # Verify writes
        f = site.storage.open(inner_path)
        assert f.read(10) == "hellostart"
        f.seek(99 * 1024 * 1024)
        assert f.read(8) == "helloend"
        f.close()

        site.storage.delete(inner_path)

        # Writing to the end should not take much longer than writing to the start
        assert time_write_end <= max(0.1, time_write_start * 1.1)
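
    # Note on the addressing used above and in the tests below: a byte range of a
    # big file is referenced as "<inner_path>|<start>-<end>" (byte offsets), both
    # for storage writes and for peer/worker requests, while "<inner_path>|all"
    # requests every piece (see testDownloadAllPieces).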

    def testRangedFileRequest(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        client.sites[site_temp.address] = site_temp
        site_temp.connection_server = client
        connection = client.getConnection(file_server.ip, 1544)

        # Add file_server as peer to client
        peer_file_server = site_temp.addPeer(file_server.ip, 1544)

        buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))

        assert len(buff.getvalue()) == 1 * 1024 * 1024  # Correct block size
        assert buff.getvalue().startswith("Test524")  # Correct data
        buff.seek(0)
        assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff)  # Correct hash

        connection.close()
        client.stop()

    def testRangedFileDownload(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Make sure the file and the piecemap are in the optional hashfield
        file_info = site.content_manager.getFileInfo(inner_path)
        assert site.content_manager.hashfield.hasHash(file_info["sha512"])

        piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"]
        assert site.content_manager.hashfield.hasHash(piecemap_hash)

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        peer_client = site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        bad_files = site_temp.storage.verifyFiles(quick_check=True)["bad_files"]
        assert not bad_files

        # client_piecefield = peer_client.piecefields[file_info["sha512"]].tostring()
        # assert client_piecefield == "1" * 10

        # Download blocks 5 and 9 (1 MB each)
        site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))

        # Verify block 0 is not downloaded
        f = site_temp.storage.open(inner_path)
        assert f.read(10) == "\0" * 10
        # Verify blocks 5 and 9 are downloaded
        f.seek(5 * 1024 * 1024)
        assert f.read(7) == "Test524"
        f.seek(9 * 1024 * 1024)
        assert f.read(7) == "943---T"

        # Verify hashfield
        assert set(site_temp.content_manager.hashfield) == set([18343, 30970])  # 18343: data/optional.any.iso, 30970: data/optional.any.iso.piecemap.msgpack
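
    # The hashfield ids asserted above appear to be the first two bytes of each
    # optional file's sha512 read as an integer: 18343 == 0x47a7 matches the
    # "47a72cde..." hash asserted in testPiecemapCreate, and 30970 would then be
    # the hash id of the piecemap file.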

    def testOpenBigfile(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == "Test524"

                f.seek(9 * 1024 * 1024)
                assert f.read(7) == "943---T"

            assert len(requests) == 4  # 1x piecemap + 1x getpiecefield + 2x for pieces

            assert set(site_temp.content_manager.hashfield) == set([18343, 30970])

            assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001"
            assert f.sha512 in site_temp.getSettingsCache()["piecefields"]

            # Test requesting an already downloaded block
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == "Test524"

            assert len(requests) == 0

            # Test multi-block overflow reads
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)  # We already have this block
                data = f.read(1024 * 1024 * 3)  # Our read overflows into blocks 6 and 7
                assert data.startswith("Test524")
                assert data.endswith("Test838-")
                assert "\0" not in data  # No null bytes allowed

            assert len(requests) == 2  # Two blocks downloaded

            # Test out of range request
            f.seek(5 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 10 * 1000 * 1000 - (5 * 1024 * 1024)

            f.seek(30 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 0
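
    # BigfilePiecefield and BigfilePiecefieldPacked are two implementations of the
    # same per-piece "downloaded" bitmap: one "0"/"1" character per piece,
    # convertible with fromstring()/tostring() and serialized with pack()/unpack().
    # The parametrized test below exercises both classes against the same data.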

    @pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked])
    def testPiecefield(self, piecefield_obj, site):
        testdatas = [
            "1" * 100 + "0" * 900 + "1" * 4000 + "0" * 4999 + "1",
            "010101" * 10 + "01" * 90 + "10" * 400 + "0" * 4999,
            "1" * 10000,
            "0" * 10000
        ]
        for testdata in testdatas:
            piecefield = piecefield_obj()

            piecefield.fromstring(testdata)
            assert piecefield.tostring() == testdata
            assert piecefield[0] == int(testdata[0])
            assert piecefield[100] == int(testdata[100])
            assert piecefield[1000] == int(testdata[1000])
            assert piecefield[len(testdata) - 1] == int(testdata[len(testdata) - 1])

            packed = piecefield.pack()
            piecefield_new = piecefield_obj()
            piecefield_new.unpack(packed)
            assert piecefield.tostring() == piecefield_new.tostring()
            assert piecefield_new.tostring() == testdata

    def testFileGet(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Download the second block
        with site_temp.storage.openBigfile(inner_path) as f:
            f.seek(1024 * 1024)
            assert f.read(1024)[0] != "\0"

        # Make sure the first block is not downloaded
        with site_temp.storage.open(inner_path) as f:
            assert f.read(1024)[0] == "\0"

        peer2 = site.addPeer(file_server.ip, 1545, return_peer=True)

        # The request for the first block should fail, as the client has not downloaded it
        assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1))
        # The request for the second block should succeed
        assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2))
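
    # Note: benchmarkPeerMemory is not picked up by pytest's default collection
    # (its name does not start with "test"); it is presumably kept here to be run
    # manually when profiling peer/piecefield memory usage.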

    def benchmarkPeerMemory(self, site, file_server):
        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        import psutil, os
        meminfo = psutil.Process(os.getpid()).memory_info

        mem_s = meminfo()[0]
        s = time.time()
        for i in range(25000):
            site.addPeer(file_server.ip, i)
        print "%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024)  # 0.082s MEM: + 6800KB
        print site.peers.values()[0].piecefields

    def testUpdatePiecefield(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)

        # Testing piecefield sync
        assert len(server2_peer1.piecefields) == 0
        assert server2_peer1.updatePiecefields()  # Query piecefields from peer
        assert len(server2_peer1.piecefields) > 0

    def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)  # Working

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        # Add fake peers with optional files downloaded
        for i in range(5):
            fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544)
            fake_peer.hashfield = site.content_manager.hashfield
            fake_peer.has_hashfield = True

        with Spy.Spy(WorkerManager, "addWorker") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
            site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024))

        # It should only request parts from peer1, as the other peers do not have the requested parts in their piecefields
        assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0

    def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2
        sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]

        # Create 10 fake peers, one for each piece
        for i in range(10):
            peer = Peer(file_server.ip, 1544, site_temp, server2)
            peer.piecefields[sha512][i] = "1"
            peer.updateHashfield = mock.MagicMock(return_value=False)
            peer.updatePiecefields = mock.MagicMock(return_value=False)
            peer.findHashIds = mock.MagicMock(return_value={"nope": []})
            peer.hashfield = site.content_manager.hashfield
            peer.has_hashfield = True
            peer.key = "Peer:%s" % i
            site_temp.peers["Peer:%s" % i] = peer

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        with Spy.Spy(Peer, "getFile") as requests:
            for i in range(10):
                site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024))

        assert len(requests) == 10
        for i in range(10):
            assert requests[i][0] == site_temp.peers["Peer:%s" % i]  # Every part should be requested from the peer that owns that piece

    def testDownloadStats(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Check size before downloads
        assert site_temp.settings["size"] < 10 * 1024 * 1024
        assert site_temp.settings["optional_downloaded"] == 0

        size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"]
        size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"]

        with site_temp.storage.openBigfile(inner_path) as f:
            assert "\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

        with site_temp.storage.openBigfile(inner_path) as f:
            # Don't count twice
            assert "\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

            # Add second block
            assert "\0" not in f.read(1024 * 1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile
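
    # The prebuffer parameter used below asks openBigfile() to queue additional
    # bytes after each read: with prebuffer=2 MB, reading block 5 leaves two more
    # block tasks pending, and once they finish the piecefield shows blocks 5-7
    # present ("0000011100"), as asserted in testPrebuffer.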

    def testPrebuffer(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == "Test524"
            # assert len(requests) == 3  # 1x piecemap + 1x getpiecefield + 1x for pieces
            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2

            time.sleep(0.5)  # Wait for the prebuffer download
            sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
            assert site_temp.storage.piecefields[sha512].tostring() == "0000011100"

            # No prebuffer beyond the end of the file
            f.seek(9 * 1024 * 1024)
            assert "\0" not in f.read(7)
            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0

    def testDownloadAllPieces(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 12  # 1x piecemap.msgpack + 1x getPiecefields + 10x piece

        # Don't re-download already downloaded pieces
        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 0

    def testFileSize(self, file_server, site, site_temp):
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Download the first block
        site_temp.needFile("%s|%s-%s" % (inner_path, 0 * 1024 * 1024, 1 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) < 1000 * 1000 * 10  # Size on the disk should be smaller than the real size

        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) == site.storage.getSize(inner_path)
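
    # The parametrized sizes below cover both code paths: the smallest file gets no
    # piecemap (presumably falling below the Bigfile handling threshold) and is
    # downloaded as a regular optional file, while the larger ones go through the
    # "|all" Bigfile path; hence the branch on "piecemap" in the test body.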

    @pytest.mark.parametrize("size", [1024 * 3, 1024 * 1024 * 3, 1024 * 1024 * 30])
    def testNullFileRead(self, file_server, site, site_temp, size):
        inner_path = "data/optional.iso"

        f = site.storage.open(inner_path, "w")
        f.write("\0" * size)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        if "piecemap" in site.content_manager.getFileInfo(inner_path):  # Bigfile
            site_temp.needFile(inner_path + "|all")
        else:
            site_temp.needFile(inner_path)

        assert site_temp.storage.getSize(inner_path) == size