- Refactor build script - Add python script to check if package exists in apt repo - Refactor woodpecker-ci config - Update makedeb
		
			
				
	
	
		
			241 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
 | 
						|
import lzma
 | 
						|
import gzip
 | 
						|
import bz2
 | 
						|
import re
 | 
						|
import os
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import argparse
 | 
						|
import copy
 | 
						|
from typing import Tuple, Union
 | 
						|
from urllib import request
 | 
						|
 | 
						|
# Named logger for this script; PackagesCache uses it to log every
# package/version pair it adds to its index at DEBUG level.
logger = logging.getLogger("main")
 | 
						|
 | 
						|
class ExceptionDecompress(Exception):
    """Raised when data does not start with any supported compression magic."""
 | 
						|
 | 
						|
class IllegalFilename(Exception):
    """Raised when a file name does not follow the ``<name>_<version>.deb`` layout."""
 | 
						|
 | 
						|
 | 
						|
def parse_filename(filename: str) -> Tuple[str, str]:
    """Split a ``<name>_<version>.deb`` file name into name and version.

    The name part is matched greedily, so the split happens at the LAST
    underscore: everything before it is the name, everything between it and
    the ``.deb`` suffix is the version.

    :param filename: file name to parse
    :return: ``(package_name, package_version)``
    :raises IllegalFilename: if *filename* is not of the form
                             ``<name>_<version>.deb``
    """
    # The dot is escaped and the whole string has to match. The previous
    # pattern treated "." as "any character" and was not anchored at the end,
    # so names such as "foo_1.0xdeb" or "foo_1.0.deb.sig" were accepted.
    pattern = re.compile(r"(?P<name>.*)_(?P<version>.*)\.deb")
    m = pattern.fullmatch(filename)
    if m is None:
        raise IllegalFilename("{} is not like <name>_<version>.deb format".format(filename))

    return m.group("name"), m.group("version")
 | 
						|
 | 
						|
def decompress_file(data: bytes):
    """Decompress *data*, choosing the codec from its leading magic bytes.

    The xz, gzip and bzip2 signatures are tried in that order.

    :param data: compressed content
    :return: the decompressed bytes
    :raises ExceptionDecompress: if *data* starts with none of the known
                                 signatures
    """
    # (signature, module providing decompress()) pairs, in lookup order.
    codecs = (
        (b'\xfd7zXZ', lzma),
        (b'\x1f\x8b', gzip),
        (b'BZ', bz2),
    )
    for magic, codec in codecs:
        if data.startswith(magic):
            return codec.decompress(data)

    raise ExceptionDecompress("File doesn't match any supported magic")
 | 
						|
 | 
						|
def http_file_exists(url: str):
    """Check with a HEAD request whether *url* can be fetched.

    :param url: URL to probe
    :return: True if the server answers with status 200; False if it answers
             with another status or the request raises ``HTTPError``.
             Other errors raised by ``urlopen`` are not caught here.
    """
    req = request.Request(url, method="HEAD")
    try:
        # The context manager closes the response (and its connection) as
        # soon as the status has been read.
        with request.urlopen(req) as res:
            return res.status == 200
    except request.HTTPError:
        return False
 | 
						|
 | 
						|
def http_download(url: str) -> bytes:
    """Download *url* and return the complete response body.

    :param url: URL to fetch
    :return: the body as bytes
    """
    req = request.Request(url)
    # Close the response once the body has been read instead of leaving the
    # connection open until garbage collection.
    with request.urlopen(req) as res:
        return res.read()
 | 
						|
 | 
						|
 | 
						|
def transform_set_to_list_base(x: Union[list, set, dict], reverse: bool):
    '''
    Convert between set and list values inside a (possibly nested) dict.

    With ``reverse=False`` every set becomes a list, e.g.
        {'url': {'package': {'v2', 'v1'}}}
    becomes
        {'url': {'package': ['v2', 'v1']}}
    With ``reverse=True`` every list becomes a set (the opposite direction).

    Dicts are walked recursively and updated in place; the same dict object
    is returned. Only exact ``list`` / ``set`` / ``dict`` instances are
    handled; any other value is returned unchanged.
    '''
    kind = type(x)

    # Leaf conversion, direction chosen by ``reverse``.
    if reverse and kind == list:
        return set(x)
    if not reverse and kind == set:
        return list(x)

    # Containers: convert every value, keeping the keys as they are.
    if kind == dict:
        for key in x:
            x[key] = transform_set_to_list_base(x[key], reverse)

    return x
 | 
						|
 | 
						|
# make LSP happy
 | 
						|
def transform_set_to_list(x: dict, reverse: bool) -> dict:
    """Dict-typed front end for transform_set_to_list_base.

    The converted result is wrapped in ``dict(...)``, so the caller gets a
    new top-level dict and a precise return type.
    """
    converted = transform_set_to_list_base(x, reverse)
    return dict(converted)
 | 
						|
 | 
						|
 | 
						|
class PackagesCache:
    """
    Cache the Packages indexes of an Apt repository and provide a quick
    check whether a package version already exists in that repository.

    ``self.cache`` has the shape ``{ url: { package_name: { version } } }``
    where ``url`` is the Packages URL the index was built from.
    """

    def __init__(self, apt_base_url: str):
        """
        :param apt_base_url: initial self.apt_base_url; surrounding
                             whitespace and trailing slashes are removed
        """
        self.apt_base_url = apt_base_url.strip().rstrip("/")
        self.cache = dict()     # { url: { package_name: { version } } }

    def clear(self):
        """Forget every cached index."""
        self.cache = dict()

    def load(self, path: str):
        """
        Replace the cache with the content of a JSON file written by dump().

        :param path: JSON file to read
        """
        with open(path, 'r') as f:
            t: dict = json.load(f)
        # JSON has no set type: turn the stored lists back into sets.
        t = transform_set_to_list(t, True)
        self.cache = t

    def dump(self, path: str):
        """
        Write the cache to a JSON file.

        :param path: JSON file to write (overwritten if it exists)
        """
        # Work on a deep copy so the sets in self.cache stay sets.
        t = copy.deepcopy(self.cache)
        t = transform_set_to_list(t, False)
        with open(path, 'w') as f:
            json.dump(t, f)

    def _build_index(self, url: str, data: bytes):
        """Decode a raw Packages file as UTF-8 and index it under *url*."""
        self._build_index_plain(url, data.decode("utf-8"))

    def _build_index_plain(self, url: str, data: str):
        """
        Parse the text of a Packages file and store the resulting
        ``{ package_name: { version } }`` mapping in ``self.cache[url]``.

        Stanzas are separated by blank lines; a stanza is recorded only when
        it has both a "Package: " and a "Version: " field.

        :param url: cache key, the Packages URL the data came from
        :param data: decoded content of the Packages file
        """
        d = dict()
        package_name = None
        package_version = None

        # The extra empty line acts as a final separator, so the last stanza
        # is recorded even when the data does not end with a blank line.
        for raw in data.split("\n") + [""]:
            # Only trailing whitespace is removed: a line that starts with
            # whitespace is a continuation of the previous field and must not
            # be mistaken for a "Package: " / "Version: " field of its own.
            line = raw.rstrip()

            if not line:
                if package_name is not None and package_version is not None:
                    d.setdefault(package_name, set()).add(package_version)
                    logger.debug("add to cache %s: %s", package_name, package_version)
                package_name = None
                package_version = None
                continue

            if line.startswith("Package: "):
                package_name = line[len("Package:"):].strip()
            elif line.startswith("Version: "):
                package_version = line[len("Version:"):].strip()

        self.cache[url] = d

    def _cache_url(self, url: str):
        """
        Download the Packages file at *url* and index it.

        :param url: Packages URL without any compression suffix
        :raises Exception: if neither the plain file nor a compressed
                           variant exists
        """
        # try different compress format first
        for suffix in [".xz", ".gz", ".bz2", ""]:
            u = url + suffix
            if not http_file_exists(u):
                continue

            d = http_download(u)
            if suffix:
                d = decompress_file(d)
            self._build_index(url, d)
            return

        raise Exception("No Packages file and its compressed version was found from {}"
                        .format(url))

    def _is_url_cached(self, url: str):
        """Return True if an index for *url* is already in the cache."""
        return url in self.cache

    def _construct_url(self, suite: str, component: str, arch: str):
        """Build the Packages URL for the given suite, component and arch."""
        return f"{self.apt_base_url}/dists/{suite}/{component}/binary-{arch}/Packages"

    def is_exists(self, package_name: str, package_version: str, package_arch: str,
                  suite: str = "stable", component: str = "main"):
        """
        Check whether a package version is listed in the repository,
        downloading and indexing the Packages file on first use.

        :param package_name: "Package" value in debian control
        :param package_version: "Version" value in debian control
        :param package_arch: "Architecture" value in debian control
        :param suite: suite or codename, most time "stable" is a symlink to
                        actual codename and be fine for default
        :param component: should be "main" in most situations
        :return: True if the exact name/version pair is in the index
        """
        url = self._construct_url(suite, component, package_arch)
        if not self._is_url_cached(url):
            self._cache_url(url)

        index = self.cache[url]
        return package_name in index and package_version in index[package_name]
 | 
						|
 | 
						|
def main():
    """Command-line entry point.

    Parses the arguments, optionally loads the cache file, checks whether
    the requested package version exists in the repository and writes the
    cache file back when one was given.

    Always terminates through SystemExit: code 0 when the package version
    exists, 4 when it does not, 1 when neither --filename nor the
    --package-name/--package-version pair was supplied.
    """
    def parse_flag(value: str) -> bool:
        # type=bool is True for ANY non-empty string, so "--verbose false"
        # used to enable debug logging. Only explicit "off" spellings are
        # false here; every other value stays true as before.
        return value.strip().lower() not in ("", "0", "false", "no", "off")

    parser = argparse.ArgumentParser()
    parser.add_argument("--apt-base",           type=str, required=True,
                        help="Apt repository's url, shoud contains dists/")
    parser.add_argument("--cache-file",         type=str, required=False,
                        help="cache file, will create if not exists")
    parser.add_argument("--package-name",       type=str, required=False,
                        help="package name to check")
    parser.add_argument("--package-version",    type=str, required=False,
                        help="package version to check")
    parser.add_argument("--filename",           type=str, required=False,
                        help="get package name and version from filename")
    parser.add_argument("--package-arch",       type=str, required=True,
                        help="package architecture to check")
    # nargs="?" keeps "--verbose VALUE" working and also accepts a bare
    # "--verbose" (const=True).
    parser.add_argument("--verbose",        type=parse_flag, required=False,
                        nargs="?", const=True, default=False,
                        help="show more log")
    arg = parser.parse_args()

    if not ((arg.package_name and arg.package_version) or arg.filename):
        logging.error("You must either specify --filename or (--package-name and --package-version)")
        # SystemExit instead of exit(): the exit() helper is provided by the
        # site module and is not guaranteed to exist.
        raise SystemExit(1)

    log_level = logging.DEBUG if arg.verbose else logging.INFO
    logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    p = PackagesCache(arg.apt_base)

    # Reuse a previously dumped index when the cache file is present.
    if arg.cache_file and os.path.exists(arg.cache_file):
        p.load(arg.cache_file)

    # --filename takes precedence over --package-name / --package-version.
    if arg.filename:
        package_name, package_version = parse_filename(arg.filename)
    else:
        package_name = arg.package_name
        package_version = arg.package_version

    ext = 4
    if p.is_exists(package_name, package_version, arg.package_arch):
        ext = 0

    if arg.cache_file:
        p.dump(arg.cache_file)

    raise SystemExit(ext)
 | 
						|
 | 
						|
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
 |