It works now.

This commit is contained in:
leafee98 2025-03-24 01:04:17 +08:00
parent 5e05b8c96d
commit c091ec2e32
2 changed files with 119 additions and 0 deletions

18
copy-cert.json Normal file
View file

@ -0,0 +1,18 @@
{
"ntfy_endpoint": "ntfy.sh/subscribed_topic",
"certs": [
{
"priv": {
"src": "/path/to/src",
"dst": "/path/to/dst"
},
"cert": {
"src": "/path/to/src",
"dst": "/path/to/dst"
},
"owner": "user",
"group": "group",
"perm": "600"
}
]
}

101
copy-cert.py Normal file
View file

@ -0,0 +1,101 @@
#!/usr/bin/env python3
import json
import logging
import os
import shutil
import sys
import urllib3
from typing import Dict
CONFIG_PATH = '/etc/copy-cert.json'
SCRIPT_NAME = os.path.basename(sys.argv[0])
logger = logging.getLogger(SCRIPT_NAME)
http_pool = urllib3.PoolManager()
class InvalidConfigException(Exception):
pass
class CopyEntry:
def __init__(self, d: Dict):
if 'src' not in d.keys():
raise InvalidConfigException('No src in CopyEntry')
if 'dst' not in d.keys():
raise InvalidConfigException('No src in CopyEntry')
self.src: str = d['src']
self.dst: str = d['dst']
class CertEntry:
def __init__(self, d: Dict):
self.priv: CopyEntry = CopyEntry(d['priv'])
self.cert: CopyEntry = CopyEntry(d['cert'])
self.owner: str = d['owner']
self.group: str = d['group']
self.perm: int = int(d['perm'], base=8)
class Config:
def __init__(self, d: Dict):
self.ntfy_endpoint = d['ntfy_endpoint']
self.certs = []
for cert_d in d['certs']:
self.certs.append(CertEntry(cert_d))
def copyFile(src: str, dst: str):
src_mtime = os.path.getmtime(src)
dst_mtime = os.path.getmtime(dst)
if src_mtime > dst_mtime:
logger.info('Copying file: {} --> {}'.format(src, dst))
shutil.copyfile(src, dst)
else:
logger.info('Skipping file: {} --> {}'.format(src, dst))
def copyCert(entry: CertEntry):
copyFile(entry.priv.src, entry.priv.dst)
copyFile(entry.cert.src, entry.cert.dst)
shutil.chown(entry.priv.dst, entry.owner, entry.group)
shutil.chown(entry.cert.dst, entry.owner, entry.group)
os.chmod(entry.priv.dst, entry.perm)
os.chmod(entry.cert.dst, entry.perm)
def notifyError(cert_idx: int, ntfy_endpoint: str, error_msg: str):
if ('' == ntfy_endpoint):
return
headers = {
"Title": "Failed To Copy Certificate",
"Priority": "high",
"Tag": SCRIPT_NAME,
}
body = 'Error copying cert #{}, Failed Exception: \n\n{}'.format(cert_idx, error_msg)
req = http_pool.request('POST', ntfy_endpoint, headers=headers, body=body)
logger.info('Notifing Error, ntfy response: {}'.format(req.data.decode('utf-8').strip()))
def main():
logging.basicConfig(level=logging.INFO)
with open(CONFIG_PATH, 'r') as f:
config = Config(json.load(f))
logger.info('Copy certificates start!')
for i, cert in enumerate(config.certs):
logger.info('Copying cert #{}'.format(i))
try:
copyCert(cert)
except Exception as e:
notifyError(i, config.ntfy_endpoint, str(e))
logger.info('Copy certificates done!')
if __name__ == '__main__':
main()