Source code for detools.info

import os
from .errors import Error
from .apply import read_header_sequential
from .apply import read_header_in_place
from .apply import read_header_hdiffpatch
from .apply import PatchReader
from .common import PATCH_TYPE_SEQUENTIAL
from .common import PATCH_TYPE_IN_PLACE
from .common import PATCH_TYPE_HDIFFPATCH
from .common import file_size
from .common import unpack_size
from .common import unpack_size_with_length
from .common import data_format_number_to_string
from .common import peek_header_type
from .compression.heatshrink import HeatshrinkDecompressor
from .data_format import info as data_format_info


def _compression_info(patch_reader):
    info = None

    if patch_reader:
        decompressor = patch_reader.decompressor

        if isinstance(decompressor, HeatshrinkDecompressor):
            info = {
                'window-sz2': decompressor.window_sz2,
                'lookahead-sz2': decompressor.lookahead_sz2
            }

    return info


def patch_info_sequential_inner(patch_reader, to_size):
    to_pos = 0
    number_of_size_bytes = 0
    diff_sizes = []
    extra_sizes = []
    adjustment_sizes = []

    while to_pos < to_size:
        # Diff data.
        size, number_of_bytes = unpack_size_with_length(patch_reader)

        if to_pos + size > to_size:
            raise Error("Patch diff data too long.")

        diff_sizes.append(size)
        number_of_size_bytes += number_of_bytes
        patch_reader.decompress(size)
        to_pos += size

        # Extra data.
        size, number_of_bytes = unpack_size_with_length(patch_reader)
        number_of_size_bytes += number_of_bytes

        if to_pos + size > to_size:
            raise Error("Patch extra data too long.")

        extra_sizes.append(size)
        patch_reader.decompress(size)
        to_pos += size

        # Adjustment.
        size, number_of_bytes = unpack_size_with_length(patch_reader)
        number_of_size_bytes += number_of_bytes
        adjustment_sizes.append(size)

    return (to_size,
            diff_sizes,
            extra_sizes,
            adjustment_sizes,
            number_of_size_bytes)


def patch_info_sequential(fpatch, fsize):
    patch_size = file_size(fpatch)
    compression, to_size = read_header_sequential(fpatch)
    dfpatch_size = 0
    data_format = None
    dfpatch_info = None
    patch_reader = None

    if to_size == 0:
        info = (0, [], [], [], 0)
    else:
        patch_reader = PatchReader(fpatch, compression)
        dfpatch_size = unpack_size(patch_reader)

        if dfpatch_size > 0:
            data_format = unpack_size(patch_reader)
            patch = patch_reader.decompress(dfpatch_size)
            dfpatch_info = data_format_info(data_format, patch, fsize)
            data_format = data_format_number_to_string(data_format)

        info = patch_info_sequential_inner(patch_reader, to_size)

        if not patch_reader.eof:
            raise Error('End of patch not found.')

    return (patch_size,
            compression,
            _compression_info(patch_reader),
            dfpatch_size,
            data_format,
            dfpatch_info,
            *info)


def patch_info_in_place(fpatch):
    patch_size = file_size(fpatch)
    (compression,
     memory_size,
     segment_size,
     shift_size,
     from_size,
     to_size) = read_header_in_place(fpatch)
    segments = []
    patch_reader = None

    if to_size > 0:
        patch_reader = PatchReader(fpatch, compression)

        for to_pos in range(0, to_size, segment_size):
            segment_to_size = min(segment_size, to_size - to_pos)
            dfpatch_size = unpack_size(patch_reader)

            if dfpatch_size > 0:
                data_format = unpack_size(patch_reader)
                data_format = data_format_number_to_string(data_format)
                patch_reader.decompress(dfpatch_size)
            else:
                data_format = None

            info = patch_info_sequential_inner(patch_reader, segment_to_size)
            segments.append((dfpatch_size, data_format, info))

    return (patch_size,
            compression,
            _compression_info(patch_reader),
            memory_size,
            segment_size,
            shift_size,
            from_size,
            to_size,
            segments)


def patch_info_hdiffpatch(fpatch):
    patch_size = file_size(fpatch)
    compression, to_size, _ = read_header_hdiffpatch(fpatch)
    patch_reader = None

    if to_size > 0:
        patch_reader = PatchReader(fpatch, compression)

    return (patch_size,
            compression,
            _compression_info(patch_reader),
            to_size)


[docs]def patch_info(fpatch, fsize=None): """Get patch information from given file-like patch object `fpatch`. """ if fsize is None: fsize = str patch_type = peek_header_type(fpatch) if patch_type == PATCH_TYPE_SEQUENTIAL: return 'sequential', patch_info_sequential(fpatch, fsize) elif patch_type == PATCH_TYPE_IN_PLACE: return 'in-place', patch_info_in_place(fpatch) elif patch_type == PATCH_TYPE_HDIFFPATCH: return 'hdiffpatch', patch_info_hdiffpatch(fpatch) else: raise Error('Bad patch type {}.'.format(patch_type))
[docs]def patch_info_filename(patchfile, fsize=None): """Same as :func:`~detools.patch_info()`, but with a filename instead of a file-like object. """ with open(patchfile, 'rb') as fpatch: return patch_info(fpatch, fsize)