"""Definition classes for data blocks."""
from collections import OrderedDict
import logging
logger = logging.getLogger( __name__ )
from mrcrowbar.fields import Field, Bytes
from mrcrowbar.refs import Ref
from mrcrowbar.checks import Check
from mrcrowbar import common, utils
class FieldDescriptor( object ):
    def __init__( self, name ):
        """Attribute wrapper class for Fields.

        name
            Name of the Field.
        """
        self.name = name

    def __get__( self, instance, cls ):
        try:
            if instance is None:
                return cls._fields[self.name]
            return instance._field_data[self.name]
        except KeyError:
            raise AttributeError( self.name )

    def __set__( self, instance, value ):
        if instance is None:
            return
        instance._field_data[self.name] = value
        return

    def __delete__( self, instance ):
        if self.name not in instance._fields:
            raise AttributeError( "{} has no attribute {}".format(
                type( instance ).__name__, self.name ) )
        del instance._fields[self.name]

class RefDescriptor( object ):
    def __init__( self, name ):
        """Attribute wrapper class for Refs.

        name
            Name of the Ref.
        """
        self.name = name

    def __get__( self, instance, cls ):
        try:
            if instance is None:
                return cls._refs[self.name]
            # FIXME: don't cache until we evaluate if the performance suffers
            if True: #self.name not in instance._ref_cache:
                instance._ref_cache[self.name] = instance._refs[self.name].get( instance )
            return instance._ref_cache[self.name]
        except KeyError:
            raise AttributeError( self.name )

    def __set__( self, instance, value ):
        if instance is None:
            return
        instance._refs[self.name].set( instance, value )
        return

    def __delete__( self, instance ):
        raise AttributeError( "can't delete Ref" )

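
# NOTE: hedged sketch, not part of the original excerpt. Block below is
# declared with metaclass=BlockMeta, but BlockMeta itself is missing here.
# A minimal sketch of what it must provide: collect Field, Ref and Check
# declarations (in definition order, including those inherited from base
# classes) into the class-level _fields/_refs/_checks mappings used
# throughout Block, and swap each declaration for the matching descriptor.
# The real mrcrowbar implementation may differ in detail.
class BlockMeta( type ):
    def __new__( mcs, name, bases, attrs ):
        fields = OrderedDict()
        refs = OrderedDict()
        checks = OrderedDict()
        # inherit declarations from base classes first
        for base in bases:
            fields.update( getattr( base, '_fields', {} ) )
            refs.update( getattr( base, '_refs', {} ) )
            checks.update( getattr( base, '_checks', {} ) )
        for key, obj in list( attrs.items() ):
            if isinstance( obj, Field ):
                fields[key] = obj
                attrs[key] = FieldDescriptor( key )
            elif isinstance( obj, Ref ):
                refs[key] = obj
                attrs[key] = RefDescriptor( key )
            elif isinstance( obj, Check ):
                checks[key] = obj
        attrs['_fields'] = fields
        attrs['_refs'] = refs
        attrs['_checks'] = checks
        return super().__new__( mcs, name, bases, attrs )
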
class Block( object, metaclass=BlockMeta ):
    _parent = None
    _endian = None
    _cache_bytes = False
    _bytes = None

    def __init__( self, source_data=None, parent=None, preload_attrs=None, endian=None, cache_bytes=False, path_hint=None ):
        """Base class for Blocks.

        source_data
            Source data to construct the Block with. Can be a byte string, a
            dictionary of attribute: value pairs, or another Block object.

        parent
            Parent Block object where this Block is defined. Used for e.g.
            evaluating Refs.

        preload_attrs
            Attributes to set on the Block before importing the data. Used
            for linking in dependencies before loading.

        endian
            Platform endianness to use when interpreting the Block data.
            Useful for Blocks which have the same data layout but different
            endianness for stored numbers. Has no effect on Fields with a
            predefined endianness.

        cache_bytes
            Cache the bytes equivalent of the Block. Useful for debugging the
            loading procedure. Defaults to False.

        path_hint
            Cache a string containing the path of the current Block, relative
            to the root.
        """
        self._field_data = {}
        self._ref_cache = {}
        if parent is not None:
            assert isinstance( parent, Block )
            self._parent = parent
        self._endian = endian if endian else (parent._endian if parent else self._endian)
        self._path_hint = path_hint
        if self._path_hint is None:
            self._path_hint = '<{}>'.format( self.__class__.__name__ )
        if cache_bytes:
            self._cache_bytes = True
            if source_data and common.is_bytes( source_data ):
                self._bytes = bytes( source_data )

        if preload_attrs:
            for attr, value in preload_attrs.items():
                setattr( self, attr, value )

        # start the initial load of data
        if isinstance( source_data, Block ):
            self.clone_data( source_data )
        elif isinstance( source_data, dict ):
            # preload defaults, then overwrite with dictionary values
            self.import_data( None )
            self.update_data( source_data )
        else:
            self.import_data( source_data )

        # cache all refs
        for key, ref in self._refs.items():
            ref.cache( self )
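
    # NOTE: hedged illustration, not part of the original source. The three
    # source_data forms handled above, for a hypothetical subclass Example
    # with a single field named "value":
    #
    #     Example( b'\x01\x02' )              # parse from a byte string
    #     Example( {'value': 0x0201} )        # load defaults, then override
    #     Example( other_example_instance )   # clone field data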
    def __repr__( self ):
        desc = '0x{:016x}'.format( id( self ) )
        if hasattr( self, 'repr' ) and isinstance( self.repr, str ):
            desc = self.repr
        return '<{}: {}>'.format( self.__class__.__name__, desc )

    @property
    def repr( self ):
        """Plaintext summary of the Block."""
        return None

    @property
    def serialised( self ):
        """Tuple containing the contents of the Block."""
        klass = self.__class__
        return (
            (klass.__module__, klass.__name__),
            tuple(
                (name, field.serialise( self._field_data[name], parent=self ))
                for name, field in klass._fields.items()
            )
        )

    def clone_data( self, source ):
        """Clone data from another Block.

        source
            Block instance to copy from.
        """
        klass = self.__class__
        assert isinstance( source, klass )
        for name in klass._fields:
            self._field_data[name] = getattr( source, name )

    def update_data( self, source ):
        """Update data from a dictionary.

        source
            Dictionary of attribute: value pairs.
        """
        assert isinstance( source, dict )
        for attr, value in source.items():
            assert hasattr( self, attr )
            setattr( self, attr, value )

    def import_data( self, raw_buffer ):
        """Import data from a byte array.

        raw_buffer
            Byte array to import from.
        """
        klass = self.__class__
        if raw_buffer is not None:
            assert common.is_bytes( raw_buffer )
        # raw_buffer = memoryview( raw_buffer )

        self._field_data = {}

        logger.debug( '{}: loading fields'.format( self ) )
        for name in klass._fields:
            if raw_buffer is not None:
                if logger.isEnabledFor( logging.DEBUG ):
                    logger.debug( '{} [{}]: input buffer'.format( name, klass._fields[name] ) )
                self._field_data[name] = klass._fields[name].get_from_buffer(
                    raw_buffer, parent=self
                )
                if logger.isEnabledFor( logging.DEBUG ):
                    logger.debug( 'Result for {} [{}]: {}'.format( name, klass._fields[name], self._field_data[name] ) )
            else:
                self._field_data[name] = klass._fields[name].default

        if raw_buffer is not None:
            for name, check in klass._checks.items():
                check.check_buffer( raw_buffer, parent=self )

            # if logging is set to INFO or more verbose, sanity-check that
            # a roundtrip export matches the imported data
            if logger.isEnabledFor( logging.INFO ):
                test = self.export_data()
                if logger.getEffectiveLevel() <= logging.DEBUG:
                    logger.debug( 'Stats for {}:'.format( self ) )
                    logger.debug( 'Import buffer size: {}'.format( len( raw_buffer ) ) )
                    logger.debug( 'Export size: {}'.format( len( test ) ) )
                    if test == raw_buffer:
                        logger.debug( 'Content: exact match!' )
                    elif test == raw_buffer[:len( test )]:
                        logger.debug( 'Content: exact match with overflow!' )
                    else:
                        logger.debug( 'Content: different!' )
                        for x in utils.hexdump_diff_iter( raw_buffer[:len( test )], test ):
                            logger.debug( x )
                elif test != raw_buffer[:len( test )]:
                    logger.info( '{} export produced changed output from import'.format( self ) )
        # if raw_buffer:
        #     raw_buffer.release()
        return

    load = import_data

    def export_data( self ):
        """Export data to a byte array."""
        klass = self.__class__
        output = bytearray( b'\x00' * self.get_size() )

        # prevalidate all data before export.
        # this is important to ensure that any dependent fields
        # are updated beforehand, e.g. a count referenced
        # in a BlockField
        for name in klass._fields:
            self.scrub_field( name )
            self.validate_field( name )
        self.update_deps()

        for name in klass._fields:
            klass._fields[name].update_buffer_with_value(
                self._field_data[name], output, parent=self
            )
        for name, check in klass._checks.items():
            check.update_buffer( output, parent=self )
        return output

    dump = export_data

    def update_deps( self ):
        """Update dependencies on all the fields on this Block instance."""
        klass = self.__class__
        for name in klass._fields:
            self.update_deps_on_field( name )
        return

    def validate( self ):
        """Validate all the fields on this Block instance."""
        klass = self.__class__
        for name in klass._fields:
            self.validate_field( name )
        return

    def get_size( self ):
        """Get the projected size (in bytes) of the exported data from this Block instance."""
        klass = self.__class__
        size = 0
        for name in klass._fields:
            size = max( size, klass._fields[name].get_end_offset( self._field_data[name], parent=self ) )
        for check in klass._checks.values():
            size = max( size, check.get_end_offset( parent=self ) )
        return size

    def get_field_obj( self, field_name ):
        """Return a Field object associated with this Block class.

        field_name
            Name of the Field.
        """
        klass = self.__class__
        return klass._fields[field_name]

    def get_field_names( self ):
        """Get the list of Fields associated with this Block class."""
        klass = self.__class__
        return klass._fields.keys()

    def get_field_path( self, field ):
        """Return the path of this Block and a child Field in the current object tree.

        Used for error messages.

        field
            Field object on the Block to reference.
        """
        klass = self.__class__
        for field_name, field_obj in klass._fields.items():
            if field_obj == field:
                return '{}.{}'.format( self.get_path(), field_name )
        return '{}.?'.format( self.get_path() )

    def get_path( self ):
        """Return the path of this Block in the current object tree.

        Used for error messages.
        """
        klass = self.__class__
        if not self._parent:
            self._path_hint = '<{}>'.format( klass.__name__ )
        else:
            # walk the parent's fields to find which attribute (or list
            # slot) holds this block
            pklass = self._parent.__class__
            for field_name, field_obj in pklass._fields.items():
                if field_name in self._parent._field_data:
                    if self._parent._field_data[field_name] == self:
                        self._path_hint = '{}.{}'.format( self._parent.get_path(), field_name )
                    elif type( self._parent._field_data[field_name] ) == list:
                        for i, subobject in enumerate( self._parent._field_data[field_name] ):
                            if subobject == self:
                                self._path_hint = '{}.{}[{}]'.format( self._parent.get_path(), field_name, i )
                            elif hasattr( subobject, 'obj' ) and subobject.obj == self:
                                self._path_hint = '{}.{}[{}].obj'.format( self._parent.get_path(), field_name, i )
        return self._path_hint

    def get_field_start_offset( self, field_name, index=None ):
        """Return the start offset of where a Field's data is to be stored in the Block.

        field_name
            Name of the Field to inspect.

        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        klass = self.__class__
        return klass._fields[field_name].get_start_offset( self._field_data[field_name], parent=self, index=index )

    def get_field_size( self, field_name, index=None ):
        """Return the size of a Field's data (in bytes).

        field_name
            Name of the Field to inspect.

        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        klass = self.__class__
        return klass._fields[field_name].get_size( self._field_data[field_name], parent=self, index=index )

    def get_field_end_offset( self, field_name, index=None ):
        """Return the end offset of a Field's data. Useful for chainloading.

        field_name
            Name of the Field to inspect.

        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        klass = self.__class__
        return klass._fields[field_name].get_end_offset( self._field_data[field_name], parent=self, index=index )

    def scrub_field( self, field_name ):
        """Return a Field's data coerced to the correct type (if necessary).

        field_name
            Name of the Field to inspect.

        Throws FieldValidationError if the value can't be coerced.
        """
        klass = self.__class__
        self._field_data[field_name] = klass._fields[field_name].scrub( self._field_data[field_name], parent=self )
        return self._field_data[field_name]

    def update_deps_on_field( self, field_name ):
        """Update all dependent variables derived from the value of a Field.

        field_name
            Name of the Field to inspect.
        """
        klass = self.__class__
        return klass._fields[field_name].update_deps( self._field_data[field_name], parent=self )

    def validate_field( self, field_name ):
        """Validate that a correctly-typed Python object meets the constraints for a Field.

        field_name
            Name of the Field to inspect.

        Throws FieldValidationError if a constraint fails.
        """
        klass = self.__class__
        return klass._fields[field_name].validate( self._field_data[field_name], parent=self )

    def hexdump( self, start=None, end=None, length=None, major_len=8, minor_len=4, colour=True, address_base=None, show_offsets=True, show_glyphs=True ):
        """Print the exported data in tabular hexadecimal/ASCII format.

        start
            Start offset to read from (default: start)

        end
            End offset to stop reading at (default: end)

        length
            Length to read in (optional replacement for end)

        major_len
            Number of hexadecimal groups per line

        minor_len
            Number of bytes per hexadecimal group

        colour
            Add ANSI colour formatting to output (default: true)

        address_base
            Base address to use for labels (default: start)

        show_offsets
            Display offsets at the start of each line (default: true)

        show_glyphs
            Display glyph map at the end of each line (default: true)

        Raises ValueError if both end and length are defined.
        """
        utils.hexdump(
            self.export_data(), start=start, end=end,
            length=length, major_len=major_len, minor_len=minor_len,
            colour=colour, address_base=address_base,
            show_offsets=show_offsets, show_glyphs=show_glyphs
        )

    def histdump( self, start=None, end=None, length=None, samples=0x10000, width=64, address_base=None ):
        """Print the histogram of the exported data.

        start
            Start offset to read from (default: start)

        end
            End offset to stop reading at (default: end)

        length
            Length to read in (optional replacement for end)

        samples
            Number of samples per histogram slice (default: 0x10000)

        width
            Width of rendered histogram (default: 64)

        address_base
            Base address to use for labelling (default: start)
        """
        utils.histdump(
            self.export_data(), start=start, end=end, length=length,
            samples=samples, width=width, address_base=address_base
        )

    def search( self, pattern, encoding='utf8', fixed_string=False, hex_format=False, ignore_case=False ):
        """Find the Fields that match a byte pattern.

        pattern
            Pattern to match, as a Python string

        encoding
            Convert strings in the pattern to a specific Python encoding (default: utf8)

        fixed_string
            Interpret the pattern as a fixed string (disable regular expressions)

        hex_format
            Interpret the pattern as raw hexadecimal (default: false)

        ignore_case
            Perform a case-insensitive search
        """
        # search this Block's own fields
        return [x for x in utils.search_iter(
            pattern, self, prefix='<{}>'.format( self.__class__.__name__ ),
            depth=None, encoding=encoding, fixed_string=fixed_string,
            hex_format=hex_format, ignore_case=ignore_case
        )]

class Unknown( Block ):
    """Placeholder block for data of an unknown format."""

    #: Raw data.
    data = Bytes( 0x0000 )
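
# NOTE: hedged usage sketch, not part of the original source. Assuming the
# UInt32_LE field type from mrcrowbar.fields (offset passed as the first
# argument), a Block subclass is declared with Field instances at class
# level, then loaded from bytes and exported back out:
#
#     from mrcrowbar.fields import UInt32_LE
#
#     class Header( Block ):
#         magic = UInt32_LE( 0x00 )
#         size = UInt32_LE( 0x04 )
#
#     data = b'\x4d\x52\x43\x42\x08\x00\x00\x00'
#     header = Header( data )
#     assert header.export_data() == data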