HuffDec11.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import os, sys, struct, zlib
  2. class Error(Exception): pass
  3. def cwDec(w): # Convert 16-bit value to string codeword
  4. return bin(0x10000 | w).rstrip('0')[3:-1]
  5. def cwEnc(cw): # Convert string codeword to 16-bit value
  6. return int((cw+'1').ljust(16, '0'), 2)
  7. #***************************************************************************
  8. #***************************************************************************
  9. #***************************************************************************
  10. def HuffTabReader_bin(ab):
  11. fmtRec = struct.Struct("<HB")
  12. o = 0
  13. while o < len(ab):
  14. w, cb = fmtRec.unpack_from(ab, o)
  15. o += fmtRec.size
  16. v = ab[o:o+cb]
  17. assert len(v) == cb
  18. o += cb
  19. yield(cwDec(w), cb, v)
  20. #***************************************************************************
  21. #***************************************************************************
  22. #***************************************************************************
  23. class HuffNode(object):
  24. def __init__(self, cw, hd):
  25. self.cw = cw # String codeword value
  26. self.w = cwEnc(cw) # Encoded codeword value
  27. if hd:
  28. self.nBits = len(cw) # Length of codeword in bits
  29. self.cb = hd.dLen.get(cw, None)
  30. self.av = [d.get(cw, None) for d in hd.adTab]
  31. else:
  32. self.nBits = None # Actual length of codeword is unknown
  33. #***************************************************************************
  34. #***************************************************************************
  35. #***************************************************************************
  36. class HuffDecoder(object):
  37. NAMES = ("Code", "Data")
  38. DUMP_KNOWN = 0
  39. DUMP_LEN = 1
  40. DUMP_ALL = 2
  41. fmtInt = struct.Struct("<L")
  42. baseDir = os.path.split(__file__)[0]
  43. BLOCK_SIZE = 0x1000 # 4K bytes
  44. def __init__(self):
  45. with open(os.path.join(self.baseDir, "huff11.bin"), "rb") as fi: self.unpackTables(zlib.decompress(fi.read(), -15)) # Load from compressed version
  46. self.prepareMap()
  47. def loadTable(self, items):
  48. sv = set() # Set for values
  49. d = {}
  50. for cw, cb, v in items:
  51. if cw in d: raise Error("Codeword %s already defined" % cw)
  52. if cb is None: continue
  53. cbKnown = self.dLen.get(cw, None)
  54. if cbKnown is None: self.dLen[cw] = cb
  55. elif cb != cbKnown: raise Error("Codeword %s sequence length %d != know %d" % (cw, cb, cbKnown))
  56. if v is None: continue
  57. assert len(v) == cb
  58. d[cw] = v # Remember value
  59. sv.add(v)
  60. self.adTab.append(d)
  61. def unpackTables(self, ab):
  62. n, = self.fmtInt.unpack_from(ab)
  63. o = self.fmtInt.size
  64. self.dLen, self.adTab = {}, []
  65. for i in xrange(n):
  66. cb, = self.fmtInt.unpack_from(ab, o)
  67. o += self.fmtInt.size
  68. data = ab[o:o+cb]
  69. assert len(data) == cb
  70. o += cb
  71. self.loadTable(HuffTabReader_bin(data))
  72. def propagateMap(self, node):
  73. cw = node.cw
  74. for idx in xrange(int(cw[::-1], 2), len(self.aMap), 1<<len(cw)):
  75. assert self.aMap[idx] is None
  76. self.aMap[idx] = node
  77. def prepareMap(self):
  78. aCW = sorted(self.dLen.keys())[::-1]
  79. minBits, maxBits = len(aCW[0]), len(aCW[-1])
  80. self.aMap = [None]*(1<<maxBits) # 2**maxBits map
  81. aCW.append('0'*(maxBits+1)) # Longer than max
  82. nBits = minBits # Current length
  83. e = int(aCW[0], 2)|1 # End value for current length
  84. for o in xrange(1, len(aCW)):
  85. nextBits = len(aCW[o])
  86. if nextBits == nBits: continue # Run until length change
  87. assert nextBits > nBits # Length must increase
  88. s = int(aCW[o-1], 2) # Start value for current length
  89. for i in xrange(s, e+1):
  90. cw = bin(i)[2:].zfill(nBits)
  91. self.propagateMap(HuffNode(cw, self))
  92. e = int(aCW[o], 2)|1 # End value for next length
  93. for i in xrange(e/2 + 1, s): # Handle values with unknown codeword length
  94. cw = bin(i)[2:].zfill(nBits)
  95. self.propagateMap(HuffNode(cw, None))
  96. nBits = nextBits
  97. for v in self.aMap: assert v is not None
  98. def enumCW(self, ab):
  99. v = int(bin(int("01"+ab.encode("hex"), 16))[3:][::-1], 2) # Reversed bits
  100. cb = 0
  101. while cb < self.BLOCK_SIZE: # Block length
  102. node = self.aMap[v & 0x7FFF]
  103. if node.nBits is None: raise Error("Unknown codeword %s* length" % node.cw)
  104. yield node
  105. v >>= node.nBits
  106. if node.cb is not None: cb += node.cb
  107. def decompressChunk(self, ab, iTab):
  108. r = []
  109. cb = 0
  110. for node in self.enumCW(ab):
  111. v = node.av[iTab]
  112. if v is None: raise Error("Unknown sequence for codeword %s in table #%d" % (node.cw, iTab))
  113. r.append(v)
  114. cb += len(v)
  115. if cb >= self.BLOCK_SIZE: break
  116. return "".join(r)
  117. def decompress(self, ab, length):
  118. nChunks, left = divmod(length, self.BLOCK_SIZE)
  119. assert 0 == left
  120. aOfs = list(struct.unpack_from("<%dL" % nChunks, ab))
  121. aOpt = [0]*nChunks
  122. for i in xrange(nChunks):
  123. aOpt[i], aOfs[i] = divmod(aOfs[i], 0x40000000)
  124. base = nChunks*4
  125. aOfs.append(len(ab) - base)
  126. r = []
  127. for i, opt in enumerate(aOpt):
  128. iTab, bCompr = divmod(opt, 2)
  129. assert 1 == bCompr
  130. unpacked = self.decompressChunk(ab[base + aOfs[i]: base + aOfs[i+1]], iTab)
  131. assert len(unpacked) == self.BLOCK_SIZE
  132. r.append(unpacked)
  133. return "".join(r)