117 lines
3.3 KiB
Python
117 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import urllib3
|
|
|
|
from typing import Dict
|
|
|
|
# JSON configuration file read by main().
CONFIG_PATH = '/etc/copy-cert.json'
# Name the script was invoked as; used as the logger name and the ntfy tag.
SCRIPT_NAME = os.path.basename(sys.argv[0])


# Shared HTTP connection pool used to post error notifications.
http_pool = urllib3.PoolManager()
logger = logging.getLogger(SCRIPT_NAME)
|
|
|
|
class InvalidConfigException(Exception):
    """Raised when a configuration entry lacks a required key."""
|
|
|
|
class FileNotExistsException(Exception):
    """Raised when a source file or a destination directory is missing."""
|
|
|
|
|
|
class CopyEntry:
    """A source/destination path pair taken from the configuration.

    Attributes:
        src: Value of the ``src`` key of the mapping.
        dst: Value of the ``dst`` key of the mapping.

    Raises:
        InvalidConfigException: if ``src`` or ``dst`` is missing from *d*.
    """

    def __init__(self, d: Dict):
        # Report the key that is actually missing; the previous code said
        # "No src" for a missing dst as well.
        for key in ('src', 'dst'):
            if key not in d:
                raise InvalidConfigException('No {} in CopyEntry'.format(key))

        self.src: str = d['src']
        self.dst: str = d['dst']
|
|
|
|
class CertEntry:
    """One certificate to deploy, built from a configuration mapping.

    Attributes:
        priv: CopyEntry built from the ``priv`` key.
        cert: CopyEntry built from the ``cert`` key.
        owner: Value of the ``owner`` key.
        group: Value of the ``group`` key.
        perm: The ``perm`` string parsed as an octal number.
        service: Value of the ``service`` key.
    """

    def __init__(self, d: Dict):
        # Keys are read in the same order as before so that the first
        # missing key is the one reported.
        private_key = CopyEntry(d['priv'])
        certificate = CopyEntry(d['cert'])
        owner, group = d['owner'], d['group']
        mode = int(d['perm'], 8)
        service = d['service']

        self.priv: CopyEntry = private_key
        self.cert: CopyEntry = certificate
        self.owner: str = owner
        self.group: str = group
        self.perm: int = mode
        self.service: str = service
|
|
|
|
class Config:
    """Parsed configuration: the ntfy endpoint and the certificate entries.

    Attributes:
        ntfy_endpoint: Value of the ``ntfy_endpoint`` key.
        certs: One CertEntry per item of the ``certs`` list, in order.
    """

    def __init__(self, d: Dict):
        self.ntfy_endpoint = d['ntfy_endpoint']
        self.certs = [CertEntry(raw_cert) for raw_cert in d['certs']]
|
|
|
|
def copyFile(src: str, dst: str):
    """Copy *src* to *dst* unless *dst* exists and is newer than *src*.

    Args:
        src: Path of the file to copy.
        dst: Path to write. Its parent directory must already exist.

    Raises:
        FileNotExistsException: if *src* or the directory of *dst* is missing.
    """
    if not os.path.exists(src):
        raise FileNotExistsException('Src file not exists: {}'.format(src))

    # A bare file name has an empty dirname, meaning the current directory;
    # os.path.exists('') is always False and would reject it wrongly.
    dst_dir = os.path.dirname(dst) or os.curdir
    if not os.path.exists(dst_dir):
        raise FileNotExistsException('Directory of dst not exists: {}'.format(dst))

    src_mtime = os.path.getmtime(src)

    # Skip only when the destination is strictly newer than the source.
    if os.path.exists(dst) and src_mtime < os.path.getmtime(dst):
        logger.info('Skipping file: {} --> {}'.format(src, dst))
        return

    logger.info('Copying file: {} --> {}'.format(src, dst))
    shutil.copyfile(src, dst)
|
|
|
|
|
|
def copyCert(entry: CertEntry):
    """Deploy one certificate: copy files, fix ownership/mode, reload service.

    Copies the private key and the certificate, applies the configured
    owner, group and permission bits to both destination files, then runs
    ``systemctl reload-or-restart`` on the configured service, if any.

    Args:
        entry: Certificate entry describing the paths, ownership and service.

    Raises:
        subprocess.CalledProcessError: if ``systemctl`` exits non-zero.
        Exception: whatever copyFile, shutil.chown or os.chmod raise.
    """
    copyFile(entry.priv.src, entry.priv.dst)
    copyFile(entry.cert.src, entry.cert.dst)

    for deployed in (entry.priv.dst, entry.cert.dst):
        shutil.chown(deployed, entry.owner, entry.group)

    for deployed in (entry.priv.dst, entry.cert.dst):
        os.chmod(deployed, entry.perm)

    # None or '' means there is no service to reload.
    if entry.service:
        # check=True turns a failed reload into an exception instead of an
        # ignored exit status, so the caller can report it.
        subprocess.run(
            ['systemctl', 'reload-or-restart', entry.service],
            check=True,
        )
|
|
|
|
def notifyError(cert_idx: int, ntfy_endpoint: str, error_msg: str):
    """POST a high-priority failure notice to an ntfy endpoint.

    Does nothing when no endpoint is configured (empty string or ``None``).

    Args:
        cert_idx: Index of the certificate whose copy failed.
        ntfy_endpoint: URL to POST the notification to.
        error_msg: Text of the error, included in the message body.
    """
    # A null endpoint in the JSON config arrives here as None; treat it like
    # the empty string instead of passing it to the HTTP client.
    if not ntfy_endpoint:
        return

    headers = {
        "Title": "Failed To Copy Certificate",
        "Priority": "high",
        "Tag": SCRIPT_NAME,
    }
    body = 'Error copying cert #{}, Failed Exception: \n\n{}'.format(cert_idx, error_msg)

    response = http_pool.request('POST', ntfy_endpoint, headers=headers, body=body)
    # errors='replace' keeps an undecodable response from raising here.
    reply = response.data.decode('utf-8', errors='replace').strip()
    logger.info('Notifying Error, ntfy response: {}'.format(reply))
|
|
|
|
def main():
    """Load the configuration and deploy every configured certificate.

    Each certificate is handled independently: a failure is logged, reported
    through notifyError, and the loop continues with the next certificate.
    """
    logging.basicConfig(level=logging.INFO)

    with open(CONFIG_PATH, 'r') as f:
        config = Config(json.load(f))

    logger.info('Copy certificates start!')

    for i, cert in enumerate(config.certs):
        logger.info('Copying cert #{}'.format(i))
        try:
            copyCert(cert)
        except Exception as e:
            # Log the traceback locally: without this a failure is silent
            # whenever no ntfy endpoint is configured.
            logger.exception('Failed to copy cert #%d', i)
            try:
                notifyError(i, config.ntfy_endpoint, str(e))
            except Exception:
                # A broken notifier must not stop the remaining certificates.
                logger.exception('Failed to send notification for cert #%d', i)

    logger.info('Copy certificates done!')
|
|
|
|
# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|