#!/usr/bin/env python3

# Example .grml.json:
# {
#     "date": "Thu, 21 Jan 2021 06:42:21 +0000",
#     "arch": "64",
#     "ver": 2020.06,
#     "variant": "full",
#     "sha512": "b4d2e517c69224bf14f79e155(...)"
# }

import datetime
import json
import os
import re
##
import requests
from bs4 import BeautifulSoup
##
import _base

# lxml is only probed for availability; BeautifulSoup uses it as its parser
# backend when present and falls back to the stdlib 'html.parser' otherwise.
try:
    import lxml
    _has_lxml = True
except ImportError:
    _has_lxml = False


class Updater(_base.BaseUpdater):
    """Check a Grml download index for a newer ISO and track version metadata.

    The locally known version is read from (and written back to) a small JSON
    file; the remote version is scraped from the links on ``dl_base``.
    Downloading, locking and GRUB configuration are not implemented here.
    NOTE(review): presumably handled by ``_base.BaseUpdater`` - confirm.
    """

    # Matches e.g. "grml64-full_2020.06.iso", optionally preceded by a path.
    _fname_re = re.compile(r'^(?:.*/)?grml(?P<arch>(32|64|96))-'
                           r'(?P<variant>(small|full))_'
                           r'(?P<version>[0-9]{4}\.[0-9]{2})\.iso$')
    _def_hash = 'md5'
    _allowed_hashes = ('md5', 'sha1', 'sha256', 'sha512')
    _allowed_arches = ('32', '64', '96')  # 32-bit, 64-bit, both in one ISO
    _allowed_variants = ('small', 'full')
    _tpl_file = 'grml_grub.conf.j2'
    _datever_fmt = '%Y.%m'
    # Sent with every HTTP request made by this class.
    _req_headers = {'User-Agent': 'curl/7.74.0'}

    def __init__(self,
                 arch = '64',
                 variant = 'full',
                 dest_dir = '/boot/iso',  # Should be subdir of boot_dir
                 dest_file = 'grml.iso',
                 ver_file = '.grml.json',
                 lock_path = '/tmp/.grml.lck',
                 dl_base = 'https://download.grml.org/',
                 do_grub_cfg = True,
                 boot_dir = '/boot',  # ESP or boot partition mount; where GRUB files are installed *under*
                 grub_cfg = '/etc/grub.d/40_custom_grml',
                 # check_gpg = True,  # TODO: GPG sig checking, http://mirror.rit.edu/grml//gnupg-michael-prokop.txt
                 hash_type = 'sha512'):
        """Validate the options, store them and run the initial version checks.

        Args:
            arch: One of ``_allowed_arches`` (case-insensitive).
            variant: One of ``_allowed_variants`` (case-insensitive).
            dest_dir, dest_file, ver_file, lock_path, do_grub_cfg, boot_dir,
                grub_cfg: Passed unchanged to the base class constructor.
            dl_base: URL of the page listing the downloadable ISO files.
            hash_type: One of ``_allowed_hashes`` (case-insensitive); also
                forwarded to the base class constructor as given.

        Raises:
            ValueError: If ``arch``, ``hash_type`` or ``variant`` is not an
                allowed value.
        """
        super().__init__(dest_dir,
                         dest_file,
                         ver_file,
                         lock_path,
                         do_grub_cfg,
                         boot_dir,
                         grub_cfg,
                         hash_type)
        arch = arch.lower()
        if arch not in self._allowed_arches:
            raise ValueError('arch must be one of: {0}'.format(', '.join(self._allowed_arches)))
        self.arch = arch
        hash_type = hash_type.lower()
        if hash_type not in self._allowed_hashes:
            raise ValueError('hash_type must be one of: {0}'.format(', '.join(self._allowed_hashes)))
        self.hash_type = hash_type
        # Normalise case before validating so that this check agrees with the
        # lower-cased value that gets stored (as is done for arch/hash_type).
        variant = variant.lower()
        if variant not in self._allowed_variants:
            raise ValueError('variant must be one of: {0}'.format(', '.join(self._allowed_variants)))
        self.variant = variant
        self.dl_base = dl_base
        self._init_vars()

    def _init_vars(self):
        """Populate local and remote version state unless getRunning() is truthy."""
        if self.getRunning():
            return None
        self.getCurVer()
        self.getNewVer()
        return None

    def getCurVer(self):
        """Load the recorded version info and decide whether an update is forced.

        An update is forced when the version file is missing, when it records
        a different arch, when the ISO file is missing, or when the recorded
        hash differs from the value returned by ``getISOHash()``.
        """
        if self.getRunning():
            return None
        if not os.path.isfile(self.dest_ver):
            self.do_update = True
            self.force_update = True
            self.old_ver = 0.00
            return None
        with open(self.dest_ver, 'rb') as fh:
            ver_info = json.load(fh)
        self.old_date = datetime.datetime.strptime(ver_info['date'], self._date_fmt)
        self.old_ver = ver_info['ver']
        # When the file has no entry for the configured hash type this falls
        # back to the literal _def_hash string, which cannot equal a real
        # digest and therefore ends in the forced-update branch below.
        self.old_hash = ver_info.get(self.hash_type, self._def_hash)
        # NOTE(review): a variant recorded in the version file replaces the one
        # passed to __init__ - confirm this is intended.
        self.variant = ver_info.get('variant', self.variant)
        # Until getNewVer() finds something newer, "new" mirrors "old".
        self.new_hash = self.old_hash
        self.new_ver = self.old_ver
        self.new_date = self.old_date
        if ver_info.get('arch') != self.arch:
            self.do_update = True
            self.force_update = True
            return None
        if not os.path.isfile(self.dest_iso):
            self.do_update = True
            self.force_update = True
            return None
        realhash = self.getISOHash()
        if self.old_hash != realhash:
            self.do_update = True
            self.force_update = True
            return None
        return None

    def getNewVer(self):
        """Scrape ``dl_base`` for the newest ISO matching arch and variant.

        Sets ``new_ver``; when that is newer than ``old_ver`` (or no complete
        local version info exists) also sets ``do_update``, ``new_date``,
        ``iso_url`` and ``new_hash``.

        Raises:
            RuntimeError: If an HTTP request fails, or if no link on the page
                matches the configured arch and variant.
        """
        # We could do this a lot more reliably if the mirrors all had the same HTML.
        if self.getRunning():
            return None
        req = requests.get(self.dl_base, headers = self._req_headers)
        if not req.ok:
            raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, self.dl_base))
        html = BeautifulSoup(req.content.decode('utf-8'), ('lxml' if _has_lxml else 'html.parser'))
        # versions = dict of versions[ver]: (hash_url, iso_url)
        versions = {}
        for link in html.find_all('a'):
            href = link.get('href')
            if not href:
                # Anchors without an href cannot point at an ISO.
                continue
            fname_r = self._fname_re.search(href)
            if not fname_r:
                continue
            ver_info = fname_r.groupdict()
            if ver_info['arch'] != self.arch:
                continue
            if ver_info['variant'] != self.variant:
                continue
            new_ver = float(ver_info['version'])
            # Links may be absolute or relative; strip the base if present and
            # re-attach it so both forms end up as a full URL.
            iso_url = os.path.join(self.dl_base, href.replace(self.dl_base, ''))
            hash_url = '{0}.{1}'.format(iso_url, self.hash_type)
            versions[new_ver] = (hash_url, iso_url)
        if not versions:
            raise RuntimeError('No grml{0}-{1} ISO found at {2}'.format(self.arch, self.variant, self.dl_base))
        self.new_ver = max(versions)
        if not all((self.old_ver, self.old_date)) or \
                (self.new_ver > self.old_ver):
            self.do_update = True
            self.new_date = datetime.datetime.now(datetime.timezone.utc)
            # NOTE(review): iso_url/new_hash are only refreshed in this branch;
            # confirm that a forced update of an unchanged version does not
            # need them.
            hash_url, self.iso_url = versions[self.new_ver]
            req = requests.get(hash_url, headers = self._req_headers)
            if not req.ok:
                raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, hash_url))
            # The first whitespace-separated token of the checksum file is the digest.
            self.new_hash = req.content.decode('utf-8').lower().split()[0]
        return None

    def updateVer(self):
        """Write the new version info to the version file as JSON (mode 0644)."""
        if self.getRunning():
            return None
        d = {'date': self.new_date.strftime(self._date_fmt),
             'arch': self.arch,
             'ver': self.new_ver,
             'variant': self.variant,
             self.hash_type: self.new_hash}
        j = json.dumps(d, indent = 4)
        with open(self.dest_ver, 'w') as fh:
            fh.write(j)
            fh.write('\n')
        os.chmod(self.dest_ver, 0o0644)
        return None


if __name__ == '__main__':
    u = Updater()
    u.main()