diff --git a/copy-cert.json b/copy-cert.json new file mode 100644 index 0000000..d6f17c1 --- /dev/null +++ b/copy-cert.json @@ -0,0 +1,18 @@ +{ + "ntfy_endpoint": "ntfy.sh/subscribed_topic", + "certs": [ + { + "priv": { + "src": "/path/to/src", + "dst": "/path/to/dst" + }, + "cert": { + "src": "/path/to/src", + "dst": "/path/to/dst" + }, + "owner": "user", + "group": "group", + "perm": "600" + } + ] +} diff --git a/copy-cert.py b/copy-cert.py new file mode 100644 index 0000000..2477309 --- /dev/null +++ b/copy-cert.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 + +import json +import logging +import os +import shutil +import sys +import urllib3 + +from typing import Dict + +CONFIG_PATH = '/etc/copy-cert.json' +SCRIPT_NAME = os.path.basename(sys.argv[0]) + + +logger = logging.getLogger(SCRIPT_NAME) +http_pool = urllib3.PoolManager() + +class InvalidConfigException(Exception): + pass + + +class CopyEntry: + def __init__(self, d: Dict): + if 'src' not in d.keys(): + raise InvalidConfigException('No src in CopyEntry') + if 'dst' not in d.keys(): + raise InvalidConfigException('No src in CopyEntry') + + self.src: str = d['src'] + self.dst: str = d['dst'] + +class CertEntry: + def __init__(self, d: Dict): + self.priv: CopyEntry = CopyEntry(d['priv']) + self.cert: CopyEntry = CopyEntry(d['cert']) + self.owner: str = d['owner'] + self.group: str = d['group'] + self.perm: int = int(d['perm'], base=8) + +class Config: + def __init__(self, d: Dict): + self.ntfy_endpoint = d['ntfy_endpoint'] + self.certs = [] + for cert_d in d['certs']: + self.certs.append(CertEntry(cert_d)) + +def copyFile(src: str, dst: str): + src_mtime = os.path.getmtime(src) + dst_mtime = os.path.getmtime(dst) + + if src_mtime > dst_mtime: + logger.info('Copying file: {} --> {}'.format(src, dst)) + shutil.copyfile(src, dst) + else: + logger.info('Skipping file: {} --> {}'.format(src, dst)) + + +def copyCert(entry: CertEntry): + copyFile(entry.priv.src, entry.priv.dst) + copyFile(entry.cert.src, entry.cert.dst) + + shutil.chown(entry.priv.dst, entry.owner, entry.group) + shutil.chown(entry.cert.dst, entry.owner, entry.group) + + os.chmod(entry.priv.dst, entry.perm) + os.chmod(entry.cert.dst, entry.perm) + +def notifyError(cert_idx: int, ntfy_endpoint: str, error_msg: str): + if ('' == ntfy_endpoint): + return + + headers = { + "Title": "Failed To Copy Certificate", + "Priority": "high", + "Tag": SCRIPT_NAME, + } + body = 'Error copying cert #{}, Failed Exception: \n\n{}'.format(cert_idx, error_msg) + + req = http_pool.request('POST', ntfy_endpoint, headers=headers, body=body) + logger.info('Notifing Error, ntfy response: {}'.format(req.data.decode('utf-8').strip())) + +def main(): + logging.basicConfig(level=logging.INFO) + + with open(CONFIG_PATH, 'r') as f: + config = Config(json.load(f)) + + logger.info('Copy certificates start!') + + for i, cert in enumerate(config.certs): + logger.info('Copying cert #{}'.format(i)) + try: + copyCert(cert) + except Exception as e: + notifyError(i, config.ntfy_endpoint, str(e)) + + logger.info('Copy certificates done!') + +if __name__ == '__main__': + main()