241 lines
7.5 KiB
Python
241 lines
7.5 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import lzma
|
||
|
import gzip
|
||
|
import bz2
|
||
|
import re
|
||
|
import os
|
||
|
import json
|
||
|
import logging
|
||
|
import argparse
|
||
|
import copy
|
||
|
from typing import Tuple, Union
|
||
|
from urllib import request
|
||
|
|
||
|
logger = logging.getLogger("main")
|
||
|
|
||
|
class ExceptionDecompress(Exception):
|
||
|
pass
|
||
|
|
||
|
class IllegalFilename(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
def parse_filename(filename: str) -> Tuple[str, str]:
|
||
|
pattern = re.compile("(?P<name>.*)_(?P<version>.*).deb")
|
||
|
m = pattern.match(filename)
|
||
|
if m is None:
|
||
|
raise IllegalFilename("{} is not like <name>_<version>.deb format".format(filename))
|
||
|
|
||
|
package_name = m.group("name")
|
||
|
package_version = m.group("version")
|
||
|
|
||
|
return package_name, package_version
|
||
|
|
||
|
def decompress_file(data: bytes):
|
||
|
"""Decompress data automatically based on file magic"""
|
||
|
MAGIC_XZ = b'\xfd7zXZ'
|
||
|
MAGIC_GZ = b'\x1f\x8b'
|
||
|
MAGIC_BZ2 = b'BZ'
|
||
|
|
||
|
if data.startswith(MAGIC_XZ):
|
||
|
util = lzma
|
||
|
elif data.startswith(MAGIC_GZ):
|
||
|
util = gzip
|
||
|
elif data.startswith(MAGIC_BZ2):
|
||
|
util = bz2
|
||
|
else:
|
||
|
raise ExceptionDecompress("File doesn't match any supported magic")
|
||
|
return util.decompress(data)
|
||
|
|
||
|
def http_file_exists(url: str):
|
||
|
req = request.Request(url, method="HEAD")
|
||
|
try:
|
||
|
res = request.urlopen(req)
|
||
|
except request.HTTPError:
|
||
|
return False
|
||
|
|
||
|
return res.status == 200
|
||
|
|
||
|
def http_download(url: str) -> bytes:
|
||
|
req = request.Request(url)
|
||
|
res = request.urlopen(req)
|
||
|
return res.read()
|
||
|
|
||
|
|
||
|
def transform_set_to_list_base(x: Union[list, set, dict], reverse: bool):
|
||
|
'''
|
||
|
Transform all set objects in dict to list objects, or reverse.
|
||
|
|
||
|
For example, transform
|
||
|
{'url': {'package': ['v2', 'v1']}}
|
||
|
to
|
||
|
{'url': {'package': {'v2', 'v1'}}}
|
||
|
'''
|
||
|
if reverse:
|
||
|
if type(x) == list:
|
||
|
return set(x)
|
||
|
else:
|
||
|
if type(x) == set:
|
||
|
return list(x)
|
||
|
|
||
|
if type(x) == dict:
|
||
|
for k in x:
|
||
|
x[k] = transform_set_to_list_base(x[k], reverse)
|
||
|
return x
|
||
|
|
||
|
return x
|
||
|
|
||
|
# make LSP happy
|
||
|
def transform_set_to_list(x: dict, reverse: bool) -> dict:
|
||
|
return dict(transform_set_to_list_base(x, reverse))
|
||
|
|
||
|
|
||
|
class PackagesCache:
|
||
|
"""
|
||
|
Cache packages index, and provide quick check if a package
|
||
|
already exists in that Apt repository
|
||
|
"""
|
||
|
|
||
|
def __init__(self, apt_base_url: str):
|
||
|
"""
|
||
|
:param apt_base_url: initial self.apt_base_url
|
||
|
"""
|
||
|
apt_base_url = apt_base_url.strip()
|
||
|
if apt_base_url.endswith("/"):
|
||
|
apt_base_url = apt_base_url.rstrip("/")
|
||
|
self.apt_base_url = apt_base_url
|
||
|
self.cache = dict() # { url: { package_name: { version } } }
|
||
|
|
||
|
def clear(self):
|
||
|
self.cache = dict()
|
||
|
|
||
|
def load(self, path: str):
|
||
|
with open(path, 'r') as f:
|
||
|
t: dict = json.load(f)
|
||
|
t = transform_set_to_list(t, True)
|
||
|
self.cache = t
|
||
|
|
||
|
def dump(self, path: str):
|
||
|
t = copy.deepcopy(self.cache)
|
||
|
t = transform_set_to_list(t, False)
|
||
|
with open(path, 'w') as f:
|
||
|
json.dump(t, f)
|
||
|
|
||
|
def _build_index(self, url: str, data: bytes):
|
||
|
self._build_index_plain(url, data.decode("utf-8"))
|
||
|
|
||
|
def _build_index_plain(self, url: str, data: str):
|
||
|
content = data.split("\n")
|
||
|
|
||
|
d = dict()
|
||
|
package_name = None
|
||
|
package_version = None
|
||
|
for line in content:
|
||
|
line = line.strip()
|
||
|
if len(line) == 0:
|
||
|
if package_name is not None and package_version is not None:
|
||
|
if not package_name in d.keys():
|
||
|
d[package_name] = { package_version }
|
||
|
else:
|
||
|
d[package_name].add(package_version)
|
||
|
logger.debug("add to cache %s: %s", package_name, package_version)
|
||
|
package_name = None
|
||
|
package_version = None
|
||
|
|
||
|
if line.startswith("Package: "):
|
||
|
package_name = re.sub("Package:[ \t]*", "", line).strip()
|
||
|
if line.startswith("Version: "):
|
||
|
package_version = re.sub("Version:[ \t]*", "", line).strip()
|
||
|
|
||
|
self.cache[url] = d
|
||
|
|
||
|
def _cache_url(self, url: str):
|
||
|
# try different compress format first
|
||
|
for suffix in [ ".xz", ".gz", ".bz2", "" ]:
|
||
|
u = url + suffix
|
||
|
if http_file_exists(u):
|
||
|
d = http_download(u)
|
||
|
if len(suffix) > 0:
|
||
|
self._build_index(url, decompress_file(d))
|
||
|
else:
|
||
|
self._build_index(url, d)
|
||
|
return
|
||
|
|
||
|
raise Exception("No Packages file and its compressed version was found from {}"
|
||
|
.format(url))
|
||
|
|
||
|
def _is_url_cached(self, url: str):
|
||
|
return url in self.cache.keys()
|
||
|
|
||
|
def _construct_url(self, suite: str, component: str, arch: str):
|
||
|
return f"{self.apt_base_url}/dists/{suite}/{component}/binary-{arch}/Packages"
|
||
|
|
||
|
def is_exists(self, package_name: str, package_version: str, package_arch: str,
|
||
|
suite: str="stable", component: str="main"):
|
||
|
"""
|
||
|
:param package_name: "Package" value in debian control
|
||
|
:param package_version: "Version" value in debian control
|
||
|
:param package_arch: "Architecture" value in debian control
|
||
|
:param suite: suite or codename, most time "stable" is a symlink to
|
||
|
actual codename and be fine for default
|
||
|
:param component: should be "main" in most situations
|
||
|
"""
|
||
|
url = self._construct_url(suite, component, package_arch)
|
||
|
if not self._is_url_cached(url):
|
||
|
self._cache_url(url)
|
||
|
|
||
|
return package_name in self.cache[url] \
|
||
|
and package_version in self.cache[url][package_name]
|
||
|
|
||
|
def main():
|
||
|
parser = argparse.ArgumentParser()
|
||
|
parser.add_argument("--apt-base", type=str, required=True,
|
||
|
help="Apt repository's url, shoud contains dists/")
|
||
|
parser.add_argument("--cache-file", type=str, required=False,
|
||
|
help="cache file, will create if not exists")
|
||
|
parser.add_argument("--package-name", type=str, required=False,
|
||
|
help="package name to check")
|
||
|
parser.add_argument("--package-version", type=str, required=False,
|
||
|
help="package version to check")
|
||
|
parser.add_argument("--filename", type=str, required=False,
|
||
|
help="get package name and version from filename")
|
||
|
parser.add_argument("--package-arch", type=str, required=True,
|
||
|
help="package architecture to check")
|
||
|
parser.add_argument("--verbose", type=bool, required=False,
|
||
|
help="show more log")
|
||
|
arg = parser.parse_args()
|
||
|
|
||
|
if not ((arg.package_name and arg.package_version) or arg.filename):
|
||
|
logging.error("You must either specify --filename or (--package-name and --package-version)")
|
||
|
exit(1)
|
||
|
|
||
|
log_level=logging.INFO
|
||
|
if arg.verbose:
|
||
|
log_level=logging.DEBUG
|
||
|
logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||
|
|
||
|
|
||
|
p = PackagesCache(arg.apt_base)
|
||
|
|
||
|
if arg.cache_file and os.path.exists(arg.cache_file):
|
||
|
p.load(arg.cache_file)
|
||
|
|
||
|
if arg.filename:
|
||
|
package_name, package_version = parse_filename(arg.filename)
|
||
|
else:
|
||
|
package_name = arg.package_name
|
||
|
package_version = arg.package_version
|
||
|
|
||
|
ext=4
|
||
|
if p.is_exists(package_name, package_version, arg.package_arch):
|
||
|
ext=0
|
||
|
|
||
|
if arg.cache_file:
|
||
|
p.dump(arg.cache_file)
|
||
|
|
||
|
exit(ext)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|