copy-cert.py/copy-cert.py
2025-06-02 23:20:52 +08:00

117 lines
3.3 KiB
Python

#!/usr/bin/env python3
import json
import logging
import os
import shutil
import subprocess
import sys
import urllib3
from typing import Dict
# Path of the JSON configuration file that main() loads.
CONFIG_PATH = '/etc/copy-cert.json'
# File name the script was invoked as; used as the logger name and as the
# "Tag" header of error notifications.
SCRIPT_NAME = os.path.basename(sys.argv[0])
logger = logging.getLogger(SCRIPT_NAME)
# Shared urllib3 connection pool used to POST error notifications.
http_pool = urllib3.PoolManager()
class InvalidConfigException(Exception):
    """Raised when a configuration entry lacks a required key."""
class FileNotExistsException(Exception):
    """Raised when a source file or a destination directory does not exist."""
class CopyEntry:
    """A source/destination path pair describing one file to copy.

    The ``src`` and ``dst`` attributes are taken verbatim from the keys of
    the same name in the configuration mapping.
    """

    def __init__(self, d: Dict):
        """Build the entry from a configuration mapping.

        Args:
            d: Mapping that must contain the ``src`` and ``dst`` keys.

        Raises:
            InvalidConfigException: If ``src`` or ``dst`` is missing.
        """
        # Name the key that is actually missing; previously a missing 'dst'
        # was reported as a missing 'src'.
        for key in ('src', 'dst'):
            if key not in d:
                raise InvalidConfigException('No {} in CopyEntry'.format(key))
        self.src: str = d['src']
        self.dst: str = d['dst']
class CertEntry:
    """One certificate job: key and certificate copies plus ownership,
    permission bits and the service to reload afterwards."""

    def __init__(self, d: Dict):
        """Read every field of the job from the mapping ``d``.

        The keys ``priv``, ``cert``, ``owner``, ``group``, ``perm`` and
        ``service`` are all looked up directly, so a missing one raises
        ``KeyError``. ``perm`` is parsed as an octal string.
        """
        private_key = CopyEntry(d['priv'])
        certificate = CopyEntry(d['cert'])
        owner_name = d['owner']
        group_name = d['group']
        # The permission is written as an octal string such as "640".
        mode_bits = int(d['perm'], base=8)
        service_name = d['service']

        self.priv: CopyEntry = private_key
        self.cert: CopyEntry = certificate
        self.owner: str = owner_name
        self.group: str = group_name
        self.perm: int = mode_bits
        self.service: str = service_name
class Config:
    """Parsed configuration: the ntfy endpoint and the certificate jobs."""

    def __init__(self, d: Dict):
        """Read ``ntfy_endpoint`` and build one CertEntry per item of ``certs``.

        Both keys are looked up directly, so a missing one raises ``KeyError``.
        """
        self.ntfy_endpoint = d['ntfy_endpoint']
        self.certs = [CertEntry(item) for item in d['certs']]
def copyFile(src: str, dst: str):
    """Copy ``src`` to ``dst`` unless ``dst`` is already newer than ``src``.

    Only the file contents are copied (``shutil.copyfile``); ownership and
    permission bits of the destination are not set here.

    Args:
        src: Path of the file to copy; must exist.
        dst: Path of the destination file; its directory must exist. A bare
            file name refers to the current working directory.

    Raises:
        FileNotExistsException: If ``src`` or the directory of ``dst`` does
            not exist.
    """
    if not os.path.exists(src):
        raise FileNotExistsException('Src file not exists: {}'.format(src))

    # os.path.dirname() returns '' for a bare file name; treat that as the
    # current directory instead of rejecting a valid destination.
    dst_dir = os.path.dirname(dst) or os.curdir
    if not os.path.exists(dst_dir):
        raise FileNotExistsException('Directory of dst not exists: {}'.format(dst))

    # Skip only when the destination is strictly newer than the source.
    if os.path.exists(dst) and os.path.getmtime(src) < os.path.getmtime(dst):
        logger.info('Skipping file: {} --> {}'.format(src, dst))
        return

    logger.info('Copying file: {} --> {}'.format(src, dst))
    shutil.copyfile(src, dst)
def copyCert(entry: CertEntry):
    """Install one certificate: copy key and cert, fix ownership and mode,
    then reload the configured service.

    Args:
        entry: The certificate job to process.

    Raises:
        subprocess.CalledProcessError: If ``systemctl reload-or-restart``
            exits with a non-zero status.

    Any exception raised by the copy, chown or chmod steps propagates
    unchanged.
    """
    copyFile(entry.priv.src, entry.priv.dst)
    copyFile(entry.cert.src, entry.cert.dst)

    # NOTE(review): a newly created destination keeps its default permission
    # bits until the chmod below runs.
    destinations = (entry.priv.dst, entry.cert.dst)
    for path in destinations:
        shutil.chown(path, entry.owner, entry.group)
    for path in destinations:
        os.chmod(path, entry.perm)

    # An empty or missing service name means there is nothing to reload.
    if entry.service:
        # check=True turns a failed reload into an exception instead of
        # silently ignoring the exit status.
        subprocess.run(
            ['systemctl', 'reload-or-restart', entry.service],
            check=True,
        )
def notifyError(cert_idx: int, ntfy_endpoint: str, error_msg: str):
    """POST an error notification for one certificate to the ntfy endpoint.

    Nothing is sent when ``ntfy_endpoint`` is empty or ``None``. A transport
    failure is logged and swallowed, so a broken notification channel does
    not abort the caller.

    Args:
        cert_idx: Index of the certificate job that failed.
        ntfy_endpoint: URL to POST to; empty or ``None`` disables notification.
        error_msg: Text describing the failure; included in the message body.
    """
    if not ntfy_endpoint:
        return

    headers = {
        "Title": "Failed To Copy Certificate",
        "Priority": "high",
        "Tag": SCRIPT_NAME,
    }
    body = 'Error copying cert #{}, Failed Exception: \n\n{}'.format(cert_idx, error_msg)

    try:
        # Encode explicitly: a str body may be sent as Latin-1 by the HTTP
        # layer, which fails on error text containing other characters.
        resp = http_pool.request(
            'POST',
            ntfy_endpoint,
            headers=headers,
            body=body.encode('utf-8'),
        )
    except urllib3.exceptions.HTTPError:
        logger.exception('Failed to send ntfy notification')
        return

    reply = resp.data.decode('utf-8', errors='replace').strip()
    logger.info('Notifying Error, ntfy response: {}'.format(reply))
def main():
    """Load the configuration and process every certificate job in order.

    A failure in one job is logged and reported through notifyError(), and
    the remaining jobs are still processed. Errors while reading or parsing
    the configuration file are not caught here.
    """
    logging.basicConfig(level=logging.INFO)
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
        config = Config(json.load(f))

    logger.info('Copy certificates start!')
    for i, cert in enumerate(config.certs):
        logger.info('Copying cert #{}'.format(i))
        try:
            copyCert(cert)
        except Exception as e:
            # Top-level boundary: record the traceback locally so that the
            # failure stays visible even when no ntfy endpoint is configured.
            logger.exception('Failed to copy cert #{}'.format(i))
            notifyError(i, config.ntfy_endpoint, str(e))
    logger.info('Copy certificates done!')


if __name__ == '__main__':
    main()