""" maxminddb.decoder ~~~~~~~~~~~~~~~~~ This package contains code for decoding the MaxMind DB data section. """ from __future__ import unicode_literals import struct from maxminddb.compat import byte_from_int, int_from_bytes from maxminddb.errors import InvalidDatabaseError class Decoder(object): # pylint: disable=too-few-public-methods """Decoder for the data section of the MaxMind DB""" def __init__(self, database_buffer, pointer_base=0, pointer_test=False): """Created a Decoder for a MaxMind DB Arguments: database_buffer -- an mmap'd MaxMind DB file. pointer_base -- the base number to use when decoding a pointer pointer_test -- used for internal unit testing of pointer code """ self._pointer_test = pointer_test self._buffer = database_buffer self._pointer_base = pointer_base def _decode_array(self, size, offset): array = [] for _ in range(size): (value, offset) = self.decode(offset) array.append(value) return array, offset def _decode_boolean(self, size, offset): return size != 0, offset def _decode_bytes(self, size, offset): new_offset = offset + size return self._buffer[offset:new_offset], new_offset # pylint: disable=no-self-argument # |-> I am open to better ways of doing this as long as it doesn't involve # lots of code duplication. def _decode_packed_type(type_code, type_size, pad=False): # pylint: disable=protected-access, missing-docstring def unpack_type(self, size, offset): if not pad: self._verify_size(size, type_size) new_offset = offset + type_size packed_bytes = self._buffer[offset:new_offset] if pad: packed_bytes = packed_bytes.rjust(type_size, b'\x00') (value,) = struct.unpack(type_code, packed_bytes) return value, new_offset return unpack_type def _decode_map(self, size, offset): container = {} for _ in range(size): (key, offset) = self.decode(offset) (value, offset) = self.decode(offset) container[key] = value return container, offset _pointer_value_offset = { 1: 0, 2: 2048, 3: 526336, 4: 0, } def _decode_pointer(self, size, offset): pointer_size = ((size >> 3) & 0x3) + 1 new_offset = offset + pointer_size pointer_bytes = self._buffer[offset:new_offset] packed = pointer_bytes if pointer_size == 4 else struct.pack( b'!c', byte_from_int(size & 0x7)) + pointer_bytes unpacked = int_from_bytes(packed) pointer = unpacked + self._pointer_base + \ self._pointer_value_offset[pointer_size] if self._pointer_test: return pointer, new_offset (value, _) = self.decode(pointer) return value, new_offset def _decode_uint(self, size, offset): new_offset = offset + size uint_bytes = self._buffer[offset:new_offset] return int_from_bytes(uint_bytes), new_offset def _decode_utf8_string(self, size, offset): new_offset = offset + size return self._buffer[offset:new_offset].decode('utf-8'), new_offset _type_decoder = { 1: _decode_pointer, 2: _decode_utf8_string, 3: _decode_packed_type(b'!d', 8), # double, 4: _decode_bytes, 5: _decode_uint, # uint16 6: _decode_uint, # uint32 7: _decode_map, 8: _decode_packed_type(b'!i', 4, pad=True), # int32 9: _decode_uint, # uint64 10: _decode_uint, # uint128 11: _decode_array, 14: _decode_boolean, 15: _decode_packed_type(b'!f', 4), # float, } def decode(self, offset): """Decode a section of the data section starting at offset Arguments: offset -- the location of the data structure to decode """ new_offset = offset + 1 (ctrl_byte,) = struct.unpack(b'!B', self._buffer[offset:new_offset]) type_num = ctrl_byte >> 5 # Extended type if not type_num: (type_num, new_offset) = self._read_extended(new_offset) if not type_num in self._type_decoder: raise InvalidDatabaseError('Unexpected type number ({type}) ' 'encountered'.format(type=type_num)) (size, new_offset) = self._size_from_ctrl_byte( ctrl_byte, new_offset, type_num) return self._type_decoder[type_num](self, size, new_offset) def _read_extended(self, offset): (next_byte,) = struct.unpack(b'!B', self._buffer[offset:offset + 1]) type_num = next_byte + 7 if type_num < 7: raise InvalidDatabaseError( 'Something went horribly wrong in the decoder. An ' 'extended type resolved to a type number < 8 ' '({type})'.format(type=type_num)) return type_num, offset + 1 def _verify_size(self, expected, actual): if expected != actual: raise InvalidDatabaseError( 'The MaxMind DB file\'s data section contains bad data ' '(unknown data type or corrupt data)' ) def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num): size = ctrl_byte & 0x1f if type_num == 1: return size, offset bytes_to_read = 0 if size < 29 else size - 28 new_offset = offset + bytes_to_read size_bytes = self._buffer[offset:new_offset] # Using unpack rather than int_from_bytes as it is about 200 lookups # per second faster here. if size == 29: size = 29 + struct.unpack(b'!B', size_bytes)[0] elif size == 30: size = 285 + struct.unpack(b'!H', size_bytes)[0] elif size > 30: size = struct.unpack( b'!I', size_bytes.rjust(4, b'\x00'))[0] + 65821 return size, new_offset