This repository has been archived on 2022-01-23. You can view files and clone it, but cannot push or open issues or pull requests.
relchk/_base.py

213 lines
8.1 KiB
Python

import hashlib
import json
import os
import pathlib
import shutil
import subprocess
import tempfile
##
import psutil
import requests
class BaseUpdater(object):
    """Base class for downloading an ISO and optionally updating GRUB.

    The workflow driven by main() is: take a PID lock file, ask the subclass
    for the current and new version information, download the ISO when an
    update is needed, optionally render a GRUB template and run
    grub-mkconfig, touch the version file, and release the lock.

    Subclasses must override getCurVer(), getNewVer() and updateVer(); the
    implementations here only raise.
    """

    # Template directory: the 'tpl' directory next to this module.
    _tpl_dir = os.path.join(os.path.dirname(os.path.abspath(os.path.expanduser(__file__))), 'tpl')
    # Template file name handed to jinja2 in grub(); None in the base class.
    _tpl_file = None
    # Date format string; not used in this class (presumably for subclasses).
    _date_fmt = '%a, %d %b %Y %H:%M:%S %z'
    # Class-level defaults for the template variables. _init_tplvars() copies
    # this into a per-instance dict instead of mutating it in place.
    _tpl_vars = {}
    # Exposed to the template as 'arch' and 'variant'; None in the base class.
    arch = None
    variant = None
    # Message raised by the methods a subclass has to replace.
    _subclass_msg = ('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods '
                     'should be replaced.')

    def __init__(self,
                 dest_dir,
                 dest_file,
                 ver_file,
                 lock_path,
                 do_grub_cfg,
                 boot_dir,
                 grub_cfg,
                 hash_type
                 # check_gpg = True,  # TODO: GPG sig checking
                 ):
        """Store the configuration and derive the paths used later.

        Args:
            dest_dir: Directory the ISO is written to ('~' is expanded). It
                must be located under boot_dir.
            dest_file: File name of the ISO inside dest_dir.
            ver_file: File name of the version file inside dest_dir.
            lock_path: Path of the PID lock file.
            do_grub_cfg: Truthy if main() should render the GRUB template and
                run grub-mkconfig.
            boot_dir: Boot directory; '<boot_dir>/grub/grub.cfg' is the
                grub-mkconfig output and the ISO path given to the template
                is relative to this directory.
            grub_cfg: Path the rendered template is written to.
            hash_type: Algorithm name passed to hashlib.new().

        Raises:
            ValueError: If dest_dir is not located under boot_dir (raised by
                pathlib's relative_to()).
        """
        self.dest_dir = os.path.abspath(os.path.expanduser(dest_dir))
        self.dest_file = dest_file
        self.ver_file = ver_file
        self.do_grub = bool(do_grub_cfg)
        self.boot_dir = os.path.abspath(os.path.expanduser(boot_dir))
        self.grub_cfg = os.path.abspath(os.path.expanduser(grub_cfg))
        self.lckfile = os.path.abspath(os.path.expanduser(lock_path))
        # ISO directory expressed relative to boot_dir, with a leading slash.
        rel_dest = pathlib.Path(self.dest_dir).relative_to(pathlib.Path(self.boot_dir))
        self.grub_iso_dir = '/{0}'.format(str(rel_dest))
        # The old_*/new_* values, do_update and iso_url are not computed in
        # this class; subclasses are expected to populate them.
        self.old_date = None
        self.old_ver = None
        self.old_hash = None
        self.new_date = None
        self.new_ver = None
        self.new_hash = None
        self.do_update = False
        self.force_update = False
        self.iso_url = None
        self.boot_uuid = None
        self.hash_type = hash_type
        self.dest_iso = os.path.join(self.dest_dir, self.dest_file)
        self.dest_ver = os.path.join(self.dest_dir, self.ver_file)

    def _init_tplvars(self):
        """Populate the per-instance template variables used by grub().

        Calls getUUID() first so that 'disk_uuid' is current.
        """
        self.getUUID()
        # Copy rather than mutate: _tpl_vars starts life as a class attribute
        # and would otherwise be shared by every instance and subclass.
        tpl_vars = dict(self._tpl_vars)
        iso_path = os.path.abspath(
            os.path.expanduser(
                os.path.join(self.grub_iso_dir, self.dest_file)))
        tpl_vars['iso_path'] = iso_path.lstrip('/')
        tpl_vars['disk_uuid'] = self.boot_uuid
        tpl_vars['version'] = self.new_ver
        tpl_vars['arch'] = self.arch
        tpl_vars['ver_str'] = str(self.new_ver).replace('.', '')
        tpl_vars['variant'] = self.variant
        self._tpl_vars = tpl_vars

    def main(self):
        """Run one complete check/update cycle.

        Returns immediately if another live process holds the lock. The lock
        is always released, even when a step raises.
        """
        if self.getRunning():
            return
        self.lock()
        try:
            self.getCurVer()
            self.getNewVer()
            needs_update = (self.do_update
                            or self.force_update
                            or not all((self.old_date, self.old_ver, self.old_hash)))
            if needs_update:
                self.do_update = True
                self.download()
                # NOTE(review): the indentation of the source this was
                # recovered from was lost; the GRUB step is assumed to belong
                # to the update branch - confirm against upstream.
                if self.do_grub:
                    self._init_tplvars()
                    self.grub()
            self.touchVer()
        finally:
            self.unlock()

    def download(self):
        """Download the ISO to a temporary file, verify it, and install it.

        Does nothing if another process holds the lock or if neither
        do_update nor force_update is set. Calls updateVer() after the ISO
        has been copied into place.

        Raises:
            RuntimeError: If iso_url is unset, the HEAD request is not OK, or
                the downloaded file's hash differs from new_hash.
        """
        if self.getRunning():
            return
        if not any((self.do_update, self.force_update)):
            return
        if not self.iso_url:
            raise RuntimeError('iso_url attribute must be set first')
        headers = {'User-Agent': 'curl/7.74.0'}
        req_chk = requests.head(self.iso_url, headers = headers)
        if not req_chk.ok:
            raise RuntimeError('Received non-200/30x {0} for {1}'.format(req_chk.status_code, self.iso_url))
        # mkstemp() hands back an already-open descriptor; wrap it so it gets
        # closed instead of leaking.
        tmp_fd, tmp_path = tempfile.mkstemp()
        try:
            with os.fdopen(tmp_fd, 'wb') as fh:
                with requests.get(self.iso_url, stream = True, headers = headers) as req:
                    req.raise_for_status()
                    for chunk in req.iter_content(chunk_size = 8192):
                        fh.write(chunk)
            realhash = self.getISOHash(tmp_path)
            # A falsy new_hash means there is nothing to verify against.
            if self.new_hash and realhash != self.new_hash:
                raise RuntimeError('Hash mismatch: {0} (LOCAL), {1} (REMOTE)'.format(realhash, self.new_hash))
            os.makedirs(os.path.dirname(self.dest_iso), exist_ok = True)
            # Create the destination with mode 0644 if it does not exist yet.
            pathlib.Path(self.dest_iso).touch(mode = 0o0644, exist_ok = True)
            shutil.copyfile(tmp_path, self.dest_iso)
        finally:
            # Never leave the temporary download behind, success or failure.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        self.updateVer()

    def getCurVer(self):
        """Subclass hook; the base implementation always raises.

        Raises:
            NotImplementedError: Always (a RuntimeError subclass).
        """
        raise NotImplementedError(self._subclass_msg)

    def getISOHash(self, filepath = None):
        """Return the lowercase hex digest of a file using hash_type.

        Args:
            filepath: File to hash; defaults to dest_iso when falsy.
        """
        if not filepath:
            filepath = self.dest_iso
        else:
            filepath = os.path.abspath(os.path.expanduser(filepath))
        hasher = hashlib.new(self.hash_type)
        with open(filepath, 'rb') as fh:
            # Read fixed-size chunks until read() returns b'' at end of file.
            for chunk in iter(lambda: fh.read(8192), b''):
                hasher.update(chunk)
        return hasher.hexdigest().lower()

    def getNewVer(self):
        """Subclass hook; the base implementation always raises.

        Raises:
            NotImplementedError: Always (a RuntimeError subclass).
        """
        raise NotImplementedError(self._subclass_msg)

    def getRunning(self):
        """Return True if a different, live process holds the lock file.

        A lock file whose content is not a PID, or whose PID no longer
        exists, is treated as stale and removed.
        """
        if not os.path.isfile(self.lckfile):
            return False
        with open(self.lckfile, 'r') as fh:
            raw_pid = fh.read().strip()
        try:
            pid = int(raw_pid)
        except ValueError:
            # Empty or corrupt lock file: no owner can be identified.
            os.remove(self.lckfile)
            return False
        if pid == os.getpid():
            # We hold the lock ourselves.
            return False
        if not psutil.pid_exists(pid):
            os.remove(self.lckfile)
            return False
        return True

    def getUUID(self):
        """Look up the UUID of the filesystem holding boot_dir.

        Runs findmnt to find the source device and blkid to read its
        attributes, then stores the 'uuid' value (or None) in boot_uuid.

        Raises:
            RuntimeError: If either command exits non-zero or writes to
                stderr.
        """
        # Probe the configured boot directory rather than a fixed '/boot';
        # the ISO path handed to the template is relative to boot_dir.
        disk_cmd = subprocess.run(['findmnt',
                                   '-T', self.boot_dir,
                                   '--json'],
                                  stdout = subprocess.PIPE,
                                  stderr = subprocess.PIPE)
        disk_err = disk_cmd.stderr.decode('utf-8')
        if disk_cmd.returncode != 0 or disk_err.strip() != '':
            raise RuntimeError('Could not get disk UUID: {0}'.format(disk_err))
        disk_dict = json.loads(disk_cmd.stdout.decode('utf-8'))
        disk_dev = disk_dict['filesystems'][0]['source']
        info_cmd = subprocess.run(['blkid',
                                   '-o', 'export',
                                   disk_dev],
                                  stdout = subprocess.PIPE,
                                  stderr = subprocess.PIPE)
        info_err = info_cmd.stderr.decode('utf-8')
        if info_cmd.returncode != 0 or info_err.strip() != '':
            raise RuntimeError('Could not get disk UUID: {0}'.format(info_err))
        info_dict = {}
        for line in info_cmd.stdout.decode('utf-8').splitlines():
            key, sep, value = line.partition('=')
            if not sep:
                # Blank or malformed line; nothing to record.
                continue
            info_dict[key.lower()] = value
        self.boot_uuid = info_dict.get('uuid')

    def grub(self):
        """Render the GRUB template to grub_cfg and run grub-mkconfig.

        Raises:
            RuntimeError: If grub-mkconfig exits with a non-zero status.
        """
        # Imported here so jinja2 is only required when GRUB handling is used.
        import jinja2
        loader = jinja2.FileSystemLoader(searchpath = self._tpl_dir)
        tplenv = jinja2.Environment(loader = loader)
        tpl = tplenv.get_template(self._tpl_file)
        with open(self.grub_cfg, 'w') as fh:
            fh.write(tpl.render(**self._tpl_vars))
        # Presumably grub_cfg is a script that grub-mkconfig executes, hence
        # the executable bit - TODO confirm.
        os.chmod(self.grub_cfg, 0o0755)
        cmd = subprocess.run(['grub-mkconfig',
                              '-o', os.path.join(self.boot_dir, 'grub', 'grub.cfg')],
                             stdout = subprocess.PIPE,
                             stderr = subprocess.PIPE)
        if cmd.returncode != 0:
            stderr = cmd.stderr.decode('utf-8')
            if stderr.strip() != '':
                print(stderr)
            raise RuntimeError('grub-mkconfig returned a non-zero exit status ({0})'.format(cmd.returncode))

    def lock(self):
        """Write this process's PID to the lock file."""
        with open(self.lckfile, 'w') as fh:
            fh.write(str(os.getpid()))

    def touchVer(self):
        """Touch the version file unless another process holds the lock."""
        if self.getRunning():
            return
        pathlib.Path(self.dest_ver).touch(exist_ok = True)

    def unlock(self):
        """Remove the lock file if it exists."""
        if os.path.isfile(self.lckfile):
            os.remove(self.lckfile)

    def updateVer(self):
        """Subclass hook; the base implementation always raises.

        Raises:
            NotImplementedError: Always (a RuntimeError subclass).
        """
        raise NotImplementedError(self._subclass_msg)