"""File format classes for the Commander Keen: Invasion of the Vorticons engine (DOS, 1991)
Sources:
RLE compressor
http://www.shikadi.net/moddingwiki/Keen_1-3_RLE_compression
RLEW compressor
http://www.shikadi.net/moddingwiki/RLEW_compression
LZW compressor
http://www.shikadi.net/moddingwiki/LZW_Compression
(Special thanks to Fleexy)
EGA header
http://www.shikadi.net/moddingwiki/Commander_Keen_EGA_Header
Level format
http://www.shikadi.net/moddingwiki/Commander_Keen_1-3_Level_format
"""
import itertools
import logging
logger = logging.getLogger( __name__ )
from mrcrowbar import models as mrc
from mrcrowbar.lib.hardware import ibm_pc
from mrcrowbar.lib.images import base as img
from mrcrowbar import bits, utils
[docs]
class RLECompressor(mrc.Transform):
    """Decoder for the byte-oriented RLE scheme of Commander Keen 1-3.

    The stream starts with a 32-bit little-endian decompressed length,
    followed by control bytes:

    * control >= 128: the next ``control - 127`` bytes are copied verbatim.
    * control < 128: the next single byte is repeated ``control + 3`` times.
    """

    def import_data(self, buffer, parent=None):
        """Expand an RLE stream.

        Decoding stops once at least the announced number of bytes has been
        produced; the output is not trimmed to that length.

        Returns a ``mrc.TransformResult`` whose ``payload`` is the decoded
        bytes and whose ``end_offset`` is the read position reached in
        ``buffer``.
        """
        target_size = utils.from_uint32_le(buffer[0:4])
        decoded = bytearray()
        pos = 4
        while len(decoded) < target_size:
            control = buffer[pos]
            if control < 128:
                # run: one fill byte repeated (control + 3) times
                fill = buffer[pos + 1:pos + 2]
                decoded.extend(fill * (control + 3))
                pos += 2
                continue
            # literal span: the control byte plus (control - 127) raw bytes
            span = control - 126
            decoded.extend(buffer[pos + 1:pos + span])
            pos += span
        return mrc.TransformResult(payload=bytes(decoded), end_offset=pos)
[docs]
class RLEWCompressor(mrc.Transform):
    """Decoder for word-oriented RLE (RLEW) data.

    The stream starts with a 32-bit little-endian decompressed length,
    followed by 16-bit words. The marker word ``FE FE`` introduces a run:
    a 16-bit little-endian repeat count and the 16-bit word to repeat.
    Any other word is copied to the output unchanged.
    """

    def import_data(self, buffer, parent=None):
        """Expand an RLEW stream.

        Decoding stops once at least the announced number of bytes has been
        produced. If the input runs out first, a warning is logged and the
        data decoded so far is returned.

        Returns a ``mrc.TransformResult`` whose ``payload`` is the decoded
        bytes and whose ``end_offset`` is the read position reached in
        ``buffer``.
        """
        final_length = utils.from_uint32_le(buffer[0:4])
        i = 4
        out = bytearray()
        while len(out) < final_length:
            word = buffer[i:i + 2]
            if not word:
                # Input exhausted before the announced length was reached.
                # Slicing past the end yields an empty word, so without this
                # check the output never grows and the loop never ends.
                logger.warning(
                    '{}: input ended after {} of {} expected bytes'.format(
                        self, len(out), final_length
                    )
                )
                break
            if word == b'\xfe\xfe':
                count = utils.from_uint16_le(buffer[i + 2:i + 4])
                data = buffer[i + 4:i + 6]
                out.extend(data * count)
                i += 6
            else:
                out.extend(word)
                i += 2
        return mrc.TransformResult(payload=bytes(out), end_offset=i)
[docs]
class LZWCompressor(mrc.Transform):
    """Decoder for the variable-width LZW stream used by the Keen 1 latch file.

    Header: 32-bit little-endian decompressed size, then a 16-bit
    little-endian maximum code width. Codes are read from offset 6 onwards,
    starting at 9 bits wide. Codes 0-255 are literal bytes, 256 signals an
    error and 257 marks the end of the data.
    """

    def import_data(self, buffer, parent=None):
        """Expand an LZW stream.

        A size mismatch against the header is logged as a warning rather
        than treated as fatal.

        Returns a ``mrc.TransformResult`` with the decoded bytes as
        ``payload`` and ``end_offset`` set to the full length of ``buffer``.

        Raises ``Exception`` if the error code (256) appears in the stream.
        """
        expected_size = utils.from_uint32_le(buffer[:4])
        max_bits = utils.from_uint16_le(buffer[4:6])  # should be 12

        # 0-255: literals, 256: error, 257: end of data
        table = [bytes((value,)) for value in range(256)] + [None, None]
        decoded = bytearray()
        reader = bits.BitStream(buffer, 6, bit_endian='big', io_endian='big')
        code_width = 9

        def remember(entry):
            """Append a dictionary entry and widen the code size when due."""
            nonlocal code_width
            if len(table) >= (1 << max_bits):
                # dictionary is full; nothing more is recorded
                return
            logger.debug('lookup[{}] = {}'.format(len(table), entry))
            table.append(entry)
            # widen one entry early, capped at the maximum width
            if len(table) == (1 << code_width) - 1:
                code_width = min(code_width + 1, max_bits)
                logger.debug('usebits = {}'.format(code_width))

        first_code = reader.read(code_width)
        previous = table[first_code]
        logger.debug('fcode={},match={}'.format(first_code, previous))
        decoded.extend(previous)

        while True:
            code = reader.read(code_width)
            logger.debug('ncode={}'.format(code))
            if code == 257:
                # end of data
                break
            if code == 256:
                # error
                raise Exception('Found error code, data is not valid')

            if code < len(table):
                current = table[code]
            else:
                # code not in the table yet: previous match plus its own first byte
                current = previous + previous[0:1]
            logger.debug('match={}'.format(previous))
            logger.debug('nmatch={}'.format(current))
            decoded.extend(current)

            # add code to lookup
            remember(previous + current[0:1])
            previous = current

        if len(decoded) != expected_size:
            logger.warning('{}: was expecting data of size {}, got data of size {} instead'.format(self, expected_size, len(decoded)))
        return mrc.TransformResult(payload=bytes(decoded), end_offset=len(buffer))
[docs]
class EGATile8(mrc.Block):
    """Block holding the 8x8 tile graphics.

    ``image_data`` spans ``tile8_size`` bytes as reported by the parent
    block; ``tiles`` exposes those bytes as 8x8 frames using the default
    EGA palette.
    """

    image_data = mrc.Bytes(0x00, length=mrc.Ref('_parent.tile8_size'))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the frame count comes from the EGA header held two levels up
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref('image_data'),
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
            frame_count=mrc.Ref('_parent._parent._egahead.tile8_count'),
            width=8,
            height=8,
        )
[docs]
class EGATile16(mrc.Block):
    """Block holding the 16x16 tile graphics.

    ``image_data`` spans ``tile16_size`` bytes as reported by the parent
    block; ``tiles`` exposes those bytes as 16x16 frames using the default
    EGA palette.
    """

    image_data = mrc.Bytes(0x00, length=mrc.Ref('_parent.tile16_size'))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the frame count comes from the EGA header held two levels up
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref('image_data'),
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
            frame_count=mrc.Ref('_parent._parent._egahead.tile16_count'),
            width=16,
            height=16,
        )
[docs]
class EGATile32(mrc.Block):
    """Block holding the 32x32 tile graphics.

    ``image_data`` spans ``tile32_size`` bytes as reported by the parent
    block; ``tiles`` exposes those bytes as 32x32 frames using the default
    EGA palette.
    """

    image_data = mrc.Bytes(0x00, length=mrc.Ref('_parent.tile32_size'))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the frame count comes from the EGA header held two levels up
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref('image_data'),
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
            frame_count=mrc.Ref('_parent._parent._egahead.tile32_count'),
            width=32,
            height=32,
        )
[docs]
class EGATileStore(mrc.Block):
    """Container for the latch graphics, split into 8x8, 16x16 and 32x32 tiles.

    ``data`` is the whole block passed through a 4-bpp ``Planarizer`` whose
    plane size comes from the EGA header attached to the parent. The three
    ``StoreRef`` fields carve that data into per-size tile blocks using the
    offset/size properties below, all of which are derived from the same
    header (``self._parent._egahead``).
    """

    data = mrc.Bytes(0x00, transform=img.Planarizer(
        bpp=4,
        plane_size=mrc.Ref('_parent._egahead.latch_plane_size')
    ))

    tile8 = mrc.StoreRef(EGATile8, mrc.Ref('store'), mrc.Ref('tile8_offset'), mrc.Ref('tile8_size'))
    tile16 = mrc.StoreRef(EGATile16, mrc.Ref('store'), mrc.Ref('tile16_offset'), mrc.Ref('tile16_size'))
    tile32 = mrc.StoreRef(EGATile32, mrc.Ref('store'), mrc.Ref('tile32_offset'), mrc.Ref('tile32_size'))

    def __init__(self, *args, **kwargs):
        # the store is created before the base initialiser runs (original order kept)
        self.store = mrc.Store(self, mrc.Ref('data'))
        super().__init__(*args, **kwargs)

    # Offsets: header value scaled by 8.
    # NOTE(review): presumably converts a per-plane byte offset into a
    # position in the deplanarized data - confirm against the header spec.

    @property
    def tile8_offset(self):
        """Start of the 8x8 tiles within the store (header offset * 8)."""
        return self._parent._egahead.tile8_offset * 8

    @property
    def tile16_offset(self):
        """Start of the 16x16 tiles within the store (header offset * 8)."""
        return self._parent._egahead.tile16_offset * 8

    @property
    def tile32_offset(self):
        """Start of the 32x32 tiles within the store (header offset * 8)."""
        return self._parent._egahead.tile32_offset * 8

    # Sizes: tile count * width * height.

    @property
    def tile8_size(self):
        """Length of the 8x8 tile area (tile8_count * 8 * 8)."""
        return self._parent._egahead.tile8_count * 8 * 8

    @property
    def tile16_size(self):
        """Length of the 16x16 tile area (tile16_count * 16 * 16)."""
        return self._parent._egahead.tile16_count * 16 * 16

    @property
    def tile32_size(self):
        """Length of the 32x32 tile area (tile32_count * 32 * 32)."""
        # Uses the 32x32 tile count, matching EGATile32's frame_count;
        # this previously multiplied tile16_count by mistake.
        return self._parent._egahead.tile32_count * 32 * 32
[docs]
class EGALatch(mrc.Block):
    """Latch graphics file whose tile store is read without a transform."""

    # Filled in from outside the class; the Loader's dependency table names
    # this attribute ('_egahead') as the target for the matching EGA header.
    _egahead = None

    tilestore = mrc.BlockField(EGATileStore, 0x00)
[docs]
class EGALatchComp(mrc.Block):
    """Latch graphics file whose tile store is LZW-compressed."""

    # Filled in from outside the class; the Loader's dependency table names
    # this attribute ('_egahead') as the target for the matching EGA header.
    _egahead = None

    tilestore = mrc.BlockField(EGATileStore, 0x00, transform=LZWCompressor())
[docs]
class SoundRef(mrc.Block):
    """16-byte sound table entry: offset, priority, rate and a 12-byte name."""

    offset = mrc.UInt16_LE(0x00)
    priority = mrc.UInt8(0x02)
    rate = mrc.UInt8(0x03)
    name = mrc.Bytes(0x04, length=0x0c)
[docs]
class PreviewCompressor(mrc.Transform):
    """Two-stage decoder for full-screen preview images.

    The input is first expanded with ``RLECompressor`` and the result is
    then passed through a 4-bpp 320x200 ``Planarizer``.
    """

    rle = RLECompressor()
    # each plane is stored with 192 bytes padding at the end
    plan = img.Planarizer(bpp=4, width=320, height=200, plane_padding=192)

    def import_data(self, buffer, parent=None):
        """Decode ``buffer`` into image data.

        Returns a ``mrc.TransformResult`` whose ``payload`` is the output of
        the planarizer stage and whose ``end_offset`` is the one reported by
        the RLE stage (i.e. the position in the original input).
        """
        assert utils.is_bytes(buffer)
        unpacked = self.rle.import_data(buffer)
        deplaned = self.plan.import_data(unpacked.payload)
        return mrc.TransformResult(
            payload=deplaned.payload,
            end_offset=unpacked.end_offset,
        )
[docs]
class Preview(mrc.Block):
    """Full-screen 320x200 image stored through ``PreviewCompressor``."""

    image_data = mrc.Bytes(0x0000, transform=PreviewCompressor())

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # view of the decoded bytes as a single image in the default EGA palette
        self.image = img.IndexedImage(
            self,
            source=mrc.Ref('image_data'),
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
            width=320,
            height=200,
        )
[docs]
class Level(mrc.Unknown):
    """Placeholder for level files; no fields are defined beyond ``mrc.Unknown``."""
[docs]
class LevelTile(mrc.Block):
    """Single level cell: one 16-bit little-endian tile ID."""

    tile_id = mrc.UInt16_LE(0x00)
[docs]
class ScoresItems(mrc.Block):
    """Four 16-bit item fields (joystick, battery, vacuum, liquor) for one score entry."""

    # NOTE(review): range(0, 1) contains only the value 0. If these are 0/1
    # flags, range(0, 2) may have been intended - confirm how mrc applies
    # the ``range`` argument before changing.
    joystick = mrc.UInt16_LE(0x00, range=range(0, 1))
    battery = mrc.UInt16_LE(0x02, range=range(0, 1))
    vacuum = mrc.UInt16_LE(0x04, range=range(0, 1))
    liquor = mrc.UInt16_LE(0x06, range=range(0, 1))
[docs]
class ScoresName(mrc.Block):
    """Fixed 13-byte name field of one score entry."""

    name = mrc.Bytes(0x00, length=13)
[docs]
class Scores(mrc.Block):
    """High score file: seven entries stored as parallel arrays.

    Layout, by offset:

    * 0x00 - seven 32-bit little-endian score values
    * 0x1c - seven ``ScoresItems`` blocks
    * 0x54 - seven 16-bit city counts
    * 0x62 - 14 bytes of unidentified data
    * 0x70 - seven ``ScoresName`` blocks
    * 0xcb - a single terminating zero byte
    """

    values = mrc.UInt32_LE(0x00, length=7)
    items = mrc.BlockField(ScoresItems, 0x1c, count=7)
    num_cities = mrc.UInt16_LE(0x54, length=7, range=range(0, 9))
    unknown_1 = mrc.Bytes(0x62, length=14)
    names = mrc.BlockField(ScoresName, 0x70, count=7)
    term = mrc.Const(mrc.Bytes(0xcb, length=1), b'\x00')
[docs]
class Loader(mrc.Loader):
    """File loader mapping Commander Keen 1-3 file names to block classes."""

    _SEP = mrc.Loader._SEP

    # file name pattern -> class used to parse the file
    # NOTE(review): EGAHeader is not defined in the visible part of this
    # module - confirm it is declared or imported elsewhere.
    _KEEN_FILE_CLASS_MAP = {
        _SEP+'(EGAHEAD).CK([1-3])$': EGAHeader,
        _SEP+'(EGALATCH).CK(1)$': EGALatchComp,
        _SEP+'(EGALATCH).CK([2-3])$': EGALatch,
        _SEP+'(FINALE).CK([1-3])$': Preview,
        _SEP+'(LEVEL)([0-9]{2}).CK([1-3])$': Level,
        _SEP+'(PREVIEW)([2-3]).CK([1-3])$': Preview,
        _SEP+'(SCORES).CK([1-3])$': Scores,
    }

    # (dependent pattern, dependency pattern, substitution tuple, attribute name):
    # links each EGALATCH file to an EGAHEAD file via the '_egahead' attribute
    _KEEN_DEPS = [
        (
            _SEP+'(EGALATCH).CK([1-3])$',
            _SEP+'(EGAHEAD).CK([1-3])$',
            ('EGAHEAD', '{1}'),
            '_egahead',
        ),
    ]

    def __init__(self):
        super().__init__(self._KEEN_FILE_CLASS_MAP, self._KEEN_DEPS)

    def post_load(self):
        """Do nothing after loading."""