#!/usr/bin/env python3 import lzma import gzip import bz2 import re import os import json import logging import argparse import copy from typing import Tuple, Union from urllib import request logger = logging.getLogger("main") class ExceptionDecompress(Exception): pass class IllegalFilename(Exception): pass def parse_filename(filename: str) -> Tuple[str, str]: pattern = re.compile("(?P.*)_(?P.*).deb") m = pattern.match(filename) if m is None: raise IllegalFilename("{} is not like _.deb format".format(filename)) package_name = m.group("name") package_version = m.group("version") return package_name, package_version def decompress_file(data: bytes): """Decompress data automatically based on file magic""" MAGIC_XZ = b'\xfd7zXZ' MAGIC_GZ = b'\x1f\x8b' MAGIC_BZ2 = b'BZ' if data.startswith(MAGIC_XZ): util = lzma elif data.startswith(MAGIC_GZ): util = gzip elif data.startswith(MAGIC_BZ2): util = bz2 else: raise ExceptionDecompress("File doesn't match any supported magic") return util.decompress(data) def http_file_exists(url: str): req = request.Request(url, method="HEAD") try: res = request.urlopen(req) except request.HTTPError: return False return res.status == 200 def http_download(url: str) -> bytes: req = request.Request(url) res = request.urlopen(req) return res.read() def transform_set_to_list_base(x: Union[list, set, dict], reverse: bool): ''' Transform all set objects in dict to list objects, or reverse. For example, transform {'url': {'package': ['v2', 'v1']}} to {'url': {'package': {'v2', 'v1'}}} ''' if reverse: if type(x) == list: return set(x) else: if type(x) == set: return list(x) if type(x) == dict: for k in x: x[k] = transform_set_to_list_base(x[k], reverse) return x return x # make LSP happy def transform_set_to_list(x: dict, reverse: bool) -> dict: return dict(transform_set_to_list_base(x, reverse)) class PackagesCache: """ Cache packages index, and provide quick check if a package already exists in that Apt repository """ def __init__(self, apt_base_url: str): """ :param apt_base_url: initial self.apt_base_url """ apt_base_url = apt_base_url.strip() if apt_base_url.endswith("/"): apt_base_url = apt_base_url.rstrip("/") self.apt_base_url = apt_base_url self.cache = dict() # { url: { package_name: { version } } } def clear(self): self.cache = dict() def load(self, path: str): with open(path, 'r') as f: t: dict = json.load(f) t = transform_set_to_list(t, True) self.cache = t def dump(self, path: str): t = copy.deepcopy(self.cache) t = transform_set_to_list(t, False) with open(path, 'w') as f: json.dump(t, f) def _build_index(self, url: str, data: bytes): self._build_index_plain(url, data.decode("utf-8")) def _build_index_plain(self, url: str, data: str): content = data.split("\n") d = dict() package_name = None package_version = None for line in content: line = line.strip() if len(line) == 0: if package_name is not None and package_version is not None: if not package_name in d.keys(): d[package_name] = { package_version } else: d[package_name].add(package_version) logger.debug("add to cache %s: %s", package_name, package_version) package_name = None package_version = None if line.startswith("Package: "): package_name = re.sub("Package:[ \t]*", "", line).strip() if line.startswith("Version: "): package_version = re.sub("Version:[ \t]*", "", line).strip() self.cache[url] = d def _cache_url(self, url: str): # try different compress format first for suffix in [ ".xz", ".gz", ".bz2", "" ]: u = url + suffix if http_file_exists(u): d = http_download(u) if len(suffix) > 0: self._build_index(url, decompress_file(d)) else: self._build_index(url, d) return raise Exception("No Packages file and its compressed version was found from {}" .format(url)) def _is_url_cached(self, url: str): return url in self.cache.keys() def _construct_url(self, suite: str, component: str, arch: str): return f"{self.apt_base_url}/dists/{suite}/{component}/binary-{arch}/Packages" def is_exists(self, package_name: str, package_version: str, package_arch: str, suite: str="stable", component: str="main"): """ :param package_name: "Package" value in debian control :param package_version: "Version" value in debian control :param package_arch: "Architecture" value in debian control :param suite: suite or codename, most time "stable" is a symlink to actual codename and be fine for default :param component: should be "main" in most situations """ url = self._construct_url(suite, component, package_arch) if not self._is_url_cached(url): self._cache_url(url) return package_name in self.cache[url] \ and package_version in self.cache[url][package_name] def main(): parser = argparse.ArgumentParser() parser.add_argument("--apt-base", type=str, required=True, help="Apt repository's url, shoud contains dists/") parser.add_argument("--cache-file", type=str, required=False, help="cache file, will create if not exists") parser.add_argument("--package-name", type=str, required=False, help="package name to check") parser.add_argument("--package-version", type=str, required=False, help="package version to check") parser.add_argument("--filename", type=str, required=False, help="get package name and version from filename") parser.add_argument("--package-arch", type=str, required=True, help="package architecture to check") parser.add_argument("--verbose", type=bool, required=False, help="show more log") arg = parser.parse_args() if not ((arg.package_name and arg.package_version) or arg.filename): logging.error("You must either specify --filename or (--package-name and --package-version)") exit(1) log_level=logging.INFO if arg.verbose: log_level=logging.DEBUG logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") p = PackagesCache(arg.apt_base) if arg.cache_file and os.path.exists(arg.cache_file): p.load(arg.cache_file) if arg.filename: package_name, package_version = parse_filename(arg.filename) else: package_name = arg.package_name package_version = arg.package_version ext=4 if p.is_exists(package_name, package_version, arg.package_arch): ext=0 if arg.cache_file: p.dump(arg.cache_file) exit(ext) if __name__ == "__main__": main()