#!/usr/bin/env python3 """Sync files between a computer and an Android device""" __version__ = "1.4.0" from typing import List, Tuple, Union import logging import os import stat import fnmatch from .argparsing import get_cli_args from .SAOLogging import logging_fatal, log_tree, setup_root_logger, perror, FATAL from .FileSystems.Base import FileSystem from .FileSystems.Local import LocalFileSystem from .FileSystems.Android import AndroidFileSystem class FileSyncer(): @classmethod def diff_trees(cls, source: Union[dict, Tuple[int, int], None], destination: Union[dict, Tuple[int, int], None], path_source: str, path_destination: str, destination_exclude_patterns: List[str], path_join_function_source, path_join_function_destination, folder_file_overwrite_error: bool = True, ) -> Tuple[ Union[dict, Tuple[int, int], None], # delete Union[dict, Tuple[int, int], None], # copy Union[dict, Tuple[int, int], None], # excluded_source Union[dict, Tuple[int, int], None], # unaccounted_destination Union[dict, Tuple[int, int], None] # excluded_destination ]: exclude = False for destination_exclude_pattern in destination_exclude_patterns: if fnmatch.fnmatch(path_destination, destination_exclude_pattern): exclude = True break if source is None: if destination is None: delete = None copy = None excluded_source = None unaccounted_destination = None excluded_destination = None elif isinstance(destination, tuple): if exclude: delete = None copy = None excluded_source = None unaccounted_destination = None excluded_destination = destination else: delete = None copy = None excluded_source = None unaccounted_destination = destination excluded_destination = None elif isinstance(destination, dict): if exclude: delete = {".": None} copy = None excluded_source = None unaccounted_destination = {".": None} excluded_destination = destination else: delete = {".": None} copy = None excluded_source = None unaccounted_destination = {".": destination["."]} excluded_destination = {".": None} destination.pop(".") for key, value in destination.items(): delete[key], _, _, unaccounted_destination[key], excluded_destination[key] = cls.diff_trees( None, value, path_join_function_source(path_source, key), path_join_function_destination(path_destination, key), destination_exclude_patterns, path_join_function_source, path_join_function_destination, folder_file_overwrite_error = folder_file_overwrite_error ) else: raise NotImplementedError elif isinstance(source, tuple): if destination is None: if exclude: delete = None copy = None excluded_source = source unaccounted_destination = None excluded_destination = None else: delete = None copy = source excluded_source = None unaccounted_destination = None excluded_destination = None elif isinstance(destination, tuple): if exclude: delete = None copy = None excluded_source = source unaccounted_destination = None excluded_destination = destination else: if source[1] > destination[1]: delete = destination copy = source excluded_source = None unaccounted_destination = None excluded_destination = None else: delete = None copy = None excluded_source = None unaccounted_destination = None excluded_destination = None elif isinstance(destination, dict): if exclude: delete = {".": None} copy = None excluded_source = source unaccounted_destination = {".": None} excluded_destination = destination else: delete = destination copy = source excluded_source = None unaccounted_destination = {".": None} excluded_destination = {".": None} if folder_file_overwrite_error: logging.critical(f"Refusing to overwrite directory {path_destination} with file {path_source}") logging_fatal("Use --force if you are sure!") else: logging.warning(f"Overwriting directory {path_destination} with file {path_source}") else: raise NotImplementedError elif isinstance(source, dict): if destination is None: if exclude: delete = None copy = {".": None} excluded_source = source unaccounted_destination = None excluded_destination = None else: delete = None copy = {".": source["."]} excluded_source = {".": None} unaccounted_destination = None excluded_destination = None source.pop(".") for key, value in source.items(): _, copy[key], excluded_source[key], _, _ = cls.diff_trees( value, None, path_join_function_source(path_source, key), path_join_function_destination(path_destination, key), destination_exclude_patterns, path_join_function_source, path_join_function_destination, folder_file_overwrite_error = folder_file_overwrite_error ) elif isinstance(destination, tuple): if exclude: delete = None copy = {".": None} excluded_source = source unaccounted_destination = None excluded_destination = destination else: delete = destination copy = {".": source["."]} excluded_source = {".": None} unaccounted_destination = None excluded_destination = None source.pop(".") for key, value in source.items(): _, copy[key], excluded_source[key], _, _ = cls.diff_trees( value, None, path_join_function_source(path_source, key), path_join_function_destination(path_destination, key), destination_exclude_patterns, path_join_function_source, path_join_function_destination, folder_file_overwrite_error = folder_file_overwrite_error ) if folder_file_overwrite_error: logging.critical(f"Refusing to overwrite file {path_destination} with directory {path_source}") logging_fatal("Use --force if you are sure!") else: logging.warning(f"Overwriting file {path_destination} with directory {path_source}") excluded_destination = None elif isinstance(destination, dict): if exclude: delete = {".": None} copy = {".": None} excluded_source = source unaccounted_destination = {".": None} excluded_destination = destination else: delete = {".": None} copy = {".": None} excluded_source = {".": None} unaccounted_destination = {".": None} excluded_destination = {".": None} source.pop(".") for key, value in source.items(): delete[key], copy[key], excluded_source[key], unaccounted_destination[key], excluded_destination[key] = cls.diff_trees( value, destination.pop(key, None), path_join_function_source(path_source, key), path_join_function_destination(path_destination, key), destination_exclude_patterns, path_join_function_source, path_join_function_destination, folder_file_overwrite_error = folder_file_overwrite_error ) destination.pop(".") for key, value in destination.items(): delete[key], _, _, unaccounted_destination[key], excluded_destination[key] = cls.diff_trees( None, value, path_join_function_source(path_source, key), path_join_function_destination(path_destination, key), destination_exclude_patterns, path_join_function_source, path_join_function_destination, folder_file_overwrite_error = folder_file_overwrite_error ) else: raise NotImplementedError else: raise NotImplementedError return delete, copy, excluded_source, unaccounted_destination, excluded_destination @classmethod def remove_excluded_folders_from_unaccounted_tree(cls, unaccounted: Union[dict, Tuple[int, int]], excluded: Union[dict, None]) -> dict: # For when we have --del but not --delete-excluded selected; we do not want to delete unaccounted folders that are the # parent of excluded items. At the point in the program that this function is called at either # 1) unaccounted is a tuple (file) and excluded is None # 2) unaccounted is a dict and excluded is a dict or None # trees passed to this function are already pruned; empty dictionary (sub)trees don't exist if excluded is None: return unaccounted else: unaccounted_non_excluded = {} for unaccounted_key, unaccounted_value in unaccounted.items(): if unaccounted_key == ".": continue unaccounted_non_excluded[unaccounted_key] = cls.remove_excluded_folders_from_unaccounted_tree( unaccounted_value, excluded.get(unaccounted_key, None) ) return unaccounted_non_excluded @classmethod def prune_tree(cls, tree): """Remove all Nones from a tree. May return None if tree is None however.""" if not isinstance(tree, dict): return tree else: return_dict = {} for key, value in tree.items(): value_pruned = cls.prune_tree(value) if value_pruned is not None: return_dict[key] = value_pruned return return_dict or None @classmethod def sort_tree(cls, tree): if not isinstance(tree, dict): return tree return { k: cls.sort_tree(v) for k, v in sorted(tree.items()) } @classmethod def paths_to_fixed_destination_paths(cls, path_source: str, fs_source: FileSystem, path_destination: str, fs_destination: FileSystem ) -> Tuple[str, str]: """Modify sync paths according to how a trailing slash on the source path should be treated""" # TODO I'm not exactly sure if this covers source and destination being symlinks (lstat vs stat etc) # we only need to consider when the destination is a directory try: lstat_destination = fs_destination.lstat(path_destination) except FileNotFoundError: return path_source, path_destination except (NotADirectoryError, PermissionError) as e: perror(path_source, e, FATAL) if stat.S_ISLNK(lstat_destination.st_mode): logging_fatal("Destination is a symlink. Not sure what to do. See GitHub issue #8") if not stat.S_ISDIR(lstat_destination.st_mode): return path_source, path_destination # we know the destination is a directory at this point try: lstat_source = fs_source.lstat(path_source) except FileNotFoundError: return path_source, path_destination except (NotADirectoryError, PermissionError) as e: perror(path_source, e, FATAL) if stat.S_ISREG(lstat_source.st_mode) or (stat.S_ISDIR(lstat_source.st_mode) and path_source[-1] not in ["/", "\\"]): path_destination = fs_destination.join( path_destination, fs_destination.split(path_source)[1] ) return path_source, path_destination def sync_with_options(**kargs): class Args: def __init__(self, **kwargs): self.direction = kwargs['direction'] self.source = kwargs['source'] self.destination = kwargs['dest'] self.dry_run = kwargs['dry_run'] if 'dry_run' in kwargs else False self.copy_links = kwargs['copy_links'] if 'copy_links' in kwargs else False self.exclude = kwargs['exclude'] if 'exclude' in kwargs else [] self.delete = kwargs['delete'] if 'delete' in kwargs else False self.delete_excluded = kwargs['delete_excluded'] if 'delete_excluded' in kwargs else False self.force = kwargs['force'] if 'force' in kwargs else False self.show_progress = kwargs['show_progress'] if 'show_progress' in kwargs else False self.adb_bin = kwargs['adb_bin'] if 'adb_bin' in kwargs else "adb" self.adb_flags = kwargs['adb_flags'] if 'adb_flags' in kwargs else [] self.adb_options = kwargs['adb_options'] if 'adb_options' in kwargs else [] args = Args(**kargs) adb_arguments = [args.adb_bin] + [f"-{arg}" for arg in args.adb_flags] for option, value in args.adb_options.items(): adb_arguments.append(f"-{option}") adb_arguments.append(value) fs_android = AndroidFileSystem(adb_arguments, 'UTF-8') fs_local = LocalFileSystem(adb_arguments) try: fs_android.test_connection() except BrokenPipeError: logging_fatal("Connection test failed") if args.direction == "push": fs_source = fs_local fs_destination = fs_android else: fs_source = fs_android fs_destination = fs_local path_source = args.source path_destination = args.destination path_source, path_destination = FileSyncer.paths_to_fixed_destination_paths(path_source, fs_source, path_destination, fs_destination) path_source = fs_source.normpath(path_source) path_destination = fs_destination.normpath(path_destination) try: files_tree_source = fs_source.get_files_tree(path_source, follow_links = args.copy_links) except (FileNotFoundError, NotADirectoryError, PermissionError) as e: perror(path_source, e, FATAL) try: files_tree_destination = fs_destination.get_files_tree(path_destination, follow_links = args.copy_links) except FileNotFoundError: files_tree_destination = None except (NotADirectoryError, PermissionError) as e: perror(path_destination, e, FATAL) logging.debug("Source tree:") if files_tree_source is not None: log_tree(path_source, files_tree_source, logging_level=logging.DEBUG) logging.debug("") logging.debug("Destination tree:") if files_tree_destination is not None: log_tree(path_destination, files_tree_destination, logging_level=logging.DEBUG) logging.debug("") if isinstance(files_tree_source, dict): excludePatterns = [fs_destination.normpath( fs_destination.join(path_destination, exclude) ) for exclude in args.exclude] else: excludePatterns = [fs_destination.normpath( path_destination + exclude ) for exclude in args.exclude] logging.debug("Exclude patterns:") logging.debug(excludePatterns) logging.debug("") tree_delete, tree_copy, tree_excluded_source, tree_unaccounted_destination, tree_excluded_destination = FileSyncer.diff_trees( files_tree_source, files_tree_destination, path_source, path_destination, excludePatterns, fs_source.join, fs_destination.join, folder_file_overwrite_error = not args.dry_run and not args.force ) tree_delete = FileSyncer.prune_tree(tree_delete) tree_copy = FileSyncer.prune_tree(tree_copy) tree_excluded_source = FileSyncer.prune_tree(tree_excluded_source) tree_unaccounted_destination = FileSyncer.prune_tree(tree_unaccounted_destination) tree_excluded_destination = FileSyncer.prune_tree(tree_excluded_destination) tree_delete = FileSyncer.sort_tree(tree_delete) tree_copy = FileSyncer.sort_tree(tree_copy) tree_excluded_source = FileSyncer.sort_tree(tree_excluded_source) tree_unaccounted_destination = FileSyncer.sort_tree(tree_unaccounted_destination) tree_excluded_destination = FileSyncer.sort_tree(tree_excluded_destination) logging.debug("Delete tree:") if tree_delete is not None: log_tree(path_destination, tree_delete, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") logging.debug("Copy tree:") if tree_copy is not None: log_tree(f"{path_source} --> {path_destination}", tree_copy, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") logging.debug("Source excluded tree:") if tree_excluded_source is not None: log_tree(path_source, tree_excluded_source, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") logging.debug("Destination unaccounted tree:") if tree_unaccounted_destination is not None: log_tree(path_destination, tree_unaccounted_destination, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") logging.debug("Destination excluded tree:") if tree_excluded_destination is not None: log_tree(path_destination, tree_excluded_destination, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") tree_unaccounted_destination_non_excluded = None if tree_unaccounted_destination is not None: tree_unaccounted_destination_non_excluded = FileSyncer.prune_tree( FileSyncer.remove_excluded_folders_from_unaccounted_tree( tree_unaccounted_destination, tree_excluded_destination ) ) logging.debug("Non-excluded-supporting destination unaccounted tree:") if tree_unaccounted_destination_non_excluded is not None: log_tree(path_destination, tree_unaccounted_destination_non_excluded, log_leaves_types = False, logging_level=logging.DEBUG) logging.debug("") logging.debug("SYNCING") logging.debug("") if tree_delete is not None: logging.debug("Deleting delete tree") fs_destination.remove_tree(path_destination, tree_delete, dry_run = args.dry_run) else: logging.debug("Empty delete tree") logging.debug("") if args.delete_excluded and args.delete: if tree_excluded_destination is not None: logging.debug("Deleting destination excluded tree") fs_destination.remove_tree(path_destination, tree_excluded_destination, dry_run = args.dry_run) else: logging.debug("Empty destination excluded tree") logging.debug("") if tree_unaccounted_destination is not None: logging.debug("Deleting destination unaccounted tree") fs_destination.remove_tree(path_destination, tree_unaccounted_destination, dry_run = args.dry_run) else: logging.debug("Empty destination unaccounted tree") logging.debug("") elif args.delete_excluded: if tree_excluded_destination is not None: logging.debug("Deleting destination excluded tree") fs_destination.remove_tree(path_destination, tree_excluded_destination, dry_run = args.dry_run) else: logging.debug("Empty destination excluded tree") logging.debug("") elif args.delete: if tree_unaccounted_destination_non_excluded is not None: logging.debug("Deleting non-excluded-supporting destination unaccounted tree") fs_destination.remove_tree(path_destination, tree_unaccounted_destination_non_excluded, dry_run = args.dry_run) else: logging.debug("Empty non-excluded-supporting destination unaccounted tree") logging.debug("") if tree_copy is not None: logging.debug("Copying copy tree") fs_destination.push_tree_here( path_source, fs_destination.split(path_source)[1] if isinstance(tree_copy, tuple) else ".", tree_copy, path_destination, fs_source, dry_run = args.dry_run, show_progress = args.show_progress ) else: logging.debug("Empty copy tree") logging.debug("") return tree_copy, tree_unaccounted_destination_non_excluded