图片完整性验证

通过数据流下载图片时,往往会由于网络中断等原因,导致下载到的图片不完整。

下面的代码通过检查文件的结束标记,实现了 jpg、png 格式图片的完整性验证。通过继承 BaseInspector,可以方便地扩展到其他类型的图片验证。

# -*- coding: utf-8 -*-
# @Author  : LG

"""
    图片完整性检查
    可通过继承BaseInspector扩展其他类型图片及文件的检查
"""

import os
from typing import Tuple


class BaseInspector:
    """Base class for integrity inspectors bound to one file suffix.

    An inspector stores the suffix it is registered under and the pair of
    byte signatures (leading, trailing) that a complete file must carry.
    Subclasses implement ``is_valid``.
    """

    def __init__(self, suffix: str, start_end_char: Tuple[bytes, bytes]):
        self.suffix = suffix  # file suffix without the leading dot
        self.start_end_char = start_end_char  # (start signature, end signature)

    def is_valid(self, path):
        """Return True if the file at *path* is complete.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def _has_signatures(self, path) -> bool:
        """Return True if *path* begins and ends with the stored signatures.

        A file too short to contain both signatures is reported as invalid
        rather than failing on an out-of-range seek, so empty or heavily
        truncated downloads are handled like any other broken file.
        """
        head, tail = self.start_end_char
        with open(path, 'rb') as f:
            if f.read(len(head)) != head:
                return False
            size = f.seek(0, 2)  # whence 2: relative to end of file
            if size < len(head) + len(tail):
                return False
            f.seek(size - len(tail))
            # Read to EOF so any bytes after the end signature count as invalid.
            return f.read() == tail


class JpgInspector(BaseInspector):
    """Inspector for jpg files (start marker FF D8, end marker FF D9)."""

    def __init__(self, suffix='jpg', start_end_char=(b'\xff\xd8', b'\xff\xd9')):
        super(JpgInspector, self).__init__(suffix, start_end_char)
        self.end_char_len = len(self.start_end_char[1])

    def is_valid(self, path):
        """Return True if the file starts with FF D8 and ends with FF D9."""
        return self._has_signatures(path)


class PngInspector(BaseInspector):
    """Inspector for png files (8-byte file signature, trailing IEND chunk)."""

    def __init__(self, suffix='png',
                 start_end_char=(b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A',
                                 b'\x00\x00\x00\x00\x49\x45\x4E\x44\xAE\x42\x60\x82')):
        super(PngInspector, self).__init__(suffix, start_end_char)
        self.end_char_len = len(self.start_end_char[1])

    def is_valid(self, path):
        """Return True if the file has the png signature and ends with IEND."""
        return self._has_signatures(path)


class Inspector(object):
    """Dispatch files to registered per-suffix inspectors and count failures.

    The relative path of every file that fails its check is printed, and
    ``invalid_num`` accumulates the number of such files across calls.
    """

    def __init__(self):
        self.fns = {}  # suffix -> inspector, filled through register()
        self.invalid_num = 0  # number of invalid files found so far
        self.root = ''  # directory that printed paths are made relative to

    def register(self, fn: 'BaseInspector'):
        """Register *fn* as the inspector for files with suffix ``fn.suffix``."""
        self.fns[fn.suffix] = fn

    def one_img_inspect(self, path: str):
        """Check one file; print and count it if its inspector rejects it.

        Files whose (lower-cased) suffix has no registered inspector are
        skipped silently.
        """
        suffix = os.path.splitext(path)[1].lower().lstrip('.')
        checker = self.fns.get(suffix)
        if checker is None:
            return
        try:
            valid = checker.is_valid(path)
        except OSError:
            # An unreadable or too-short file must not abort the whole scan;
            # treat it as a broken file.
            valid = False
        if not valid:
            self.invalid_num += 1
            print('{}'.format(os.path.relpath(path, self.root)))

    def root_inspect(self, root: str):
        """Recursively check every regular file below directory *root*."""
        for name in os.listdir(root):
            entry = os.path.join(root, name)
            if os.path.isfile(entry):
                self.one_img_inspect(entry)
            elif os.path.isdir(entry):
                self.root_inspect(entry)

    def run(self, path):
        """Check a single file or a whole directory tree, then print a summary.

        For a file, printed paths are relative to its parent directory; for
        a directory, relative to that directory. A path that is neither is
        not inspected and only the summary line is printed.
        """
        if os.path.isfile(path):
            self.root = os.path.split(path)[0]
            self.one_img_inspect(path)
        elif os.path.isdir(path):
            self.root = path
            self.root_inspect(path)

        print('Found invaild file {}.'.format(self.invalid_num))


if __name__ == '__main__':

    import argparse

    # Command line: a file or directory to scan, plus the suffixes to check.
    cli = argparse.ArgumentParser(description='图片完整性检查')
    cli.add_argument('--path', type=str, required=True, help='图片路径或根目录')
    cli.add_argument('--types', type=str, default=['jpg', 'png'], nargs='+', help='文件后缀(小写)')
    options = cli.parse_args()

    # Every inspector this script knows about, keyed by suffix.
    available = {'jpg': JpgInspector(), 'png': PngInspector()}
    print("Now support format: {}".format(' | '.join(available)))

    # Register only the requested inspectors; report unknown suffixes.
    checker = Inspector()
    for wanted in options.types:
        handler = available.get(wanted)
        if handler is None:
            print('Not support {} file.'.format(wanted))
            continue
        checker.register(handler)
        print('[{}] inspector have registed'.format(wanted))

    checker.run(options.path)