Source code for mrcrowbar.bits

from mrcrowbar.common import is_bytes


# Lookup table mapping every byte value to the same value with its 8 bits in
# mirrored order (e.g. 0x01 -> 0x80, 0x0f -> 0xf0). Built once at import time
# by reversing the zero-padded binary text of each value.
BYTE_REVERSE = bytes( int( format( value, '08b' )[::-1], 2 ) for value in range( 256 ) )

# Precomputed all-ones values for widths 0 through 64 bits:
# BIT_MASK[n] == (1 << n) - 1.
BIT_MASK = [(1 << size) - 1 for size in range( 0, 65 )]


def mask( size ):
    """Return an integer with the lowest `size` bits set.

    size
        Width of the mask in bits. Widths 0 through 64 are served from the
        precomputed BIT_MASK table; anything else is computed on demand.
    """
    if size in range( 0, 65 ):
        return BIT_MASK[size]
    return (1 << size) - 1

def reverse_bits( number, size=8 ):
    """Return the lowest `size` bits of `number` in mirrored bit order.

    number
        Integer to take the bits from; bits above `size` are discarded.

    size
        Width of the field in bits. Defaults to 8.
    """
    number &= mask( size )
    if size == 8:
        # Single byte: the lookup table already holds the answer.
        return BYTE_REVERSE[number]

    byte_count = (size + 7) // 8
    flipped = 0
    # Walk the input from its least-significant byte upward; pushing each
    # mirrored byte in from the right leaves the byte order swapped as well.
    for index in range( byte_count ):
        flipped = (flipped << 8) | BYTE_REVERSE[(number >> (index * 8)) & 0xff]

    # The mirrored field is left-aligned within whole bytes; drop the padding.
    return flipped >> ((8 - size) % 8)
def read_bits( buffer, byte_offset, bit_offset, size, bytes_reverse=False, bit_endian='big', io_endian='big' ):
    """Read an unsigned integer of `size` bits out of `buffer`.

    buffer
        Indexable sequence of byte values to read from.

    byte_offset
        Index of the byte that holds the first bit to read.

    bit_offset
        Position of the first bit inside that byte. For bit_endian 'big' this
        is the number of most-significant bits to skip (0 starts at 0x80).
        For 'little', 7 starts at the least-significant bit (0x01) and lower
        values skip (7 - bit_offset) low bits.

    size
        Number of bits to read.

    bytes_reverse
        If enabled, successive bytes are taken from decreasing indices.

    bit_endian
        'big' consumes each byte from its most-significant bit downward; any
        other value takes the little-endian path.

    io_endian
        Bit order of the returned integer. When it differs from bit_endian
        the result is passed through reverse_bits() over `size` bits.

    The first and middle bytes are indexed directly, so indexing errors from
    the buffer propagate. A final byte whose index falls outside
    range( len( buffer ) ) is read as zero.
    """
    byte_start = byte_offset
    bit_start = bit_offset

    # Distance in bits from the start of the first byte to the end of the span.
    bit_diff = (bit_offset + size) if bit_endian == 'big' else (7 - bit_offset + size)
    if bytes_reverse:
        byte_end = byte_offset - bit_diff // 8
    else:
        byte_end = byte_offset + bit_diff // 8
    # Bit position (same convention as bit_offset) just past the last bit read.
    bit_end = bit_diff % 8 if bit_endian == 'big' else 7 - (bit_diff % 8)

    result = 0

    first_byte = buffer[byte_start]
    # Whole bytes strictly between the first and the last byte of the span.
    middle_bytes = range( byte_start + 1, byte_end ) if not bytes_reverse else range( byte_start - 1, byte_end, -1 )
    # The span may end exactly on a byte boundary past the buffer; read that as 0.
    end_byte = buffer[byte_end] if byte_end in range( len( buffer ) ) else 0

    if bit_endian == 'big':
        # start: keep the low (8 - bit_start) bits of the first byte
        span_mask = mask( 8 - bit_start )
        if byte_start == byte_end:
            # span ends inside the same byte: also clear the bits below it
            span_mask ^= mask( 8 - bit_end )
        result |= first_byte & span_mask

        if byte_start != byte_end:
            # middle: append each whole byte below the bits gathered so far
            for i in middle_bytes:
                result <<= 8
                result |= buffer[i]

            # end: append the top bit_end bits of the last byte
            span_mask = 0xff ^ mask( 8 - bit_end )
            result <<= 8
            result |= end_byte & span_mask
        # discard the unused low bits of the last byte touched
        result >>= 8 - bit_end
    else:
        # start: keep the bits at and above position (7 - bit_start)
        span_mask = 0xff ^ mask( 7 - bit_start )
        if byte_start == byte_end:
            # span ends inside the same byte: also clear the bits above it
            span_mask ^= 0xff ^ mask( 7 - bit_end )
        result |= first_byte & span_mask
        result >>= 7 - bit_start

        if byte_start != byte_end:
            # bit_offset is reused as the running shift for later bytes; the
            # first byte contributed (bit_start + 1) bits.
            bit_offset = bit_start + 1
            # middle: place each whole byte above the bits gathered so far
            for i in middle_bytes:
                result |= buffer[i] << bit_offset
                bit_offset += 8

            # end: place the low (7 - bit_end) bits of the last byte on top
            span_mask = mask( 7 - bit_end )
            result |= (end_byte & span_mask) << bit_offset

    if io_endian != bit_endian:
        result = reverse_bits( result, size )

    return result
def write_bits( value, buffer, byte_offset, bit_offset, size, bytes_reverse=False, bit_endian='big', io_endian='big' ):
    """Write `value` as a `size`-bit unsigned integer into `buffer` in place.

    value
        Integer to store; must lie in range( 1 << size ).

    buffer
        Mutable indexable sequence of byte values. Bits outside the written
        span are preserved.

    byte_offset
        Index of the byte that receives the first bit.

    bit_offset
        Position of the first bit inside that byte. For bit_endian 'big' this
        is the number of most-significant bits to skip (0 starts at 0x80).
        For 'little', 7 starts at the least-significant bit (0x01) and lower
        values skip (7 - bit_offset) low bits.

    size
        Number of bits to write.

    bytes_reverse
        If enabled, successive bytes are written at decreasing indices.

    bit_endian
        'big' fills each byte from its most-significant bit downward; any
        other value takes the little-endian path.

    io_endian
        Bit order of `value`. When it differs from bit_endian the value is
        passed through reverse_bits() over `size` bits before being stored.

    Raises ValueError if `value` does not fit into `size` bits. Returns None.
    """
    if value not in range( 1 << size ):
        raise ValueError( 'Value {} does not fit into {} bits'.format( value, size ) )

    byte_start = byte_offset
    bit_start = bit_offset

    if io_endian != bit_endian:
        value = reverse_bits( value, size )

    # Distance in bits from the start of the first byte to the end of the span.
    bit_diff = (bit_offset + size) if bit_endian == 'big' else (7 - bit_offset + size)
    if bytes_reverse:
        byte_end = byte_offset - bit_diff // 8
    else:
        byte_end = byte_offset + bit_diff // 8
    # Bit position (same convention as bit_offset) just past the last bit written.
    bit_end = bit_diff % 8 if bit_endian == 'big' else 7 - (bit_diff % 8)

    # Whole bytes strictly between the first and the last byte of the span.
    middle_bytes = range( byte_start + 1, byte_end ) if not bytes_reverse else range( byte_start - 1, byte_end, -1 )

    if bit_endian == 'big':
        # start: the span covers the low (8 - bit_start) bits of the first byte
        span_mask = mask( 8 - bit_start )
        if byte_start == byte_end:
            # span ends inside the same byte: exclude the bits below it and
            # shift the value up so it lands inside the span
            span_mask ^= mask( 8 - bit_end )
            start_value = value << 8 - bit_end
        else:
            # only the topmost bits of the value belong in the first byte
            start_value = value >> size - (8 - bit_start)
        # merge: keep the buffer bits outside the span, take the value inside it
        buffer[byte_start] = (0xff ^ span_mask) & buffer[byte_start] | (start_value & span_mask)

        if byte_start != byte_end:
            # middle: whole bytes, taken from the value above the bit_end
            # trailing bits reserved for the last byte
            for i, x in enumerate( middle_bytes ):
                buffer[x] = (value >> ((len( middle_bytes ) - i - 1) * 8 + bit_end)) & 0xff

            # end: the lowest bit_end bits of the value go to the top of the last byte
            end_value = value << (8 - bit_end)
            span_mask = 0xff ^ mask( 8 - bit_end )
            # an empty mask means the span stopped on a byte boundary, so the
            # last byte is never indexed
            if span_mask:
                buffer[byte_end] = (0xff ^ span_mask) & buffer[byte_end] | (end_value & span_mask)
    else:
        # start: the span covers the bits at and above position (7 - bit_start)
        span_mask = 0xff ^ mask( 7 - bit_start )
        if byte_start == byte_end:
            # span ends inside the same byte: exclude the bits above it
            span_mask ^= 0xff ^ mask( 7 - bit_end )
        start_value = value << 7 - bit_start
        # merge: keep the buffer bits outside the span, take the value inside it
        buffer[byte_start] = (0xff ^ span_mask) & buffer[byte_start] | (start_value & span_mask)

        if byte_start != byte_end:
            # bit_offset is reused as the count of value bits already stored;
            # the first byte took (bit_start + 1) of them.
            bit_offset = bit_start + 1
            # middle: whole bytes, taken from successively higher value bits
            for i, x in enumerate( middle_bytes ):
                buffer[x] = (value >> bit_offset) & 0xff
                bit_offset += 8

            # end: the remaining high bits of the value go to the bottom of the last byte
            span_mask = mask( 7 - bit_end )
            end_value = value >> bit_offset
            # an empty mask means the span stopped on a byte boundary, so the
            # last byte is never indexed
            if span_mask:
                buffer[byte_end] = (0xff ^ span_mask) & buffer[byte_end] | (end_value & span_mask)

    return
def reverse_bytes( buffer ):
    """Return `buffer` as bytes with the order of all of its bits reversed.

    Every byte has its 8 bits mirrored with reverse_bits() and the byte order
    is then flipped, so the last bit of the input becomes the first bit of
    the output.

    buffer
        Iterable of byte values (e.g. bytes or bytearray).
    """
    # map() takes the callable first, and its result is a one-shot iterator
    # that reversed() cannot consume; build the bytes first, then flip them
    # with a slice.
    return bytes( map( reverse_bits, buffer ) )[::-1]
def unpack_bits( byte ):
    """Expand a bitfield into a 64-bit int (8 bool bytes).

    Bit i of the low 8 bits of `byte` ends up as the least-significant bit
    of byte i of the result; every other bit of the result is zero.
    """
    spread = byte & 0x00000000000000ff
    # Each step copies the value upward and keeps only the pieces that landed
    # in their final lane: first nibbles, then bit pairs, then single bits.
    stages = (
        (28, 0x0000000f0000000f),
        (14, 0x0003000300030003),
        (7, 0x0101010101010101),
    )
    for distance, keep in stages:
        spread = (spread | (spread << distance)) & keep
    return spread
def pack_bits( longbits ):
    """Crunch a 64-bit int (8 bool bytes) into a bitfield.

    The least-significant bit of byte i of `longbits` becomes bit i of the
    8-bit result; all other input bits are ignored.
    """
    packed = longbits & 0x0101010101010101
    # Each step folds the upper half of every lane down next to the lower
    # half: single bits into pairs, pairs into nibbles, nibbles into a byte.
    stages = (
        (7, 0x0003000300030003),
        (14, 0x0000000f0000000f),
        (28, 0x00000000000000ff),
    )
    for distance, keep in stages:
        packed = (packed | (packed >> distance)) & keep
    return packed
class BitStream( object ):
    """Cursor over a byte buffer that reads and writes bit fields.

    The stream keeps a (byte_pos, bit_pos) cursor and hands the actual bit
    extraction and insertion to read_bits() and write_bits(). For bit_endian
    'big' a fresh byte starts at bit_pos 0; for 'little' it starts at
    bit_pos 7.
    """

    def __init__( self, buffer=None, start_offset=None, bytes_reverse=False, bit_endian='big', io_endian='big' ):
        """Create a BitStream instance.

        buffer
            Target byte array to read/write from. Defaults to an empty array.

        start_offset
            Position in the target to start reading from. Can be an integer
            byte offset, or a tuple containing the byte and bit offsets.
            Defaults to the start of the stream, depending on the endianness
            and ordering options.

        bytes_reverse
            If enabled, fetch successive bytes from the source in reverse
            order.

        bit_endian
            Endianness of the backing storage; either 'big' or 'little'.
            Defaults to big (i.e. starting from the most-significant bit
            (0x80) through least-significant bit (0x01)).

        io_endian
            Endianness of data returned from read/write; either 'big' or
            'little'. Defaults to big (i.e. starting from the
            most-significant bit (0x80) through least-significant bit
            (0x01)).

        Raises TypeError for an unrecognised bit_endian or io_endian, or for
        a start_offset that is neither an int nor a tuple.
        """
        if buffer is None:
            self.buffer = bytearray()
        else:
            assert is_bytes( buffer )
            self.buffer = buffer

        self.bytes_reverse = bytes_reverse

        if bit_endian not in ('big', 'little'):
            raise TypeError( 'bit_endian should be either \'big\' or \'little\'' )
        self.bit_endian = bit_endian

        if io_endian not in ('big', 'little'):
            raise TypeError( 'io_endian should be either \'big\' or \'little\'' )
        self.io_endian = io_endian

        if start_offset is None:
            # Measure self.buffer rather than the raw argument, which is None
            # when the caller relies on the default empty buffer.
            self.byte_pos = len( self.buffer ) - 1 if bytes_reverse else 0
            self.bit_pos = 0 if bit_endian == 'big' else 7
        elif isinstance( start_offset, int ):
            self.byte_pos = start_offset
            self.bit_pos = 0 if bit_endian == 'big' else 7
        elif isinstance( start_offset, tuple ):
            self.byte_pos, self.bit_pos = start_offset
        else:
            raise TypeError('start_offset should be of type int or tuple')

    def tell( self ):
        """Get the current byte and bit position."""
        return self.byte_pos, self.bit_pos

    def read( self, count ):
        """Get an integer containing the next [count] bits from the source.

        Layout of four successive 3-bit reads under each option::

            x.read( 3 )  # 0bABC
            x.read( 3 )  # 0bDEF
            x.read( 3 )  # 0bGHI
            x.read( 3 )  # 0bJKL

            default:                  ABCDEFGH IJKLxxxx
            bit_endian == 'little':   HGFEDCBA xxxxLKJI
            bytes_reverse == True:    IJKLxxxx ABCDEFGH
            io_endian == 'little':    CBAFEDIH GLKJxxxx
        """
        result = read_bits(
            buffer=self.buffer,
            byte_offset=self.byte_pos,
            bit_offset=self.bit_pos,
            size=count,
            bytes_reverse=self.bytes_reverse,
            bit_endian=self.bit_endian,
            io_endian=self.io_endian,
        )
        # Advance the cursor past the bits just consumed.
        self.seek( (count // 8, count % 8), origin="current" )
        return result

    def write( self, value, count ):
        """Write an unsigned integer containing [count] bits to the source.

        The buffer is grown with zero bytes (at the front for bytes_reverse
        streams, otherwise at the back) when the write would run past it.
        Growing rebinds self.buffer to a new object; use get_buffer() to
        fetch the result. write_bits() assigns into the buffer, so the
        backing buffer has to be mutable.

        Layout of four successive 3-bit writes under each option::

            x.write( 0bABC, 3 )
            x.write( 0bDEF, 3 )
            x.write( 0bGHI, 3 )
            x.write( 0bJKL, 3 )

            default:                  ABCDEFGH IJKLxxxx
            bit_endian == 'little':   HGFEDCBA xxxxLKJI
            bytes_reverse == True:    IJKLxxxx ABCDEFGH
            io_endian == 'little':    CBAFEDIH GLKJxxxx
        """
        # Offset of the last bit that will be written, counted from the start
        # of the current byte; used to find the last byte index touched.
        bit_diff = (self.bit_pos + count - 1) if self.bit_endian == 'big' else (7 - self.bit_pos + count - 1)
        new_byte_pos = self.byte_pos
        if self.bytes_reverse:
            new_byte_pos -= bit_diff // 8
        else:
            new_byte_pos += bit_diff // 8

        if new_byte_pos < 0:
            # Ran off the front: prepend zero bytes and shift the cursor so it
            # keeps pointing at the same data.
            byte_count = -new_byte_pos
            self.buffer = bytearray( b'\x00'*byte_count ) + self.buffer
            self.byte_pos += byte_count
        elif new_byte_pos >= len( self.buffer ):
            # Ran off the back: append just enough zero bytes.
            byte_count = new_byte_pos - len( self.buffer ) + 1
            self.buffer = self.buffer + bytearray( b'\x00'*byte_count )

        write_bits(
            value=value,
            buffer=self.buffer,
            byte_offset=self.byte_pos,
            bit_offset=self.bit_pos,
            size=count,
            bytes_reverse=self.bytes_reverse,
            bit_endian=self.bit_endian,
            io_endian=self.io_endian,
        )
        # Advance the cursor past the bits just written.
        self.seek( (count // 8, count % 8), origin="current" )

    def seek( self, offset, origin="start" ):
        """Seek to a location in the target.

        offset
            Relative offset in the target to move to. Can be an integer byte
            offset, or a tuple containing the byte and bit offsets.

        origin
            Position to measure the offset from. Can be either "start",
            "current" or "end". Defaults to "start".

        Raises TypeError for an offset that is neither an int nor a tuple, or
        for an unrecognised origin.
        """
        if isinstance( offset, int ):
            count = offset * 8
        elif isinstance( offset, tuple ):
            count = offset[0] * 8 + offset[1]
        else:
            raise TypeError("offset should be of type int or tuple")

        if origin not in ("start", "current", "end"):
            raise TypeError('origin should be one of "start", "current" or "end"')

        if origin != "current":
            # Reset the cursor using the same conventions as the constructor:
            # a reversed stream starts on the last byte and ends one step
            # before the first, and a little-endian stream starts each byte
            # at bit_pos 7.
            if origin == "start":
                self.byte_pos = len( self.buffer ) - 1 if self.bytes_reverse else 0
            else:
                self.byte_pos = -1 if self.bytes_reverse else len( self.buffer )
            self.bit_pos = 0 if self.bit_endian == 'big' else 7

        # Apply the relative move; floor division and modulo keep this
        # consistent for negative offsets as well.
        bit_diff = (self.bit_pos + count) if self.bit_endian == 'big' else (7 - self.bit_pos + count)
        if self.bytes_reverse:
            self.byte_pos -= bit_diff // 8
        else:
            self.byte_pos += bit_diff // 8
        self.bit_pos = bit_diff % 8 if self.bit_endian == 'big' else 7 - (bit_diff % 8)

    def in_bounds( self ):
        """Returns True if the current position is within the bounds of the target."""
        return self.byte_pos in range( len( self.buffer ) )

    def get_buffer( self ):
        """Return a byte string containing the target."""
        return bytes( self.buffer )