from mrcrowbar import models as mrc
from mrcrowbar import utils, bits
class AECompressor( mrc.Transform ):
    """Transform for data prefixed with a two-byte header and optionally compressed.

    Header layout as read by this class:

    - byte 0: not interpreted here (purpose unknown).
    - byte 1: flag bits. Bit 1 (0x02) selects DictCompressor, bit 0 (0x01)
      selects RLECompressor. When both are set the dictionary pass runs first
      and its output is then fed through the RLE pass.
    """

    def import_data( self, buffer, parent=None ):
        """Decode `buffer` according to the flag byte at offset 1.

        buffer
            Bytes-like object starting with the two-byte header.

        parent
            Optional owner object. Accepted so this transform can be called
            the same way as the other transforms in this module; it is only
            forwarded to the nested transforms.

        Returns a mrc.TransformResult. For compressed data, end_offset is the
        first pass' end_offset plus the two header bytes; for uncompressed
        data it is the length of `buffer`.

        Raises IndexError if `buffer` is shorter than two bytes.
        """
        flags = buffer[1]
        body = buffer[2:]
        use_dict = (flags & 2) != 0
        use_rle = (flags & 1) != 0

        if use_dict:
            first = DictCompressor().import_data( body, parent=parent )
            payload = first.payload
            if use_rle:
                # second pass operates on the output of the dictionary pass
                payload = RLECompressor().import_data( payload, parent=parent ).payload
        elif use_rle:
            first = RLECompressor().import_data( body, parent=parent )
            payload = first.payload
        else:
            # no compression
            return mrc.TransformResult( payload=body, end_offset=len( buffer ) )

        # +2 accounts for the header bytes skipped before the first pass
        return mrc.TransformResult( payload=payload, end_offset=first.end_offset+2 )
class RLECompressor( mrc.Transform ):
    """Run-length decoder driven by signed 8-bit control bytes."""

    def import_data( self, buffer, parent=None ):
        """Expand run-length encoded `buffer`.

        Each control byte is read as a signed 8-bit integer:

        - positive n: the next n bytes are copied through unchanged.
        - zero or negative n: the next single byte is repeated (1 - n) times.

        buffer
            Bytes-like object containing the encoded data.

        parent
            Unused.

        Returns a mrc.TransformResult whose end_offset is the length of
        `buffer`. Raises IndexError if a repeat control byte is the last
        byte of the input.
        """
        size = len( buffer )
        out = bytearray()
        pos = 0
        while pos < size:
            control = utils.from_int8( buffer[pos:pos+1] )
            pos += 1
            if control > 0:
                # literal run
                out.extend( buffer[pos:pos+control] )
                pos += control
                continue
            # repeated byte
            value = buffer[pos]
            pos += 1
            out.extend( [value]*(1-control) )
        return mrc.TransformResult( payload=bytes( out ), end_offset=size )
class DictCompressor( mrc.Transform ):
    """Decoder for a dictionary compression scheme with variable-width codes.

    Codes are read from a big-endian bit stream starting at byte 2 of the
    input, initially 9 bits wide:

    - 0x000-0x0ff: literal byte.
    - 0x100: widen subsequent codes by one bit.
    - 0x101 and above: copy of earlier output, located through a lookup table.

    The lookup table holds 16-bit little-endian output offsets and is written
    over the start of a working copy of the input, one entry per two codes.
    """

    def import_data( self, buffer, parent=None ):
        """Decompress `buffer`.

        buffer
            Bytes-like object containing the compressed data.

        parent
            Unused.

        Returns a mrc.TransformResult whose end_offset is the length of
        `buffer`. Decoding stops when the bit stream raises IndexError, or
        when a code references a lookup entry outside the working buffer
        (a message is printed and the output produced so far is returned).
        """
        # working copy; the lookup table is written into it while the bit
        # stream reads from it
        src = bytearray( buffer )
        dest = bytearray()
        lookup_pointer = 0
        bit_size = 9
        bs = bits.BitStream( src, start_offset=2, bit_endian='big', io_endian='big' )
        loop = 0
        while True:
            if loop % 2 == 0:
                # record the current output length as the next table entry
                src[lookup_pointer:lookup_pointer+2] = utils.to_uint16_le( len( dest ) )
                lookup_pointer += 2

            try:
                test = bs.read( bit_size )
                while test == 0x100:
                    # widen the code size and read the next code
                    bit_size += 1
                    test = bs.read( bit_size )
            except IndexError:
                # bit stream exhausted
                break

            if test <= 0xff:
                dest.append( test & 0xff )
            else:
                index = (test - 0x101) << 1
                # two adjacent 16-bit entries (4 bytes) are read from index;
                # a slice running past the end would come back short
                if index + 4 > len( src ):
                    print( 'Out of bounds! 0x{:04x}'.format(index) )
                    break
                start = utils.from_uint16_le( src[index:index+2] )
                end = utils.from_uint16_le( src[index+2:index+4] )
                dest.extend( dest[start:end] )
            loop += 1
        return mrc.TransformResult( payload=bytes( dest ), end_offset=len( buffer ) )
def offset_stream_end( buffer, offset ):
    """Return True once `offset` reaches the value stored at the start of `buffer`.

    The first four bytes of `buffer` are decoded as an unsigned 32-bit
    little-endian integer and compared against `offset`. Used as the
    stop_check for the AEDAT offsets stream.
    """
    limit = utils.from_uint32_le( buffer[0:4] )
    return offset >= limit
class AEDAT( mrc.Block ):
    """Block made of a stream of 32-bit offsets followed by raw bytes.

    After construction, `data` is a mrc.LinearStore built over `raw_data`
    using the entries in `offsets`.
    """

    # stream of little-endian 32-bit values from 0x00; reading stops when
    # offset_stream_end() returns True
    offsets = mrc.UInt32_LE( 0x00, stream=True, stop_check=offset_stream_end )
    # declared with no explicit offset or length
    raw_data = mrc.Bytes()

    def __init__( self, *args, **kwargs ):
        """Initialise the block, then attach the LinearStore as `self.data`."""
        super().__init__( *args, **kwargs )
        raw_ref = mrc.Ref( 'raw_data' )
        offsets_ref = mrc.Ref( 'offsets' )
        # NOTE(review): presumably rebases the stored offsets against the end
        # of the offsets table - confirm against mrc.EndOffset
        table_end = mrc.EndOffset( 'offsets', neg=True )
        self.data = mrc.LinearStore(
            self,
            source=raw_ref,
            offsets=offsets_ref,
            base_offset=table_end,
            block_klass=mrc.Unknown,
        )