import copy import os import re import shutil import subprocess import tarfile import tempfile import warnings ## import Namcap import requests ## import arb_util # TODO: implement alwaysBuild check!!! # TODO: should this be a configuration option? aurbase = 'https://aur.archlinux.org' # Maps the AUR API attribute names to their pkgbuild equivalents. # Remote all attributes not present here. _attrmap = {'Description': 'desc', 'Depends': 'depends', 'License': 'licenses', 'Name': 'name', 'URL': 'url', 'URLPath': 'pkgurl', 'Version': 'version'} # And this is a blank dict as returned by Namcap.package (with useless values removed) _pkgattrs = {k: None for k in ('base', 'desc', 'install', 'name', 'url', 'version')} _pkgattrs.update({k: [] for k in ('arch', 'backup', 'conflicts', 'depends', 'groups', 'licenses', 'makedepends', 'md5sums', 'names', 'optdepends', 'options', 'orig_depends', 'orig_makedepends', 'orig_optdepends', 'orig_provides', 'provides', 'replaces', 'sha1sums', 'sha224sums', 'sha256sums', 'sha384sums', 'sha512sums', 'source', 'split', 'validgpgkeys')}) # This be custom. _pkgattrs.update({'pkgurl': None}) class Package(object): def __init__(self, pkg_xml, ns = '', gpgobj = None, *args, **kwargs): self.xml = pkg_xml self.always_build = arb_util.xmlBool(pkg_xml.attrib.get('alwaysBuild', True)) self.cleanup = arb_util.xmlBool(pkg_xml.attrib.get('cleanUp', True)) self.name = pkg_xml.text self.gpg = gpgobj self.pkginfo = None self.srcdir = None def build(self, destdir): if not self.srcdir: raise RuntimeError('You must run .extract() before running .build()') prebuild_files = [] postbuild_files = [] for root, dirs, files in os.walk(self.srcdir): for f in files: prebuild_files.append(os.path.join(root, f)) os.chdir(self.srcdir) # customizepkg-scripting in AUR try: custpkg_out = subprocess.run(['/usr/bin/customizepkg', '-m'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) except FileNotFoundError: pass # Not installed build_out = subprocess.run(['/usr/bin/multilib-build', '-c', '--', '--', '--skippgpcheck', '--syncdeps', '--noconfirm', '--log', '--holdver', '--skipinteg'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) for root, dirs, files in os.walk(self.srcdir): for f in files: fpath = os.path.join(root, f) if fpath in prebuild_files: continue if fpath.endswith('.log'): continue postbuild_files.append(fpath) postbuild_files = [i for i in postbuild_files if i.endswith('.pkg.tar.xz')] if not postbuild_files: warnings.warn('Could not reliably find any built packages for {0}; skipping'.format(self.name)) else: for f in postbuild_files: fdest = os.path.join(destdir, os.path.basename(f)) if os.path.isfile(fdest): os.remove(fdest) shutil.move(f, fdest) if self.cleanup: shutil.rmtree(self.srcdir) return([os.path.basename(f) for f in postbuild_files]) def extract(self, dest): # no-op; this is handled in the subclasses since it's unique to them. pass return(True) def getPkgInfo(self): # no-op; this is handled in the subclasses since it's unique to them. pass return(True) class LocalPkg(Package): def __init__(self, pkg_xml, ns = '', *args, **kwargs): super().__init__(pkg_xml, ns = ns, *args, **kwargs) self.source = os.path.abspath(os.path.expanduser(pkg_xml.attrib.get('path', '.'))) def extract(self, dest): self.getPkgInfo() if os.path.isfile(self.source): try: with tarfile.open(name = self.source, mode = 'r|*') as tar: tar.extractall(dest) dest = os.path.join(dest, self.name) except tarfile.ReadError as e: if str(e) != 'invalid header': # TODO: log instead raise (e) # "invalid header" means it isn't a tarball. Contextually, that means a PKGBUILD file. dest = os.path.join(dest, self.name) os.makedirs(dest, exist_ok = True) shutil.copy2(self.source, dest) elif os.path.isdir(self.source): os.makedirs(dest, exist_ok = True) # Already "extracted". shutil.copytree(self.source, dest) pkg = dict(Namcap.package.load_from_pkgbuild(os.path.join(dest, 'PKGBUILD'))) del(pkg['setvars']) self.pkginfo.update(pkg) self.srcdir = dest return(True) def getPkgInfo(self): pkgbuild = None pkgbld_re = re.compile(r'(^|/)PKGBUILD$') is_temp = False if os.path.isfile(self.source): try: with tarfile.open(name = self.source, mode = 'r:*') as tar: for f in tar.getmembers(): if pkgbld_re.search(f.name): pkgbuild = tempfile.mkstemp()[1] with open(pkgbuild, 'wb') as fh: fh.write(tar.extractfile(f).read()) is_temp = True break except tarfile.ReadError as e: if str(e) != 'file could not be opened successfully': # TODO: log instead raise(e) # "file could not be opened successfully" means it isn't a tarball. # Contextually, that means a PKGBUILD file. pkgbuild = self.source elif os.path.isdir(self.source): pkgbuild = os.path.join(self.source, 'PKGBUILD') if not pkgbuild: raise RuntimeError('Could not find a PKGBUILD for {0}'.format(self.name)) pkg = copy.deepcopy(_pkgattrs) pkg.update(dict(Namcap.package.load_from_pkgbuild(pkgbuild))) del(pkg['setvars']) if is_temp: os.remove(pkgbuild) if self.pkginfo and isinstance(self.pkginfo, dict): self.pkginfo.update(pkg) else: self.pkginfo = pkg return() class AURPkg(Package): def __init__(self, pkg_xml, ns = '', *args, **kwargs): super().__init__(pkg_xml, ns = ns, *args, **kwargs) def extract(self, dest): dl_url = None self.getPkgInfo() if self.pkginfo['name'] == self.name: dl_url = os.path.join(aurbase, re.sub(r'^/+', r'', self.pkginfo['pkgurl'])) if not dl_url: # TODO: log instead? warnings.warn('Could not find a download path for {0}; skipping'.format(self.name)) return(False) with requests.get(dl_url, stream = True) as url: try: with tarfile.open(mode = 'r|*', fileobj = io.BytesIO(url.content)) as tar: tar.extractall(dest) dest = os.path.join(dest, self.name) # This *technically* does nothing unless the AUR is *very* broken. except tarfile.ReadError as e: # "invalid header" means it isn't a tarball if str(e) != 'invalid header': # TODO: log instead raise(e) pkg = Namcap.package.load_from_pkgbuild(os.path.join(dest, 'PKGBUILD')) del(pkg['setvars']) self.pkginfo.update(pkg) self.srcdir = dest return(True) def getPkgInfo(self): pkg_srch = requests.get(os.path.join(aurbase, 'rpc'), params = {'v': 5, 'type': 'info', 'arg': self.name}).json() if 'results' not in pkg_srch: raise RuntimeError(('AUR request for {0} was unsuccessful.' 'Check {1} for status').format(self.name, aurbase)) if len(pkg_srch['results']) != 1: # TODO: log instead? warnings.warn('Package {0} not found in the AUR'.format(self.name)) return(False) pkginfo = copy.deepcopy(_pkgattrs) for k, v in pkg_srch['results'][0].items(): if k in _attrmap: pkginfo[_attrmap[k]] = v self.pkginfo = pkginfo return()