#!/usr/bin/env python3 # Note: # - Hardlinks are copied # - The size of symlinks and directories is meaningless, it depends on whatever # the filesystem/tar file reports import argparse import json import os import stat import sys import itertools import logging import hashlib import tarfile VERSION = 3 IDX_NAME = 0 IDX_SIZE = 1 IDX_MTIME = 2 IDX_MODE = 3 IDX_UID = 4 IDX_GID = 5 # target for symbolic links # child nodes for directories # filename for files IDX_TARGET = 6 IDX_FILENAME = 6 HASH_LENGTH = 8 S_IFLNK = 0xA000 S_IFREG = 0x8000 S_IFDIR = 0x4000 def hash_file(filename) -> str: with open(filename, "rb", buffering=0) as f: return hash_fileobj(f) def hash_fileobj(f) -> str: h = hashlib.sha256() for b in iter(lambda: f.read(128*1024), b""): h.update(b) return h.hexdigest() def main(): logging.basicConfig(format="%(message)s") logger = logging.getLogger("fs2json") logger.setLevel(logging.DEBUG) args = argparse.ArgumentParser(description="Create filesystem JSON. Example:\n" " ./fs2xml.py --exclude /boot/ --out fs.json /mnt/", formatter_class=argparse.RawTextHelpFormatter ) args.add_argument("--exclude", action="append", metavar="path", help="Path to exclude (relative to base path). Can be specified multiple times.") args.add_argument("--out", metavar="out", nargs="?", type=argparse.FileType("w"), help="File to write to (defaults to stdout)", default=sys.stdout) args.add_argument("path", metavar="path-or-tar", help="Base path or tar file to include in JSON") args = args.parse_args() path = os.path.normpath(args.path) try: tar = tarfile.open(path, "r") except IsADirectoryError: tar = None if tar: (root, total_size) = handle_tar(logger, tar) else: (root, total_size) = handle_dir(logger, path, args.exclude) if False: # normalize the order of children, useful to debug differences between # the tar and filesystem reader def sort_children(children): for c in children: if isinstance(c[IDX_TARGET], list): sort_children(c[IDX_TARGET]) children.sort() sort_children(root) result = { "fsroot": root, "version": VERSION, "size": total_size, } logger.info("Creating json ...") json.dump(result, args.out, check_circular=False, separators=(',', ':')) def handle_dir(logger, path, exclude): path = path + "/" exclude = exclude or [] exclude = [os.path.join("/", os.path.normpath(p)) for p in exclude] exclude = set(exclude) def onerror(oserror): logger.warning(oserror) rootdepth = path.count("/") files = os.walk(path, onerror=onerror) prevpath = [] mainroot = [] filename_to_hash = {} total_size = 0 rootstack = [mainroot] def make_node(st, name): obj = [None] * 7 obj[IDX_NAME] = name obj[IDX_SIZE] = st.st_size obj[IDX_MTIME] = int(st.st_mtime) obj[IDX_MODE] = int(st.st_mode) obj[IDX_UID] = st.st_uid obj[IDX_GID] = st.st_gid nonlocal total_size total_size += st.st_size # Missing: # int(st.st_atime), # int(st.st_ctime), return obj logger.info("Creating file tree ...") for f in files: dirpath, dirnames, filenames = f pathparts = dirpath.split("/") pathparts = pathparts[rootdepth:] fullpath = os.path.join("/", *pathparts) if fullpath in exclude: dirnames[:] = [] continue depth = 0 for this, prev in zip(pathparts, prevpath): if this != prev: break depth += 1 for _name in prevpath[depth:]: rootstack.pop() oldroot = rootstack[-1] assert len(pathparts[depth:]) == 1 openname = pathparts[-1] if openname == "": root = mainroot else: root = [] st = os.stat(dirpath) rootobj = make_node(st, openname) rootobj[IDX_TARGET] = root oldroot.append(rootobj) rootstack.append(root) for filename in itertools.chain(filenames, dirnames): absname = os.path.join(dirpath, filename) st = os.lstat(absname) isdir = stat.S_ISDIR(st.st_mode) islink = stat.S_ISLNK(st.st_mode) isfile = stat.S_ISREG(st.st_mode) if isdir and not islink: continue obj = make_node(st, filename) if islink: target = os.readlink(absname) obj[IDX_TARGET] = target elif isfile: file_hash = hash_file(absname) filename = file_hash[0:HASH_LENGTH] + ".bin" existing = filename_to_hash.get(filename) assert existing is None or existing == file_hash, "Collision in short hash (%s and %s)" % (existing, file_hash) filename_to_hash[filename] = file_hash obj[IDX_FILENAME] = filename while obj[-1] is None: obj.pop() root.append(obj) prevpath = pathparts return (mainroot, total_size) def handle_tar(logger, tar): mainroot = [] filename_to_hash = {} total_size = 0 for member in tar.getmembers(): parts = member.name.split("/") name = parts.pop() dir = mainroot for p in parts: for c in dir: if c[IDX_NAME] == p: dir = c[IDX_TARGET] obj = [None] * 7 obj[IDX_NAME] = name obj[IDX_SIZE] = member.size obj[IDX_MTIME] = member.mtime obj[IDX_MODE] = member.mode obj[IDX_UID] = member.uid obj[IDX_GID] = member.gid if member.isfile() or member.islnk(): obj[IDX_MODE] |= S_IFREG f = tar.extractfile(member) file_hash = hash_fileobj(f) filename = file_hash[0:HASH_LENGTH] + ".bin" existing = filename_to_hash.get(filename) assert existing is None or existing == file_hash, "Collision in short hash (%s and %s)" % (existing, file_hash) filename_to_hash[filename] = file_hash obj[IDX_FILENAME] = filename if member.islnk(): # fix size for hard links f.seek(0, os.SEEK_END) obj[IDX_SIZE] = int(f.tell()) elif member.isdir(): obj[IDX_MODE] |= S_IFDIR obj[IDX_TARGET] = [] elif member.issym(): obj[IDX_MODE] |= S_IFLNK obj[IDX_TARGET] = member.linkname else: logger.error("Unsupported type: {} ({})".format(member.type, name)) total_size += obj[IDX_SIZE] while obj[-1] is None: obj.pop() dir.append(obj) return mainroot, total_size if __name__ == "__main__": main()