"""Definition classes for common fields in binary formats."""
import collections
import math
import logging
logger = logging.getLogger( __name__ )
from mrcrowbar.refs import Ref, Chain, property_get, property_set
from mrcrowbar import common, encoding
class ParseError( Exception ):
    """Raised when a Field definition cannot interpret the input data."""
class FieldValidationError( Exception ):
    """Raised when a value fails a Field's validation constraints."""
class EmptyFieldError( Exception ):
    """Raised when no data at all could be extracted for a Field."""
class Field( object ):
    def __init__( self, *, default=None, **kwargs ):
        """Base class for Fields.

        default
            Default value to emit in the case of e.g. creating an empty Block.
        """
        # position hint records declaration order of Fields on a Block
        self._position_hint = next( common.next_position_hint )
        self.default = default

    def __repr__( self ):
        desc = '0x{:016x}'.format( id( self ) )
        if hasattr( self, 'repr' ) and isinstance( self.repr, str ):
            desc = self.repr
        return '<{}: {}>'.format( self.__class__.__name__, desc )

    @property
    def repr( self ):
        """Plaintext summary of the Field."""
        return None

    @property
    def serialised( self ):
        """Tuple containing the contents of the Field."""
        return None

    def __hash__( self ):
        # fall back to identity hashing if the subclass doesn't serialise
        serial = self.serialised
        if serial is None:
            return super().__hash__()
        return hash( serial )

    def __eq__( self, other ):
        serial = self.serialised
        if serial is None:
            return super().__eq__( other )
        # fix: previously accessed other.serialised unconditionally, which raised
        # AttributeError when comparing against objects without that property
        other_serial = getattr( other, 'serialised', None )
        if other_serial is None:
            return NotImplemented
        return serial == other_serial

    def get_from_buffer( self, buffer, parent=None ):
        """Create a Python object from a byte string, using the field definition.

        buffer
            Input byte string to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        """
        return None

    def update_buffer_with_value( self, value, buffer, parent=None ):
        """Write a Python object into a byte array, using the field definition.

        value
            Input Python object to process.
        buffer
            Output byte array to encode value into.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        """
        assert common.is_bytes( buffer )
        self.validate( value, parent )
        return

    def get_start_offset( self, value, parent=None, index=None ):
        """Return the start offset of where the Field's data is to be stored in the Block.

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        assert index is None
        return 0

    def get_size( self, value, parent=None, index=None ):
        """Return the size of the Field's data (in bytes).

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        assert index is None
        return 0

    def get_end_offset( self, value, parent=None, index=None ):
        """Return the end offset of the Field's data. Useful for chainloading.

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        index
            Index of the Python object to measure from. Used if the Field
            takes a list of objects.
        """
        return self.get_start_offset( value, parent, index ) + self.get_size( value, parent, index )

    def scrub( self, value, parent=None ):
        """Return the value coerced to the correct type of the Field (if necessary).

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.

        Throws FieldValidationError if value can't be coerced.
        """
        return value

    def update_deps( self, value, parent=None ):
        """Update all dependent variables derived from the value of the Field.

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        """
        return

    def validate( self, value, parent=None ):
        """Validate that a correctly-typed Python object meets the constraints for the Field.

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.

        Throws FieldValidationError if a constraint fails.
        """
        pass

    def serialise( self, value, parent=None ):
        """Return a value as basic Python types.

        value
            Input Python object to process.
        parent
            Parent block object where this Field is defined. Used for e.g.
            evaluating Refs.
        """
        return None

    def get_path( self, parent=None, index=None ):
        """Return the location in the Block tree.

        parent
            Parent block object where this Field is defined.
        index
            Index into the value of the Field.
        """
        suffix = '[{}]'.format( index ) if index is not None else ''
        if not parent:
            return '<{}>'.format( self.__class__.__name__ ) + suffix
        return parent.get_field_path( self ) + suffix
class StreamField( Field ):
    def __init__( self, offset=Chain(), *, default=None, count=None, length=None, stream=False,
                  alignment=1, stream_end=None, stop_check=None, **kwargs ):
        """Base class for accessing one or more streamable elements.

        offset
            Position of data, relative to the start of the parent block. Defaults to
            the end offset of the previous field.
        default
            Default value to emit in the case of e.g. creating an empty Block.
        count
            Load multiple elements. None implies a single value, non-negative
            numbers will return a Python list.
        length
            Maximum size of the buffer to read in.
        stream
            Read elements continuously until a stop condition is met. Defaults to False.
        alignment
            Number of bytes to align the start of each element to.
        stream_end
            Byte pattern to denote the end of the stream.
        stop_check
            A function that takes a data buffer and an offset; should return True if
            the end of the data stream has been reached and False otherwise.
        """
        # array-style fields default to an empty list rather than None
        if count is not None and default is None:
            default = []
        super().__init__( default=default, **kwargs )
        self.offset = offset
        self.count = count
        self.length = length
        self.stream = stream
        self.alignment = alignment
        if stream_end is not None:
            assert common.is_bytes( stream_end )
        self.stream_end = stream_end
        self.stop_check = stop_check

    def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
        """Decode one element at offset; return (element, end_offset). Implemented by subclasses."""
        pass

    def get_from_buffer( self, buffer, parent=None ):
        assert common.is_bytes( buffer )
        offset = property_get( self.offset, parent, caller=self )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        alignment = property_get( self.alignment, parent )
        is_array = stream or (count is not None)
        # a single (non-array) value is read as a one-element fetch
        count = count if is_array else 1
        if count is not None:
            assert count >= 0
        length = property_get( self.length, parent )
        if length is not None:
            buffer = buffer[:offset+length]
        pointer = offset
        result = []
        while pointer < len( buffer ):
            start_offset = pointer
            # stop if we've hit the maximum number of items
            if not stream and (len( result ) == count):
                break
            # run the stop check (if exists): if it returns true, we've hit the end of the stream
            if self.stop_check and (self.stop_check( buffer, pointer )):
                break
            # stop if we find the end of stream marker
            if self.stream_end is not None and buffer[pointer:pointer+len( self.stream_end )] == self.stream_end:
                break
            element, end_offset = self.get_element_from_buffer( pointer, buffer, parent, index=len( result ) if is_array else None )
            result.append( element )
            pointer = end_offset
            # if an alignment is set, do some aligning
            if alignment is not None:
                width = (pointer-start_offset) % alignment
                if width:
                    pointer += alignment - width
        if not is_array:
            if not result:
                # in the case of an empty result for a non-array, attempt to fetch one record.
                # this will only work if the resulting element is of size 0.
                try:
                    result, _ = self.get_element_from_buffer( pointer, buffer, parent, index=0 )
                except Exception:
                    raise EmptyFieldError( '{}: No data could be extracted'.format( self.get_path( parent ) ) )
            else:
                return result[0]
        return result

    def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
        """Encode one element into buffer at offset; return the end offset. Implemented by subclasses."""
        pass

    def update_buffer_with_value( self, value, buffer, parent=None ):
        super().update_buffer_with_value( value, buffer, parent )
        offset = property_get( self.offset, parent, caller=self )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        alignment = property_get( self.alignment, parent )
        is_array = stream or (count is not None)
        if is_array:
            try:
                it = iter( value )
            except TypeError:
                raise FieldValidationError( '{}: Type {} not iterable'.format( self.get_path( parent ), type( value ) ) )
            if not stream:
                assert len( value ) <= count
        else:
            value = [value]
        pointer = offset
        for index, element in enumerate( value ):
            start_offset = pointer
            end_offset = self.update_buffer_with_element( pointer, element, buffer, parent, index=index if is_array else None )
            pointer = end_offset
            # if an alignment is set, do some aligning
            if alignment is not None:
                width = (pointer-start_offset) % alignment
                if width:
                    pointer += alignment - width
        new_size = pointer
        if self.stream_end is not None:
            new_size += len( self.stream_end )
        # grow the output buffer as needed, then stamp the end-of-stream marker
        if len( buffer ) < new_size:
            buffer.extend( b'\x00'*(new_size-len( buffer )) )
        if self.stream_end is not None:
            buffer[new_size-len( self.stream_end ):new_size] = self.stream_end

    def update_deps( self, value, parent=None ):
        count = property_get( self.count, parent )
        length = property_get( self.length, parent )
        # push the observed element count / byte length back into any Refs
        if count is not None and count != len( value ):
            property_set( self.count, parent, len( value ) )
        target_length = self.get_size( value, parent )
        if length is not None and length != target_length:
            property_set( self.length, parent, target_length )

    def validate_element( self, element, parent=None, index=None ):
        """Validate a single element. Implemented by subclasses."""
        pass

    def validate( self, value, parent=None ):
        offset = property_get( self.offset, parent, caller=self )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        is_array = stream or (count is not None)
        if is_array:
            try:
                it = iter( value )
            except TypeError:
                raise FieldValidationError( '{}: Type {} not iterable'.format( self.get_path( parent ), type( value ) ) )
            if count is not None and (not isinstance( self.count, Ref )) and (len( value ) != count):
                # fix: error message previously referenced the undefined name "length",
                # raising NameError instead of the intended FieldValidationError
                raise FieldValidationError( '{}: Count defined as a constant, was expecting {} list entries but got {}!'.format( self.get_path( parent ), count, len( value ) ) )
        else:
            value = [value]
        for index, element in enumerate( value ):
            self.validate_element( element, parent=parent, index=index if is_array else None )

    def get_element_size( self, element, parent=None, index=None ):
        """Return the size in bytes of a single element. Implemented by subclasses."""
        pass

    def get_start_offset( self, value, parent=None, index=None ):
        offset = property_get( self.offset, parent, caller=self )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        alignment = property_get( self.alignment, parent )
        is_array = stream or (count is not None)
        pointer = offset
        if index is not None:
            if not is_array:
                raise IndexError( '{}: Can\'t use index for a non-array'.format( self.get_path( parent ) ) )
            elif index not in range( len( value ) ):
                raise IndexError( '{}: Index {} is not within range( 0, {} )'.format( self.get_path( parent ), index, len( value ) ) )
            # sum the sizes of all elements before the requested index
            for el_index, element in enumerate( value[:index] ):
                start_offset = pointer
                pointer += self.get_element_size( element, parent, index=el_index if is_array else None )
                # if an alignment is set, do some aligning
                if alignment is not None:
                    width = (pointer-start_offset) % alignment
                    if width:
                        pointer += alignment - width
        return pointer

    def get_size( self, value, parent=None, index=None ):
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        alignment = property_get( self.alignment, parent )
        is_array = stream or (count is not None)
        pointer = 0
        if index is not None:
            if not is_array:
                raise IndexError( '{}: Can\'t use index for a non-array BlockField'.format( self.get_path( parent ) ) )
            elif index not in range( 0, len( value ) ):
                raise IndexError( '{}: Index {} is not within range( 0, {} )'.format( self.get_path( parent ), index, len( value ) ) )
            value = [value[index]]
        else:
            value = value if is_array else [value]
        for el_index, element in enumerate( value ):
            start_offset = pointer
            pointer += self.get_element_size( element, parent, index=el_index if is_array else None )
            # if an alignment is set, do some aligning
            if alignment is not None:
                width = (pointer-start_offset) % alignment
                if width:
                    pointer += alignment - width
        if self.stream_end is not None:
            pointer += len( self.stream_end )
        return pointer
ChunkBase = collections.namedtuple( 'Chunk', ['id', 'obj'] )

class Chunk( ChunkBase ):
    """Pairing of a chunk ID with its decoded payload object."""

    @property
    def serialised( self ):
        """Tuple containing the contents of the Chunk."""
        klass = self.__class__
        obj_serial = None if self.obj is None else self.obj.serialised
        return ((klass.__module__, klass.__name__), (('id', self.id), ('obj', obj_serial)))
class ChunkField( StreamField ):
    def __init__( self, chunk_map, offset=Chain(), *, count=None, length=None, stream=True,
                  alignment=1, stream_end=None, stop_check=None, default_klass=None,
                  id_size=None, id_field=None, id_enum=None, length_field=None,
                  fill=None, **kwargs ):
        """Field for inserting a tokenised Block stream into the parent class.

        chunk_map
            A dict mapping between the chunk ID and the Block class to interpret the payload as.
        offset
            Position of data, relative to the start of the parent block. Defaults to
            the end offset of the previous field.
        count
            Load multiple chunks. None implies a single value, non-negative
            numbers will return a Python list.
        length
            Maximum size of the buffer to read in.
        stream
            Read elements continuously until a stop condition is met. Defaults to True.
        alignment
            Number of bytes to align the start of each Chunk to.
        stream_end
            Byte pattern to denote the end of the stream.
        stop_check
            A function that takes a data buffer and an offset; should return True if
            the end of the data stream has been reached and False otherwise.
        default_klass
            Fallback Block class to use if there's no match with the chunk_map mapping.
        id_size
            Size in bytes of the Chunk ID.
        id_field
            Field class used to parse Chunk ID. Defaults to Bytes.
        id_enum
            Restrict allowed values for Chunk ID to those provided by a Python enum type. Used for validation.
        length_field
            Field class used to parse the Chunk data length. For use when a Chunk consists of an ID followed by the size of the data.
        fill
            Exact byte sequence that denotes an empty Chunk object.
        """
        super().__init__( offset=offset, default=None, count=count, length=length,
                          stream=stream, alignment=alignment, stream_end=stream_end,
                          stop_check=stop_check, **kwargs )
        self.chunk_map = chunk_map
        if length_field:
            assert issubclass( length_field, NumberField )
            self.length_field = length_field( 0x00 )
        else:
            self.length_field = None
        if id_field:
            assert issubclass( id_field, (NumberField) )
            if id_enum:
                self.id_field = id_field( 0x00, enum=id_enum )
            else:
                self.id_field = id_field( 0x00 )
        else:
            self.id_field = None
        self.default_klass = default_klass
        self.id_size = id_size
        self.fill = fill

    def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
        chunk_map = property_get( self.chunk_map, parent )
        fill = property_get( self.fill, parent )
        pointer = offset
        chunk_id = None
        # determine the chunk ID: either a typed field, a fixed-size slice,
        # or (as a last resort) prefix-matching against the chunk_map keys
        if self.id_field:
            chunk_id = self.id_field.get_from_buffer( buffer[pointer:], parent=parent )
            pointer += self.id_field.field_size
        elif self.id_size:
            chunk_id = buffer[pointer:pointer+self.id_size]
            pointer += len( chunk_id )
        else:
            for test_id in chunk_map:
                if buffer[pointer:].startswith( test_id ):
                    chunk_id = test_id
                    break
            if not chunk_id:
                raise ParseError( '{}: Could not find matching chunk at offset {}'.format( self.get_path( parent, index ), pointer ) )
            pointer += len( chunk_id )
        if chunk_id in chunk_map:
            chunk_klass = chunk_map[chunk_id]
        elif self.default_klass:
            chunk_klass = self.default_klass
        else:
            raise ParseError( '{}: No chunk class match for ID {}'.format( self.get_path( parent, index ), chunk_id ) )
        if self.length_field:
            # explicit payload length: slice out exactly that many bytes
            size = self.length_field.get_from_buffer( buffer[pointer:], parent=parent )
            pointer += self.length_field.field_size
            chunk_buffer = buffer[pointer:pointer+size]
            pointer += size
            # a payload equal to the fill pattern denotes an empty chunk
            if chunk_buffer == fill:
                result = Chunk( id=chunk_id, obj=None )
                return result, pointer
            chunk = chunk_klass( chunk_buffer, parent=parent, cache_bytes=parent._cache_bytes, path_hint=self.get_path( parent, index ) )
        else:
            # no length prefix: let the Block decide how much it consumes
            chunk = chunk_klass( buffer[pointer:], parent=parent, cache_bytes=parent._cache_bytes, path_hint=self.get_path( parent, index ) )
            pointer += chunk.get_size()
        result = Chunk( id=chunk_id, obj=chunk )
        return result, pointer

    def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
        chunk_map = property_get( self.chunk_map, parent )
        fill = property_get( self.fill, parent )
        data = bytearray()
        if self.id_field:
            data.extend( b'\x00'*self.id_field.field_size )
            self.id_field.update_buffer_with_value( element.id, data, parent=parent )
        else:
            data += element.id
        if element.obj is None:
            if fill is not None:
                payload = fill
            else:
                raise ValueError( '{}: Object part of Chunk can\'t be None unless there\'s a fill set'.format( self.get_path( parent, index ) ) )
        else:
            payload = element.obj.export_data()
        if self.length_field:
            length_buf = bytearray( b'\x00'*self.length_field.field_size )
            self.length_field.update_buffer_with_value( len( payload ), length_buf, parent=parent )
            data.extend( length_buf )
        data += payload
        if len( buffer ) < offset+len( data ):
            buffer.extend( b'\x00'*(offset+len( data )-len( buffer )) )
        buffer[offset:offset+len( data )] = data
        return offset+len( data )

    def validate_element( self, element, parent=None, index=None ):
        chunk_map = property_get( self.chunk_map, parent )
        fill = property_get( self.fill, parent )
        assert isinstance( element, Chunk )
        if element.id in chunk_map:
            chunk_klass = chunk_map[element.id]
        elif self.default_klass:
            chunk_klass = self.default_klass
        # NOTE(review): if the ID misses chunk_map and there is no default_klass,
        # chunk_klass is unbound and the isinstance check below raises NameError — confirm intended
        if element.obj is None:
            assert fill is not None
        else:
            assert isinstance( element.obj, chunk_klass )
        if self.id_size:
            assert len( element.id ) == self.id_size

    def get_element_size( self, element, parent=None, index=None ):
        fill = property_get( self.fill, parent )
        size = 0
        if self.id_field:
            size += self.id_field.field_size
        else:
            size += len( element.id )
        if self.length_field:
            size += self.length_field.field_size
        if element.obj is None:
            size += len( fill )
        else:
            size += element.obj.get_size()
        return size

    def serialise( self, value, parent=None ):
        self.validate( value, parent )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        is_array = stream or (count is not None)
        if is_array:
            return (('builtins', 'list'), tuple( (a, b.serialised if b is not None else None) for a, b in value ))
        # fix: previously guarded on "value is not None" while still indexing value[0]
        # unconditionally; the None check belongs on the chunk payload (value[1])
        return (value[0], value[1].serialised if value[1] is not None else None)
class BlockField( StreamField ):
    def __init__( self, block_klass, offset=Chain(), *, block_kwargs=None, count=None, fill=None,
                  block_type=None, default_klass=None, length=None, stream=False,
                  alignment=1, transform=None, stream_end=None, stop_check=None,
                  **kwargs ):
        """Field for inserting another Block into the parent class.

        block_klass
            Block class to use, or a dict mapping between type and block class.
        offset
            Position of data, relative to the start of the parent block. Defaults to
            the end offset of the previous field.
        block_kwargs
            Arguments to be passed to the constructor of the block class.
        count
            Load multiple Blocks. None implies a single value, non-negative
            numbers will return a Python list.
        fill
            Exact byte sequence that denotes an empty entry in a list.
        block_type
            Key to use with the block_klass mapping. (Usually a Ref for a property on the parent block)
        default_klass
            Fallback Block class to use if there's no match with the block_klass mapping.
        length
            Maximum size of the buffer to read in.
        stream
            Read Blocks continuously until a stop condition is met.
        alignment
            Number of bytes to align the start of each Block to.
        transform
            Transform class to use for preprocessing the data before importing or
            exporting each Block.
        stream_end
            Byte pattern to denote the end of the stream.
        stop_check
            A function that takes a data buffer and an offset; should return True if
            the end of the data stream has been reached and False otherwise.
        """
        super().__init__( offset=offset, default=None, count=count, length=length,
                          stream=stream, alignment=alignment, stream_end=stream_end,
                          stop_check=stop_check, **kwargs )
        self.block_klass = block_klass
        self.block_kwargs = block_kwargs if block_kwargs else {}
        self.block_type = block_type
        # TODO: support different args if using a switch
        self.fill = fill
        self.default_klass = default_klass
        self.transform = transform

    def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        fill = property_get( self.fill, parent )
        klass = self.get_klass( parent )
        # add an empty list entry if we find the fill pattern
        if fill and buffer[offset:offset+len( fill )] == fill:
            return None, offset+len( fill )
        # if we have an inline transform, apply it
        elif self.transform:
            data = self.transform.import_data( buffer[offset:], parent=parent )
            block = klass( source_data=data.payload, parent=parent, cache_bytes=parent._cache_bytes, path_hint=self.get_path( parent, index ), **self.block_kwargs )
            return block, offset+data.end_offset
        # otherwise, create a block
        block = klass( source_data=buffer[offset:], parent=parent, cache_bytes=parent._cache_bytes, path_hint=self.get_path( parent, index ), **self.block_kwargs )
        size = block.get_size()
        if size == 0:
            if stream:
                raise ParseError( '{}: Can\'t stream 0 byte Blocks ({}) from a BlockField'.format(self.get_path( parent, index ), klass ) )
            elif count:
                # fix: previously referenced the undefined name "result" here, which
                # raised NameError instead of emitting the intended warning
                logger.warning( '{}: copying 0 byte Blocks ({}) from a BlockField, this is probably not what you want'.format( self.get_path( parent, index ), klass ) )
        return block, offset+size

    def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        fill = property_get( self.fill, parent )
        klass = self.get_klass( parent )
        if element is None:
            if fill:
                data = fill
            else:
                raise ParseError( '{}: A fill pattern needs to be specified to use None as a list entry'.format( self.get_path( parent, index ) ) )
        else:
            data = element.export_data()
            if self.transform:
                data = self.transform.export_data( data, parent=parent ).payload
        if len( buffer ) < offset+len( data ):
            buffer.extend( b'\x00'*(offset+len( data )-len( buffer )) )
        buffer[offset:offset+len( data )] = data
        return offset+len( data )

    def update_deps( self, value, parent=None ):
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        is_array = stream or (count is not None)
        if count is not None and count != len( value ):
            property_set( self.count, parent, len( value ) )
        if not is_array:
            value = [value]
        # propagate dependency updates down into each child Block
        for element in value:
            if element is not None:
                element.update_deps()

    def validate_element( self, element, parent=None, index=None ):
        klass = self.get_klass( parent )
        if (element is not None) and (not isinstance( element, klass )):
            raise FieldValidationError( '{}: Expecting block class {}, not {}'.format( self.get_path( parent, index ), klass, type( element ) ) )

    def get_element_size( self, element, parent=None, index=None ):
        fill = property_get( self.fill, parent )
        if self.transform:
            data = self.transform.export_data( element.export_data(), parent=parent ).payload
            return len( data )
        elif element is None:
            if fill:
                return len( fill )
            else:
                raise ParseError( '{}: A fill pattern needs to be specified to use None as a list entry'.format( self.get_path( parent, index ) ) )
        else:
            return element.get_size()

    def get_klass( self, parent=None ):
        """Resolve the Block class to use, consulting the type mapping if one was given."""
        block_klass = property_get( self.block_klass, parent )
        if isinstance( block_klass, dict ):
            block_type = property_get( self.block_type, parent )
            if block_type in block_klass:
                return block_klass[block_type]
            elif self.default_klass:
                return self.default_klass
            else:
                raise ParseError( '{}: No block klass match for type {}'.format( self.get_path( parent ), block_type ) )
        return block_klass

    def serialise( self, value, parent=None ):
        self.validate( value, parent )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        is_array = stream or (count is not None)
        if is_array:
            return (('builtins', 'list'), tuple( x.serialised if x is not None else None for x in value ))
        return value.serialised if value is not None else None
[docs]
class StringField( StreamField ):
def __init__( self, offset=Chain(), *, default=None, count=None, length=None,
stream=False, alignment=1, stream_end=None, stop_check=None,
transform=None, encoding=False, length_field=None,
fill=None, element_length=None, element_end=None, zero_pad=False,
**kwargs ):
"""Field class for string data.
offset
Position of data, relative to the start of the parent block. Defaults to
the end offset of the previous field.
default
Default value to emit in the case of e.g. creating an empty block.
count
Load multiple strings. None implies a single value, non-negative
numbers will return a Python list.
length
Maximum size of the buffer to read in.
stream
Read strings continuously until a stop condition is met. Defaults to False.
alignment
Number of bytes to align the start of the next element to.
stream_end
Byte string to indicate the end of the data.
stop_check
A function that takes a data buffer and an offset; should return True if
the end of the data stream has been reached and False otherwise.
transform
Transform class to use for preprocessing the data before importing or
exporting each string.
encoding
Python string encoding to use for output, as accepted by bytes.decode().
length_field
Field class used to parse the string length. For use when a string is preceded by
the size.
fill
Exact byte sequence that denotes an empty entry in a list.
element_length
Length of each string element to load.
element_end
Byte string to indicate the end of a single string element.
zero_pad
Pad each element with zeros to match the length. Only for use with fixed
length elements. The data size must be up to or equal to the length.
Defaults to False.
"""
super().__init__( offset=offset, default=default, count=count, length=length,
stream=stream, alignment=alignment, stream_end=stream_end,
stop_check=stop_check, **kwargs )
if count is not None:
assert not stream
assert (element_length is not None) or (length_field is not None) or (element_end is not None)
elif stream:
assert (element_length is not None) or (length_field is not None) or (element_end is not None)
else: # single element
pass
if zero_pad:
assert element_length is not None
if length_field:
assert element_length is None
assert issubclass( length_field, NumberField )
self.length_field = length_field( 0x00 )
else:
self.length_field = None
self.transform = transform
self.zero_pad = zero_pad
self.encoding = encoding
self.fill = fill
self.element_length = element_length
if element_end:
assert common.is_bytes( element_end )
self.element_end = element_end
def _scrub_bytes( self, value, parent=None ):
fill = property_get( self.fill, parent )
encoding = property_get( self.encoding, parent )
data = value
if data is None:
if fill:
return fill
else:
raise ParseError( '{}: A fill pattern needs to be specified to use None as a list entry'.format( self.get_path( parent ) ) )
if encoding:
data = data.encode( encoding )
if self.transform:
data = self.transform.export_data( data, parent=parent ).payload
if self.element_end is not None:
data += self.element_end
return data
[docs]
def get_from_buffer( self, buffer, parent=None ):
encoding = property_get( self.encoding, parent )
try:
result = super().get_from_buffer( buffer, parent=parent )
except EmptyFieldError:
result = b''
if encoding:
result = result.decode( encoding )
return result
[docs]
def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
fill = property_get( self.fill, parent )
encoding = property_get( self.encoding, parent )
element_length = property_get( self.element_length, parent )
element_end = property_get( self.element_end, parent )
zero_pad = property_get( self.zero_pad, parent )
pointer = offset
# add an empty list entry if we find the fill pattern
if fill and buffer[pointer:pointer+len( fill )] == fill:
return None, pointer+len( fill )
if self.length_field:
# if there's a prefixed length field, that determines the end offset
size = self.length_field.get_from_buffer( buffer[pointer:], parent=parent )
pointer += self.length_field.field_size
data = buffer[pointer:pointer+size]
elif element_length:
# if the element length is fixed, that determines the end offset
data = buffer[pointer:pointer+element_length]
else:
# no element size hints, use more guesswork
data = buffer[pointer:]
# if we have an inline transform, apply it
if self.transform:
data_ts = self.transform.import_data( data, parent=parent )
pointer += data_ts.end_offset
data = data_ts.payload
else:
if element_end:
index = data.find( element_end )
if index >= 0:
data = data[:index]
pointer += 1
pointer += len( data )
if zero_pad:
zero_index = data.find( b'\x00' )
if zero_index >= 0:
data = data[:zero_index]
if encoding:
data = data.decode( encoding )
return data, pointer
[docs]
def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
fill = property_get( self.fill, parent )
encoding = property_get( self.encoding, parent )
element_length = property_get( self.element_length, parent )
element_end = property_get( self.element_end, parent )
zero_pad = property_get( self.zero_pad, parent )
data = bytearray()
if element is None:
if fill:
data.extend( fill )
else:
raise ParseError( '{}: A fill pattern needs to be specified to use None as a list entry'.format( self.get_path( parent, index ) ) )
else:
if encoding:
element = element.encode( encoding )
if self.transform:
element = self.transform.export_data( element, parent=parent ).payload
else:
if element_end:
element += element_end
if self.length_field:
length_buf = bytearray( b'\x00'*self.length_field.field_size )
self.length_field.update_buffer_with_value( len( element ), length_buf, parent=parent )
data.extend( length_buf )
data.extend( element )
if element_length is not None:
if element_length != len( element ):
if zero_pad and len( element ) < element_length:
data.extend( b'\x00'*(element_length-len( data )) )
# add element to buffer
if len( buffer ) < offset+len( data ):
buffer.extend( b'\x00'*(offset+len( data )-len( buffer )) )
buffer[offset:offset+len( data )] = data
return offset+len( data )
[docs]
def validate_element( self, element, parent=None, index=None ):
fill = property_get( self.fill, parent )
zero_pad = property_get( self.zero_pad, parent )
encoding = property_get( self.encoding, parent )
element_length = property_get( self.element_length, parent )
if element is None:
assert fill is not None
if encoding:
# try to encode string, throw UnicodeEncodeError if fails
element = element.encode( encoding )
elif not common.is_bytes( element ):
raise FieldValidationError( '{}: Expecting bytes, not {}'.format( self.get_path( parent, index ), type( value ) ) )
if element_length is not None:
if not zero_pad and element_length < len( element ):
raise FieldValidationError( '{}: Elements must have a size of {} but found {}!'.format( self.get_path( parent, index ), element_length, len( element ) ) )
@property
def repr( self ):
details = 'offset={}'.format( hex( self.offset ) if type( self.offset ) == int else self.offset )
if self.length:
details += ', length={}'.format( self.length )
if self.count:
details += ', count={}'.format( self.count )
if self.stream:
details += ', stream={}'.format( self.stream )
if self.default:
details += ', default={}'.format( self.default )
if self.transform:
details += ', transform={}'.format( self.transform )
return details
[docs]
def get_start_offset( self, value, parent=None, index=None ):
assert index is None
offset = property_get( self.offset, parent, caller=self )
return offset
[docs]
def get_element_size( self, element, parent=None, index=None ):
    """Return the size in bytes a single element occupies in the buffer.

    Includes the length prefix when a length_field is defined.
    """
    # removed a dead local: self.fill was fetched via property_get but never read
    size = 0
    if self.length_field:
        size += self.length_field.field_size
    size += len( self._scrub_bytes( element, parent=parent ) )
    return size
[docs]
def serialise( self, value, parent=None ):
    """Return the field contents as serialisable basic Python types."""
    self.validate( value, parent )
    count = property_get( self.count, parent )
    stream = property_get( self.stream, parent )
    if stream or (count is not None):
        # array-style fields are tagged with the container type
        return (('builtins', 'list'), tuple( value ))
    return value
[docs]
class Bytes( StringField ):
    """Raw byte string field."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( offset=offset, **kwargs )
[docs]
class CString( StringField ):
    """Byte string terminated with a null (0x00) byte."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( offset=offset, element_end=b'\x00', **kwargs )
[docs]
class CStringN( StringField ):
    """Null-terminated byte string, zero-padded out to a fixed size."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( offset=offset, element_end=b'\x00', zero_pad=True, **kwargs )
[docs]
class PString( StringField ):
    """Pascal-style byte string, prefixed with an unsigned 8-bit length."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( offset=offset, length_field=UInt8, **kwargs )
[docs]
class NumberField( StreamField ):
    """Base class for numeric value Fields."""

    def __init__( self, format_type, field_size, signedness, endian, format_range,
                  offset=Chain(), *, default=0, count=None, length=None, stream=False,
                  alignment=1, stream_end=None, stop_check=None, bitmask=None,
                  range=None, enum=None, **kwargs ):
        """Base class for numeric value Fields.

        format_type
            Python native type equivalent. Used for validation. (Usually defined by child class)

        field_size
            Size of field in bytes. (Usually defined by child class)

        signedness
            Signedness of the field. Should be 'signed' or 'unsigned'. (Usually defined by child class)

        endian
            Endianness of the field. Should be 'little', 'big' or None. (Usually defined by child class)

        format_range
            Numeric bounds of format. Used for validation. (Usually defined by child class)

        offset
            Position of data, relative to the start of the parent block. Defaults to
            the end offset of the previous field.

        default
            Default value to emit in the case of e.g. creating an empty Block.

        count
            Load multiple numbers. None implies a single value, non-negative
            numbers will return a Python list.

        length
            Maximum size of the buffer to read in.

        stream
            Read elements continuously until a stop condition is met.

        alignment
            Number of bytes to align the start of each element to.

        stream_end
            Byte pattern to denote the end of the stream.

        stop_check
            A function that takes a data buffer and an offset; should return True if
            the end of the data stream has been reached and False otherwise.

        bitmask
            Apply AND mask (bytes) to data before reading/writing. Used for demultiplexing
            data to multiple fields, e.g. one byte with 8 flag fields.

        range
            Restrict allowed values to a list of choices. Used for validation

        enum
            Restrict allowed values to those provided by a Python enum type. Used for validation.
        """
        super().__init__( offset=offset, default=default, count=count, length=length,
                          stream=stream, alignment=alignment, stream_end=stream_end,
                          stop_check=stop_check, **kwargs )
        self.format_type = format_type
        self.field_size = field_size
        self.signedness = signedness
        self.endian = endian
        self.format_range = format_range
        if bitmask:
            assert common.is_bytes( bitmask )
            assert len( bitmask ) == field_size
        self.bitmask = bitmask
        self.range = range
        self.enum = enum

    def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
        """Decode a single number from the buffer, starting at offset.

        Returns a tuple of (element, end_offset).
        """
        format_type = property_get( self.format_type, parent )
        field_size = property_get( self.field_size, parent )
        signedness = property_get( self.signedness, parent )
        endian = property_get( self.endian, parent )
        # bugfix: use the evaluated field_size throughout (previously some uses
        # referenced self.field_size directly, which breaks if it's a Ref)
        data = buffer[offset:offset+field_size]
        assert len( data ) == field_size
        if self.bitmask:
            # if a bitmask is defined, AND with it first
            data = (int.from_bytes( data, byteorder='big' ) &
                    int.from_bytes( self.bitmask, byteorder='big' )
                    ).to_bytes( field_size, byteorder='big' )
        # convert bytes to Python type
        element = encoding.unpack( (format_type, field_size, signedness, endian), data )
        # friendly warnings if the imported data fails the range check
        if self.range and (element not in self.range):
            logger.warning( '{}: value {} outside of range {}'.format( self.get_path( parent, index ), element, self.range ) )
        # friendly warning if the imported data fails the enum check
        if self.enum:
            if (element not in [x.value for x in self.enum]):
                logger.warning( '{}: value {} not castable to {}'.format( self.get_path( parent, index ), element, self.enum ) )
            else:
                # cast to enum because why not
                element = self.enum( element )
        return element, offset+field_size

    def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
        """Encode a single number into the buffer at offset.

        Returns the end offset of the written element.
        """
        # bugfix: field_size was evaluated twice via property_get; fetch each once
        format_type = property_get( self.format_type, parent )
        field_size = property_get( self.field_size, parent )
        signedness = property_get( self.signedness, parent )
        endian = property_get( self.endian, parent )
        data = encoding.pack( (format_type, field_size, signedness, endian), element )
        # force check for no data loss in the value from bitmask
        if self.bitmask:
            assert (int.from_bytes( data, byteorder='big' ) &
                    int.from_bytes( self.bitmask, byteorder='big' ) ==
                    int.from_bytes( data, byteorder='big' ))
            for i in range( field_size ):
                # set bitmasked areas of target to 0
                buffer[offset+i] &= (self.bitmask[i] ^ 0xff)
                # OR target with replacement bitmasked portion
                buffer[offset+i] |= (data[i] & self.bitmask[i])
        else:
            for i in range( field_size ):
                buffer[offset+i] = data[i]
        return offset+field_size

    def update_deps( self, value, parent=None ):
        """Update dependent variables (e.g. a count Ref) to match the value."""
        count = property_get( self.count, parent )
        if count is not None and count != len( value ):
            property_set( self.count, parent, len( value ) )

    def validate_element( self, element, parent=None, index=None ):
        """Check a single element for validity; raise FieldValidationError if not."""
        if self.enum:
            if (element not in [x.value for x in self.enum]):
                raise FieldValidationError( '{}: Value {} not castable to {}'.format( self.get_path( parent, index ), element, self.enum ) )
            element = self.enum( element ).value
        if (type( element ) != self.format_type):
            raise FieldValidationError( '{}: Expecting type {}, not {}'.format( self.get_path( parent, index ), self.format_type, type( element ) ) )
        if self.format_range is not None and (element not in self.format_range):
            raise FieldValidationError( '{}: Value {} not in format range ({})'.format( self.get_path( parent, index ), element, self.format_range ) )
        if self.range is not None and (element not in self.range):
            raise FieldValidationError( '{}: Value {} not in range ({})'.format( self.get_path( parent, index ), element, self.range ) )
        return

    def get_element_size( self, element, parent=None, index=None ):
        """Return the size in bytes of a single element."""
        field_size = property_get( self.field_size, parent )
        return field_size

    @property
    def repr( self ):
        """Plaintext summary of the Field."""
        details = 'offset={}'.format( hex( self.offset ) if type( self.offset ) == int else self.offset )
        if self.default:
            details += ', default={}'.format( self.default )
        if self.range:
            details += ', range={}'.format( self.range )
        if self.bitmask:
            details += ', bitmask={}'.format( self.bitmask )
        return details

    @property
    def serialised( self ):
        """Tuple containing the contents of the Field."""
        return common.serialise( self, ('offset', 'default', 'count', 'length', 'stream', 'alignment', 'stream_end', 'stop_check', 'format_type', 'field_size', 'signedness', 'endian', 'format_range', 'bitmask', 'range', 'enum') )

    def serialise( self, value, parent=None ):
        """Return the field contents as serialisable basic Python types."""
        self.validate( value, parent )
        count = property_get( self.count, parent )
        stream = property_get( self.stream, parent )
        is_array = stream or (count is not None)
        if is_array:
            return (('builtins', 'list'), tuple( value ))
        return value
[docs]
class Int8( NumberField ):
    """Signed 8-bit integer."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 1, 'signed', None, range( -1<<7, 1<<7 ), offset=offset, **kwargs )
[docs]
class UInt8( NumberField ):
    """Unsigned 8-bit integer."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 1, 'unsigned', None, range( 0, 1<<8 ), offset=offset, **kwargs )
[docs]
class Bits( NumberField ):
    """Bitfield: a subset of bits in an unsigned integer, compacted to a dense value."""

    def __init__( self, offset=Chain(), bits=0, *, default=0, size=1, enum=None, endian=None, **kwargs ):
        """Create a Bits field.

        offset
            Position of data, relative to the start of the parent block.

        bits
            Integer bitmask selecting which bits of the underlying value to use.

        default
            Default value to emit in the case of e.g. creating an empty Block.

        size
            Size of the underlying integer in bytes (1, 2, 4 or 8).

        enum
            Restrict allowed values to those provided by a Python enum type.

        endian
            Endianness of the underlying integer; defaults to 'big' for
            multi-byte sizes when not specified.
        """
        SPECS = {
            1: (int, 1, 'unsigned', None if endian is None else endian, range( 0, 1<<8 )),
            2: (int, 2, 'unsigned', 'big' if endian is None else endian, range( 0, 1<<16 )),
            4: (int, 4, 'unsigned', 'big' if endian is None else endian, range( 0, 1<<32 )),
            8: (int, 8, 'unsigned', 'big' if endian is None else endian, range( 0, 1<<64 )),
        }
        assert size in SPECS
        assert type( bits ) == int
        assert (bits >= 0)
        assert (bits < 1<<(8*size))
        # binary representation of the mask, without the '0b' prefix
        self.mask_bits = format( bits, 'b' )
        # the value of each selected bit position, lowest first
        self.bits = [(1 << i) for i, flag in enumerate( reversed( self.mask_bits ) ) if flag == '1']
        self.check_range = range( 0, 1<<len( self.bits ) )
        # because we reinterpret the value of the element, we need a separate enum
        # evaluation compared to the base class
        self.enum_t = enum
        bitmask = encoding.pack( SPECS[size][:4], bits )
        super().__init__( *SPECS[size], offset=offset, default=default, bitmask=bitmask, **kwargs )

    def get_element_from_buffer( self, offset, buffer, parent=None, index=None ):
        """Read the raw masked integer, then compact the selected bits into a dense value."""
        result, end_offset = super().get_element_from_buffer( offset, buffer, parent, index=index )
        element = sum( (1 << i) for i, mask in enumerate( self.bits ) if result & mask )
        if self.enum_t:
            if (element not in [x.value for x in self.enum_t]):
                logger.warning( '{}: Value {} not castable to {}'.format( self.get_path( parent, index ), element, self.enum_t ) )
            else:
                # cast to enum because why not
                element = self.enum_t( element )
        return element, end_offset

    def update_buffer_with_element( self, offset, element, buffer, parent=None, index=None ):
        """Expand the dense value back out to the masked bit positions and write it."""
        assert element in self.check_range
        if self.enum_t:
            element = self.enum_t( element ).value
        # self.bits holds distinct powers of two, so summation equals bitwise OR
        packed = sum( mask for i, mask in enumerate( self.bits ) if element & (1 << i) )
        return super().update_buffer_with_element( offset, packed, buffer, parent, index=index )

    def validate_element( self, value, parent=None, index=None ):
        """Check a single element for validity; raise FieldValidationError if not."""
        if self.enum_t:
            if (value not in [x.value for x in self.enum_t]):
                raise FieldValidationError( '{}: Value {} not castable to {}'.format( self.get_path( parent, index ), value, self.enum_t ) )
            value = self.enum_t( value ).value
        super().validate_element( value, parent, index=index )

    @property
    def repr( self ):
        """Plaintext summary of the Field."""
        details = 'offset={}, bits=0b{}'.format( hex( self.offset ) if type( self.offset ) == int else self.offset, self.mask_bits )
        if self.default:
            details += ', default={}'.format( self.default )
        return details

    @property
    def serialised( self ):
        """Tuple containing the contents of the Field."""
        return common.serialise( self, ('offset', 'default', 'count', 'length', 'stream', 'alignment', 'stream_end', 'stop_check', 'format_type', 'field_size', 'signedness', 'endian', 'format_range', 'bitmask', 'range', 'enum', 'bits', 'enum_t') )
[docs]
class Bits8( Bits ):
    """Bitfield backed by an unsigned 8-bit integer."""

    def __init__( self, offset=Chain(), bits=0, **kwargs ):
        super().__init__( offset=offset, bits=bits, size=1, **kwargs )
[docs]
class Bits16( Bits ):
    """Bitfield backed by an unsigned 16-bit integer."""

    def __init__( self, offset=Chain(), bits=0, **kwargs ):
        super().__init__( offset=offset, bits=bits, size=2, **kwargs )
[docs]
class Bits32( Bits ):
    """Bitfield backed by an unsigned 32-bit integer."""

    def __init__( self, offset=Chain(), bits=0, **kwargs ):
        super().__init__( offset=offset, bits=bits, size=4, **kwargs )
[docs]
class Bits64( Bits ):
    """Bitfield backed by an unsigned 64-bit integer."""

    def __init__( self, offset=Chain(), bits=0, **kwargs ):
        super().__init__( offset=offset, bits=bits, size=8, **kwargs )
[docs]
class Int16_LE( NumberField ):
    """Signed 16-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'signed', 'little', range( -1<<15, 1<<15 ), offset=offset, **kwargs )
[docs]
class Int24_LE( NumberField ):
    """Signed 24-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'signed', 'little', range( -1<<23, 1<<23 ), offset=offset, **kwargs )
[docs]
class Int32_LE( NumberField ):
    """Signed 32-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'signed', 'little', range( -1<<31, 1<<31 ), offset=offset, **kwargs )
[docs]
class Int64_LE( NumberField ):
    """Signed 64-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'signed', 'little', range( -1<<63, 1<<63 ), offset=offset, **kwargs )
[docs]
class UInt16_LE( NumberField ):
    """Unsigned 16-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'unsigned', 'little', range( 0, 1<<16 ), offset=offset, **kwargs )
[docs]
class UInt24_LE( NumberField ):
    """Unsigned 24-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'unsigned', 'little', range( 0, 1<<24 ), offset=offset, **kwargs )
[docs]
class UInt32_LE( NumberField ):
    """Unsigned 32-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'unsigned', 'little', range( 0, 1<<32 ), offset=offset, **kwargs )
[docs]
class UInt64_LE( NumberField ):
    """Unsigned 64-bit integer, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'unsigned', 'little', range( 0, 1<<64 ), offset=offset, **kwargs )
[docs]
class Float32_LE( NumberField ):
    """32-bit floating-point number, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 4, 'signed', 'little', None, offset=offset, **kwargs )
[docs]
class Float64_LE( NumberField ):
    """64-bit floating-point number, little-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 8, 'signed', 'little', None, offset=offset, **kwargs )
[docs]
class Int16_BE( NumberField ):
    """Signed 16-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'signed', 'big', range( -1<<15, 1<<15 ), offset=offset, **kwargs )
[docs]
class Int24_BE( NumberField ):
    """Signed 24-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'signed', 'big', range( -1<<23, 1<<23 ), offset=offset, **kwargs )
[docs]
class Int32_BE( NumberField ):
    """Signed 32-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'signed', 'big', range( -1<<31, 1<<31 ), offset=offset, **kwargs )
[docs]
class Int64_BE( NumberField ):
    """Signed 64-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'signed', 'big', range( -1<<63, 1<<63 ), offset=offset, **kwargs )
[docs]
class UInt16_BE( NumberField ):
    """Unsigned 16-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'unsigned', 'big', range( 0, 1<<16 ), offset=offset, **kwargs )
[docs]
class UInt24_BE( NumberField ):
    """Unsigned 24-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'unsigned', 'big', range( 0, 1<<24 ), offset=offset, **kwargs )
[docs]
class UInt32_BE( NumberField ):
    """Unsigned 32-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'unsigned', 'big', range( 0, 1<<32 ), offset=offset, **kwargs )
[docs]
class UInt64_BE( NumberField ):
    """Unsigned 64-bit integer, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'unsigned', 'big', range( 0, 1<<64 ), offset=offset, **kwargs )
[docs]
class Float32_BE( NumberField ):
    """32-bit floating-point number, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 4, 'signed', 'big', None, offset=offset, **kwargs )
[docs]
class Float64_BE( NumberField ):
    """64-bit floating-point number, big-endian."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 8, 'signed', 'big', None, offset=offset, **kwargs )
[docs]
class Int16_P( NumberField ):
    """Signed 16-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'signed', Ref( '_endian' ), range( -1<<15, 1<<15 ), offset=offset, **kwargs )
[docs]
class Int24_P( NumberField ):
    """Signed 24-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'signed', Ref( '_endian' ), range( -1<<23, 1<<23 ), offset=offset, **kwargs )
[docs]
class Int32_P( NumberField ):
    """Signed 32-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'signed', Ref( '_endian' ), range( -1<<31, 1<<31 ), offset=offset, **kwargs )
[docs]
class Int64_P( NumberField ):
    """Signed 64-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'signed', Ref( '_endian' ), range( -1<<63, 1<<63 ), offset=offset, **kwargs )
[docs]
class UInt16_P( NumberField ):
    """Unsigned 16-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 2, 'unsigned', Ref( '_endian' ), range( 0, 1<<16 ), offset=offset, **kwargs )
[docs]
class UInt24_P( NumberField ):
    """Unsigned 24-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 3, 'unsigned', Ref( '_endian' ), range( 0, 1<<24 ), offset=offset, **kwargs )
[docs]
class UInt32_P( NumberField ):
    """Unsigned 32-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 4, 'unsigned', Ref( '_endian' ), range( 0, 1<<32 ), offset=offset, **kwargs )
[docs]
class UInt64_P( NumberField ):
    """Unsigned 64-bit integer; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( int, 8, 'unsigned', Ref( '_endian' ), range( 0, 1<<64 ), offset=offset, **kwargs )
[docs]
class Float32_P( NumberField ):
    """32-bit floating-point number; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 4, 'signed', Ref( '_endian' ), None, offset=offset, **kwargs )
[docs]
class Float64_P( NumberField ):
    """64-bit floating-point number; endianness taken from the parent's _endian attribute."""

    def __init__( self, offset=Chain(), **kwargs ):
        super().__init__( float, 8, 'signed', Ref( '_endian' ), None, offset=offset, **kwargs )