debian-packages/script/package_exists.py

241 lines
7.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
import lzma
import gzip
import bz2
import re
import os
import json
import logging
import argparse
import copy
from typing import Tuple, Union
from urllib import request
logger = logging.getLogger("main")
class ExceptionDecompress(Exception):
pass
class IllegalFilename(Exception):
pass
def parse_filename(filename: str) -> Tuple[str, str]:
pattern = re.compile("(?P<name>.*)_(?P<version>.*).deb")
m = pattern.match(filename)
if m is None:
raise IllegalFilename("{} is not like <name>_<version>.deb format".format(filename))
package_name = m.group("name")
package_version = m.group("version")
return package_name, package_version
def decompress_file(data: bytes):
"""Decompress data automatically based on file magic"""
MAGIC_XZ = b'\xfd7zXZ'
MAGIC_GZ = b'\x1f\x8b'
MAGIC_BZ2 = b'BZ'
if data.startswith(MAGIC_XZ):
util = lzma
elif data.startswith(MAGIC_GZ):
util = gzip
elif data.startswith(MAGIC_BZ2):
util = bz2
else:
raise ExceptionDecompress("File doesn't match any supported magic")
return util.decompress(data)
def http_file_exists(url: str):
req = request.Request(url, method="HEAD")
try:
res = request.urlopen(req)
except request.HTTPError:
return False
return res.status == 200
def http_download(url: str) -> bytes:
req = request.Request(url)
res = request.urlopen(req)
return res.read()
def transform_set_to_list_base(x: Union[list, set, dict], reverse: bool):
'''
Transform all set objects in dict to list objects, or reverse.
For example, transform
{'url': {'package': ['v2', 'v1']}}
to
{'url': {'package': {'v2', 'v1'}}}
'''
if reverse:
if type(x) == list:
return set(x)
else:
if type(x) == set:
return list(x)
if type(x) == dict:
for k in x:
x[k] = transform_set_to_list_base(x[k], reverse)
return x
return x
# make LSP happy
def transform_set_to_list(x: dict, reverse: bool) -> dict:
return dict(transform_set_to_list_base(x, reverse))
class PackagesCache:
"""
Cache packages index, and provide quick check if a package
already exists in that Apt repository
"""
def __init__(self, apt_base_url: str):
"""
:param apt_base_url: initial self.apt_base_url
"""
apt_base_url = apt_base_url.strip()
if apt_base_url.endswith("/"):
apt_base_url = apt_base_url.rstrip("/")
self.apt_base_url = apt_base_url
self.cache = dict() # { url: { package_name: { version } } }
def clear(self):
self.cache = dict()
def load(self, path: str):
with open(path, 'r') as f:
t: dict = json.load(f)
t = transform_set_to_list(t, True)
self.cache = t
def dump(self, path: str):
t = copy.deepcopy(self.cache)
t = transform_set_to_list(t, False)
with open(path, 'w') as f:
json.dump(t, f)
def _build_index(self, url: str, data: bytes):
self._build_index_plain(url, data.decode("utf-8"))
def _build_index_plain(self, url: str, data: str):
content = data.split("\n")
d = dict()
package_name = None
package_version = None
for line in content:
line = line.strip()
if len(line) == 0:
if package_name is not None and package_version is not None:
if not package_name in d.keys():
d[package_name] = { package_version }
else:
d[package_name].add(package_version)
logger.debug("add to cache %s: %s", package_name, package_version)
package_name = None
package_version = None
if line.startswith("Package: "):
package_name = re.sub("Package:[ \t]*", "", line).strip()
if line.startswith("Version: "):
package_version = re.sub("Version:[ \t]*", "", line).strip()
self.cache[url] = d
def _cache_url(self, url: str):
# try different compress format first
for suffix in [ ".xz", ".gz", ".bz2", "" ]:
u = url + suffix
if http_file_exists(u):
d = http_download(u)
if len(suffix) > 0:
self._build_index(url, decompress_file(d))
else:
self._build_index(url, d)
return
raise Exception("No Packages file and its compressed version was found from {}"
.format(url))
def _is_url_cached(self, url: str):
return url in self.cache.keys()
def _construct_url(self, suite: str, component: str, arch: str):
return f"{self.apt_base_url}/dists/{suite}/{component}/binary-{arch}/Packages"
def is_exists(self, package_name: str, package_version: str, package_arch: str,
suite: str="stable", component: str="main"):
"""
:param package_name: "Package" value in debian control
:param package_version: "Version" value in debian control
:param package_arch: "Architecture" value in debian control
:param suite: suite or codename, most time "stable" is a symlink to
actual codename and be fine for default
:param component: should be "main" in most situations
"""
url = self._construct_url(suite, component, package_arch)
if not self._is_url_cached(url):
self._cache_url(url)
return package_name in self.cache[url] \
and package_version in self.cache[url][package_name]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apt-base", type=str, required=True,
help="Apt repository's url, shoud contains dists/")
parser.add_argument("--cache-file", type=str, required=False,
help="cache file, will create if not exists")
parser.add_argument("--package-name", type=str, required=False,
help="package name to check")
parser.add_argument("--package-version", type=str, required=False,
help="package version to check")
parser.add_argument("--filename", type=str, required=False,
help="get package name and version from filename")
parser.add_argument("--package-arch", type=str, required=True,
help="package architecture to check")
parser.add_argument("--verbose", type=bool, required=False,
help="show more log")
arg = parser.parse_args()
if not ((arg.package_name and arg.package_version) or arg.filename):
logging.error("You must either specify --filename or (--package-name and --package-version)")
exit(1)
log_level=logging.INFO
if arg.verbose:
log_level=logging.DEBUG
logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
p = PackagesCache(arg.apt_base)
if arg.cache_file and os.path.exists(arg.cache_file):
p.load(arg.cache_file)
if arg.filename:
package_name, package_version = parse_filename(arg.filename)
else:
package_name = arg.package_name
package_version = arg.package_version
ext=4
if p.is_exists(package_name, package_version, arg.package_arch):
ext=0
if arg.cache_file:
p.dump(arg.cache_file)
exit(ext)
if __name__ == "__main__":
main()