adding autopkg

master
brent s 4 years ago
parent 69d13d5c97
commit 1fc59208b6
  1. 1
      .gitignore
  2. 165
      arch/autopkg/maintain.py
  3. 278
      arch/autopkg/run.py
  4. 127
      arch/autopkg/setup.py
  5. 38
      ref/python.tips_tricks_and_dirty_hacks

1
.gitignore vendored

@@ -22,5 +22,6 @@ __pycache__/
*.run
*.7z
*.rar
*.sqlite3
*.deb
.idea/

@@ -0,0 +1,165 @@
#!/usr/bin/env python

import argparse
import json
import os
import sqlite3
import run
from urllib.request import urlopen

def parseArgs():
    """Build and return the CLI argument parser for autopkg maintenance.

    Subcommands: add, rm, build, ls. The shared -d/--db and -n/--name options
    are defined once on parent parsers instead of being duplicated per
    subparser (the original repeated the -d/--db definition three times).

    Returns:
        argparse.ArgumentParser: the fully-configured top-level parser.
    """
    args = argparse.ArgumentParser(description = ('Modify (add/remove) packages for use with Autopkg'),
                                   epilog = ('Operation-specific help; try e.g. "add --help"'))
    # Parent parser carrying the shared -d/--db option.
    dbargs = argparse.ArgumentParser(add_help = False)
    dbargs.add_argument('-d', '--db',
                        dest = 'dbfile',
                        default = '~/.optools/autopkg.sqlite3',
                        help = ('The location of the package database. THIS SHOULD NOT BE ANY FILE USED BY '
                                'ANYTHING ELSE! A default one will be created if it doesn\'t exist'))
    # Parent parser carrying the shared -n/--name option.
    nameargs = argparse.ArgumentParser(add_help = False)
    nameargs.add_argument('-n', '--name',
                          dest = 'pkgnm',
                          required = True,
                          help = ('The name of the PACKAGE to operate on.'))
    subparsers = args.add_subparsers(help = ('Operation to perform'),
                                     metavar = 'OPERATION',
                                     dest = 'oper')
    addargs = subparsers.add_parser('add',
                                    parents = [nameargs, dbargs],
                                    help = ('Add a package. If a matching package NAME exists (-n/--name), '
                                            'we\'ll replace it'))
    addargs.add_argument('-b', '--base',
                         dest = 'pkgbase',
                         default = None,
                         help = ('The pkgbase; only really needed for split-packages and we will automatically '
                                 'fetch if it\'s left blank anyways'))
    addargs.add_argument('-v', '--version',
                         dest = 'pkgver',
                         default = None,
                         help = ('The current version; we will automatically fetch it if it\'s left blank'))
    addargs.add_argument('-l', '--lock',
                         dest = 'active',
                         action = 'store_false',
                         help = ('If specified, the package will still exist in the DB but it will be marked inactive'))
    rmargs = subparsers.add_parser('rm',
                                   parents = [nameargs, dbargs],
                                   help = ('Remove a package from the DB'))
    buildargs = subparsers.add_parser('build',
                                      parents = [dbargs],
                                      help = ('Build all packages; same effect as running run.py'))
    listargs = subparsers.add_parser('ls',
                                     parents = [dbargs],
                                     help = ('List packages (and information about them) only'))
    return(args)

def add(args):
    """Insert or update a package record in the autopkg DB.

    If either pkgbase or pkgver is missing, the remaining info is fetched
    from the AUR RPC API. pkgbase is stored only when it differs from the
    package name (i.e. split packages).

    args: dict of parsed CLI options — requires 'dbfile', 'pkgnm',
          'pkgbase', 'pkgver', 'active'.
    Raises:
        ValueError: if the AUR lookup returns no results.
    """
    db = sqlite3.connect(args['dbfile'])
    db.row_factory = sqlite3.Row
    cur = db.cursor()
    if not all((args['pkgbase'], args['pkgver'])):
        # We need some additional info from the AUR API...
        aur_url = 'https://aur.archlinux.org/rpc/?v=5&type=info&by=name&arg%5B%5D={0}'.format(args['pkgnm'])
        with urlopen(aur_url) as url:
            aur = json.loads(url.read().decode('utf-8'))['results']
        if not aur:
            raise ValueError(('Either something is screwy with our network access '
                              'or the package {0} doesn\'t exist').format(args['pkgnm']))
        # BUGFIX: 'results' is a JSON *list*; the original indexed it with
        # string keys, which raises a TypeError at runtime.
        pkginfo = aur[0]
        if ((pkginfo['PackageBase'] != pkginfo['Name']) and (not args['pkgbase'])):
            args['pkgbase'] = pkginfo['PackageBase']
        if not args['pkgver']:
            args['pkgver'] = pkginfo['Version']
    # BUGFIX: the argparse dest for -l/--lock is 'active' (store_false), so
    # args has no 'lock' key; '1' marks the row active, '0' locked/inactive.
    active = ('1' if args['active'] else '0')
    cur.execute("SELECT id, pkgname, pkgbase, pkgver, active FROM packages WHERE pkgname = ?",
                (args['pkgnm'], ))
    row = cur.fetchone()
    if row:
        # BUGFIX: SET clauses are comma-separated; the original joined them
        # with AND, which SQLite parses as one boolean expression assigned
        # to pkgbase (leaving pkgver/active untouched).
        if args['pkgbase']:
            q = ("UPDATE packages SET pkgbase = ?, pkgver = ?, active = ? WHERE id = ?",
                 (args['pkgbase'], args['pkgver'], active, row['id']))
        else:
            q = ("UPDATE packages SET pkgver = ?, active = ? WHERE id = ?",
                 (args['pkgver'], active, row['id']))
    else:
        if args['pkgbase']:
            q = (("INSERT INTO "
                  "packages (pkgname, pkgbase, pkgver, active) "
                  "VALUES (?, ?, ?, ?)"),
                 (args['pkgnm'], args['pkgbase'], args['pkgver'], active))
        else:
            q = (("INSERT INTO "
                  "packages (pkgname, pkgver, active) "
                  "VALUES (?, ?, ?)"),
                 (args['pkgnm'], args['pkgver'], active))
    cur.execute(*q)
    db.commit()
    cur.close()
    db.close()
    return()

def rm(args):
    """Delete the row for the named package from the package DB."""
    conn = sqlite3.connect(args['dbfile'])
    conn.execute("DELETE FROM packages WHERE pkgname = ?",
                 (args['pkgnm'], ))
    conn.commit()
    conn.close()
    return()

def build(args):
    """Build every tracked package; equivalent to invoking run.py directly."""
    run.PkgMake(db = args['dbfile']).main()
    return()

def ls(args):
    """Print a fixed-width table of every package row in the DB.

    args: dict of parsed CLI options — only 'dbfile' is used.
    """
    db = sqlite3.connect(args['dbfile'])
    db.row_factory = sqlite3.Row
    cur = db.cursor()
    cur.execute("SELECT * FROM packages ORDER BY pkgname")
    # NOTE(review): add() stores 'active' as TEXT '0'/'1'; a TEXT '0' is
    # truthy, so 'enabled' could read "Yes" for locked rows unless the
    # column is declared INTEGER in the schema — verify against blank_db.
    rows = [{'name': r['pkgname'],
             'row_id': r['id'],
             'pkgbase': ('' if not r['pkgbase'] else r['pkgbase']),
             'ver': r['pkgver'],
             'enabled': ('Yes' if r['active'] else 'No')}
            for r in cur.fetchall()]
    header = '| NAME | PACKAGE BASE | VERSION | ENABLED | ROW ID |'
    sep = '=' * len(header)
    fmt = '|{name:<16}|{pkgbase:<16}|{ver:^9}|{enabled:^9}|{row_id:<8}|'
    out = [fmt.format(**row) for row in rows]
    out.insert(0, '\n'.join((sep, header, sep)))
    out.append(sep)
    print('\n'.join(out))
    cur.close()
    db.close()
    return()

def main():
    """Parse CLI arguments and dispatch to the matching operation."""
    parser = parseArgs()
    args = vars(parser.parse_args())
    if not args['oper']:
        # No subcommand given; show usage and bail.
        parser.print_help()
        exit()
    args['dbfile'] = os.path.abspath(os.path.expanduser(args['dbfile']))
    handlers = {'add': add,
                'rm': rm,
                'build': build,
                'ls': ls}
    handler = handlers.get(args['oper'])
    if handler is not None:
        handler(args)
    return()

if __name__ == '__main__':
    main()

@@ -0,0 +1,278 @@
#!/usr/bin/env python

import grp
import json
import os
import pwd
import re
import shutil
import sqlite3
import subprocess
import tarfile
import urllib.request as reqs
import urllib.parse as urlparse
import setup
# I *HATE* relying on non-stlib, and I hate even MORE that this is JUST TO COMPARE VERSION STRINGS.
# WHY IS THIS FUNCTIONALITY NOT STDLIB YET.
try:
from distutils.version import LooseVersion
has_lv = True
except ImportError:
has_lv = False

# The base API URL (https://wiki.archlinux.org/index.php/Aurweb_RPC_interface)
aur_base = 'https://aur.archlinux.org/rpc/?v=5&type=info&by=name'
# The length of the above. Important because of uri_limit.
base_len = len(aur_base)
# Maximum length of a single request URI; PkgMake._populatePkgs() splits
# queries into multiple requests when they'd exceed this.
uri_limit = 4443

class PkgMake(object):
    """Fetch, build, and publish AUR packages tracked in the autopkg DB.

    Reads package and config state from a SQLite DB (created on first use via
    setup.firstrun()), queries the AUR RPC API for current versions, and for
    each pkgbase whose upstream version is newer: downloads the snapshot
    tarball, runs makepkg with privileges dropped to the configured build
    user, moves the artifact into the destination repo dir, and refreshes the
    pacman repo DB with repo-add.
    """

    def __init__(self, db = '~/.optools/autopkg.sqlite3'):
        """Open (creating if necessary) the package DB and load configuration.

        db: path to the autopkg SQLite DB; '~' is expanded.
        """
        db = os.path.abspath(os.path.expanduser(db))
        if not os.path.isfile(db):
            setup.firstrun(db)
        self.conn = sqlite3.connect(db)
        self.conn.row_factory = sqlite3.Row
        self.cur = self.conn.cursor()
        self.cfg = setup.main(self.conn, self.cur)
        if self.cfg['sign']:
            # setup.GPG() may INSERT/UPDATE config rows; run it with the
            # connection in autocommit, then restore the isolation level.
            _cmt_mode = self.conn.isolation_level  # autocommit
            self.conn.isolation_level = None
            self.fpr, self.gpg = setup.GPG(self.cur, homedir = self.cfg['gpg_homedir'], keyid = self.cfg['gpg_keyid'])
            self.conn.isolation_level = _cmt_mode
            # don't need this anymore; it should be duplicated or populated into self.fpr.
            del(self.cfg['gpg_keyid'])
            self.my_key = self.gpg.get_key(self.fpr, secret = True)
            self.gpg.signers = [self.my_key]
        else:
            # Not signing: no GPG state, but still drop the stale config key.
            self.fpr = self.gpg = self.my_key = None
            del(self.cfg['gpg_keyid'])
        self.pkgs = {}
        self._populatePkgs()

    def main(self):
        """Run the full fetch + build cycle for all out-of-date packages."""
        self.getPkg()
        self.buildPkg()
        return()

    def _chkver(self, pkgbase):
        """Return True if the AUR version of pkgbase is newer than the DB's."""
        new_ver = self.pkgs[pkgbase]['meta']['new_ver']
        old_ver = self.pkgs[pkgbase]['meta']['pkgver']
        is_diff = (new_ver != old_ver)  # A super-stupid fallback
        if is_diff:
            if has_lv:
                is_diff = LooseVersion(new_ver) > LooseVersion(old_ver)
            else:
                # like, 90% of the time, this would work.
                # NOTE(review): int() raises on non-numeric version parts
                # (e.g. '1.2rc1') — confirm that's acceptable for this repo.
                new_tuple = tuple(map(int, (re.split('\.|-', new_ver))))
                old_tuple = tuple(map(int, (re.split('\.|-', old_ver))))
                # But people at https://stackoverflow.com/a/11887825/733214 are very angry about it, hence the above.
                is_diff = new_tuple > old_tuple
        return(is_diff)

    def _populatePkgs(self):
        """Load active packages from the DB and merge in AUR API metadata.

        Populates self.pkgs as {pkgbase: {'packages': {pkgname: {...}},
        'meta': {...}}} and records per-pkgbase 'snapshot' URL, tarball
        'filename', and 'build' (whether a rebuild is due).
        """
        # These columns/keys are inferred by structure or unneeded. Applies to both DB and AUR API.
        _notrack = ('pkgbase', 'pkgname', 'active', 'id', 'packagebaseid', 'numvotes', 'popularity', 'outofdate',
                    'maintainer', 'firstsubmitted', 'lastmodified', 'depends', 'optdepends', 'conflicts', 'license',
                    'keywords')
        _attr_map = {'version': 'new_ver'}
        # These are tracked per-package; all others are pkgbase and applied to all split pkgs underneath.
        _pkg_specific = ('pkgdesc', 'arch', 'url', 'license', 'groups', 'depends', 'optdepends', 'provides',
                         'conflicts', 'replaces', 'backup', 'options', 'install', 'changelog')
        _aur_results = []
        _urls = []
        _params = {'arg[]': []}
        _tmp_params = {'arg[]': []}
        self.cur.execute("SELECT * FROM packages WHERE active = '1'")
        for row in self.cur.fetchall():
            pkgbase = (row['pkgbase'] if row['pkgbase'] else row['pkgname'])
            pkgnm = row['pkgname']
            if pkgbase not in self.pkgs:
                # NOTE(review): only the first pkgname seen for a pkgbase is
                # initialized under 'packages'; a second split-package sibling
                # from the DB would KeyError below — confirm intended.
                self.pkgs[pkgbase] = {'packages': {pkgnm: {}},
                                      'meta': {}}
            for k in dict(row):
                if not k:
                    continue
                if k in _notrack:
                    continue
                if k in _pkg_specific:
                    self.pkgs[pkgbase]['packages'][pkgnm][k] = row[k]
                else:
                    if k not in self.pkgs[pkgbase]['meta']:
                        self.pkgs[pkgbase]['meta'][k] = row[k]
            # TODO: change this?
            pkgstr = urlparse.quote(pkgnm)  # We perform against a non-pkgbased name for the AUR search.
            # _tmp_params is a lookahead: _params plus the current package,
            # used to test whether adding it would exceed the URI limit.
            _tmp_params['arg[]'].append(pkgstr)
            l = base_len + (len(urlparse.urlencode(_tmp_params, doseq = True)) + 1)
            if l >= uri_limit:
                # We need to split into multiple URIs based on URI size because of:
                # https://wiki.archlinux.org/index.php/Aurweb_RPC_interface#Limitations
                _urls.append('&'.join((aur_base, urlparse.urlencode(_params, doseq = True))))
                _params = {'arg[]': []}
                _tmp_params = {'arg[]': []}
            _params['arg[]'].append(pkgstr)
        # Flush the remaining batch into a final request URL.
        _urls.append('&'.join((aur_base, urlparse.urlencode(_params, doseq = True))))
        for url in _urls:
            with reqs.urlopen(url) as u:
                _aur_results.extend(json.loads(u.read().decode('utf-8'))['results'])
        for pkg in _aur_results:
            # Normalize the API's CamelCase keys to lowercase.
            pkg = {k.lower(): v for (k, v) in pkg.items()}
            pkgnm = pkg['name']
            pkgbase = pkg['packagebase']
            for (k, v) in pkg.items():
                if k in _notrack:
                    continue
                if k in _attr_map:
                    k = _attr_map[k]
                if k in _pkg_specific:
                    self.pkgs[pkgbase]['packages'][pkgnm][k] = v
                else:
                    self.pkgs[pkgbase]['meta'][k] = v
            self.pkgs[pkgbase]['meta']['snapshot'] = 'https://aur.archlinux.org{0}'.format(pkg['urlpath'])
            self.pkgs[pkgbase]['meta']['filename'] = os.path.basename(pkg['urlpath'])
            self.pkgs[pkgbase]['meta']['build'] = self._chkver(pkgbase)
        return()

    def _drop_privs(self):
        """Temporarily switch effective UID/GID/groups to the build user.

        Saved IDs are left untouched (-1) so _restore_privs() can switch back.
        """
        # First get the list of groups to assign.
        # This *should* generate a list *exactly* like as if that user ran os.getgroups(),
        # with the addition of self.cfg['build_user']['gid'] (if it isn't included already).
        # NOTE(review): grp.gr_mem holds user *names*, but this membership test
        # compares against a pwd struct — it looks like it never matches;
        # should probably be pwd.getpwuid(...).pw_name. Confirm.
        newgroups = list(sorted([g.gr_gid
                                 for g in grp.getgrall()
                                 if pwd.getpwuid(self.cfg['build_user']['uid'])
                                 in g.gr_mem]))
        if self.cfg['build_user']['gid'] not in newgroups:
            newgroups.append(self.cfg['build_user']['gid'])
        newgroups.sort()
        # This is the user's "primary group"
        user_gid = pwd.getpwuid(self.cfg['build_user']['uid']).pw_gid
        if user_gid not in newgroups:
            newgroups.append(user_gid)
        os.setgroups(newgroups)
        # If we used os.setgid and os.setuid, we would PERMANENTLY/IRREVOCABLY drop privs.
        # Being that that doesn't suit the meta of the rest of the script (chmodding, etc.) - probably not a good idea.
        os.setresgid(self.cfg['build_user']['gid'], self.cfg['build_user']['gid'], -1)
        os.setresuid(self.cfg['build_user']['uid'], self.cfg['build_user']['uid'], -1)
        # Default on most linux systems. reasonable enough for building? (equal to chmod 755/644)
        os.umask(0o0022)
        # TODO: we need a full env construction here, I think, as well. PATH, HOME, GNUPGHOME at the very least?
        return()

    def _restore_privs(self):
        """Switch effective IDs, groups, and umask back to the original user."""
        os.setresuid(self.cfg['orig_user']['uid'], self.cfg['orig_user']['uid'], self.cfg['orig_user']['uid'])
        os.setresgid(self.cfg['orig_user']['gid'], self.cfg['orig_user']['gid'], self.cfg['orig_user']['gid'])
        os.setgroups(self.cfg['orig_user']['groups'])
        os.umask(self.cfg['orig_user']['umask'])
        # TODO: if we change the env, we need to change it back here. I capture it in self.cfg['orig_user']['env'].
        return()

    def getPkg(self):
        """Download and extract the AUR snapshot tarball for each due pkgbase.

        Runs with privileges dropped so the build dir is writable by the
        build user. Tarball members are flattened into the build dir with the
        leading pkgbase directory stripped.
        """
        self._drop_privs()
        for pkgbase in self.pkgs:
            if not self.pkgs[pkgbase]['meta']['build']:
                continue
            # Strips the leading "(.../)pkgbase/" prefix off member paths.
            _pkgre = re.compile('^(/?.*/)*({0})/?'.format(pkgbase))
            builddir = os.path.join(self.cfg['cache'], pkgbase)
            try:
                shutil.rmtree(builddir)
            except FileNotFoundError:
                # We *could* use ignore_errors or onerrors params, but we only want FileNotFoundError.
                pass
            os.makedirs(builddir, mode = self.cfg['chmod']['dirs'], exist_ok = True)
            tarball = os.path.join(builddir, self.pkgs[pkgbase]['meta']['filename'])
            with reqs.urlopen(self.pkgs[pkgbase]['meta']['snapshot']) as url:
                # We have to write out to disk first because the tarfile module HATES trying to perform seeks on
                # a tarfile stream. It HATES it.
                with open(tarball, 'wb') as f:
                    f.write(url.read())
            tarnames = {}
            with tarfile.open(tarball, mode = 'r:*') as tar:
                for i in tar.getmembers():
                    if any((i.isdir(), i.ischr(), i.isblk(), i.isfifo(), i.isdev())):
                        continue
                    if i.name.endswith('.gitignore'):
                        continue
                    # We want to strip leading dirs out.
                    tarnames[i.name] = _pkgre.sub('', i.name)
                    # Small bugfix.
                    if tarnames[i.name] == '':
                        tarnames[i.name] = os.path.basename(i.name)
                    tarnames[i.name] = os.path.join(builddir, tarnames[i.name])
                for i in tar.getmembers():
                    if i.name in tarnames:
                        # GOLLY I WISH TARFILE WOULD LET US JUST CHANGE THE ARCNAME DURING EXTRACTION ON THE FLY.
                        # NOTE(review): assumes members land directly in
                        # builddir — a snapshot with subdirectories would need
                        # those dirs created first. Confirm.
                        with open(tarnames[i.name], 'wb') as f:
                            f.write(tar.extractfile(i.name).read())
            # No longer needed, so clean it up behind us.
            os.remove(tarball)
        self._restore_privs()
        return()

    def buildPkg(self):
        """makepkg each due pkgbase, publish artifacts, and update the repo DB.

        Pass 1 (privs dropped): run makepkg in each build dir.
        Pass 2 (privs restored): locate the built package file, move it to
        the destination, record the new version in the DB, run repo-add, and
        normalize ownership/permissions of the destination tree.
        """
        self._drop_privs()
        for pkgbase in self.pkgs:
            if not self.pkgs[pkgbase]['meta']['build']:
                continue
            builddir = os.path.join(self.cfg['cache'], pkgbase)
            os.chdir(builddir)
            # subprocess.run(['makepkg'])  # TODO: figure out gpg sig checking?
            subprocess.run(['makepkg', '--clean', '--force', '--skippgpcheck'])
        self._restore_privs()
        for pkgbase in self.pkgs:
            if not self.pkgs[pkgbase]['meta']['build']:
                continue
            builddir = os.path.join(self.cfg['cache'], pkgbase)
            # The i686 isn't even supported anymore, but let's keep this friendly for Archlinux32 folks.
            _pkgre = re.compile(('^({0})-{1}-'
                                 '(x86_64|i686|any)'
                                 '\.pkg\.tar\.xz$').format('|'.join(self.pkgs[pkgbase]['packages'].keys()),
                                                           self.pkgs[pkgbase]['meta']['new_ver']))
            fname = None
            # PROBABLY in the first root dir, and could be done with fnmatch, but...
            for root, dirs, files in os.walk(builddir):
                for f in files:
                    if _pkgre.search(f):
                        fname = os.path.join(root, f)
                        break
                # NOTE(review): this break only exits the inner file loop;
                # os.walk keeps descending. Split packages also yield several
                # artifacts but only one fname survives — confirm intended.
            if not fname:
                raise RuntimeError('Could not find proper package build filename for {0}'.format(pkgbase))
            destfile = os.path.join(self.cfg['dest'], os.path.basename(fname))
            os.rename(fname, destfile)
            # TODO: HERE IS WHERE WE SIGN THE PACKAGE?
            # We also need to update the package info in the DB.
            # NOTE(review): no commit() follows these UPDATEs; unless the
            # connection is in autocommit they may be lost at close(). Verify.
            for p in self.pkgs[pkgbase]['packages']:
                self.cur.execute("UPDATE packages SET pkgver = ? WHERE pkgname = ?",
                                 (self.pkgs[pkgbase]['meta']['new_ver'], p))
            self.cfg['pkgpaths'].append(destfile)
            # No longer needed, so we can clear out the build directory.
            shutil.rmtree(builddir)
        os.chdir(self.cfg['dest'])
        dbfile = os.path.join(self.cfg['dest'], 'autopkg.db.tar.gz')  # TODO: Custom repo name?
        # NOTE(review): repo-add's --delta option was removed in pacman 5.2 —
        # confirm against the target pacman version.
        cmd = ['repo-add', '--nocolor', '--delta', dbfile]  # -s/--sign?
        cmd.extend(self.cfg['pkgpaths'])
        subprocess.run(cmd)
        # Normalize ownership and permissions across the whole repo dir.
        for root, dirs, files in os.walk(self.cfg['dest']):
            for f in files:
                fpath = os.path.join(root, f)
                os.chmod(fpath, self.cfg['chmod']['files'])
                os.chown(fpath, self.cfg['chown']['uid'], self.cfg['chown']['gid'])
            for d in dirs:
                dpath = os.path.join(root, d)
                os.chmod(dpath, self.cfg['chmod']['dirs'])
                os.chown(dpath, self.cfg['chown']['uid'], self.cfg['chown']['gid'])
        return()

    def close(self):
        """Close the DB cursor and connection."""
        if self.cur:
            self.cur.close()
        if self.conn:
            self.conn.close()
        return()

def main():
    """Script entry point: build all due packages with default settings."""
    PkgMake().main()

if __name__ == '__main__':
    main()

@@ -0,0 +1,127 @@
#!/usr/bin/env python

import base64
import copy
import gpg
import grp
import json
import lzma
import os
import pwd
import re
from socket import gethostname
import sqlite3

# NOTE: The gpg homedir should be owned by the user *running autopkg*.
# Likely priv-dropping will only work for root.
#

# Config directives whose values are filesystem paths (expanded and created).
dirs = ('cache', 'dest', 'gpg_homedir')
# Config directives holding "user:group" pairs to resolve into uid/gid dicts.
u_g_pairs = ('chown', 'build_user')
# Config directives whose values are JSON strings to be parsed.
json_vals = ('chmod', )

# A pristine, empty autopkg SQLite DB: xz-compressed, then base64-encoded.
# firstrun() decodes and writes this to disk to bootstrap a new install.
blank_db = """
/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4H//AxNdACmURZ1gyBn4JmSIjib+MZX9x4eABpe77H+o
CX2bysoKzO/OaDh2QGbNjiU75tmhPrWMvTFue4XOq+6NPls33xRRL8eZoITBdAaLqbwYY2XW/V/X
Gx8vpjcBnpACjVno40FoJ1qWxJlBZ0PI/8gMoBr3Sgdqnf+Bqi+E6dOl66ktJMRr3bdZ5C9vOXAf
42BtRfwJlwN8NItaWtfRYVfXl+40D05dugcxDLY/3uUe9MSgt46Z9+Q9tGjjrUA8kb5K2fqWSlQ2
6KyF3KV1zsJSDLuaRkP42JNsBTgg6mU5rEk/3egdJiLn+7AupvWQ3YlKkeALZvgEKy75wdObf6QI
jY4qjXjxOTwOG4oou7lNZ3fPI5qLCQL48M8ZbOQoTAQCuArdYqJmBwT2rF86SdQRP4EY6TlExa4o
+E+v26hKhYXO7o188jlmGFbuzqtoyMB1y3UG+Hi2SjPDilD5o6f9fEjiHZm2FY6rkPb9Km4UFlH1
d2A4Wt4iGlciZBs0lFRPKkgHR4s7KHTMKuZyC08qE1B7FwvyBTBBYveA2UoZlKY7d22IbiiSQ3tP
JKhj8nf8EWcgHPt46Juo80l7vqqn6AviY7b1JZXICdiJMbuWJEyzTLWuk4qlUBfimP7k9IjhDFpJ
gEXdNgrnx/wr5CIbr1T5lI9vZz35EacgNA2bGxLA8VI0W9eYDts3BSfhiJOHWwLQPiNzJwd4aeM1
IhqgTEpk+BD0nIgSB3AAB+NfJJavoQjpv0QBA6dH52utA5Nw5L//Ufw/YKaA7ui8YQyDJ7y2n9L3
ugn6VJFFrYSgIe1oRkJBGRGuBgGNTS3aJmdFqEz1vjZBMkFdF+rryXzub4dst2Qh01E6/elowIUh
2whMRVDO28QjyS9tLtLLzfTmBk2NSxs4+znE0ePKKw3n/p6YlbPRAw24QR8MTCOpQ2lH1UZNWBM2
epxfmWtgO5b/wGYopRDEvDDdbPAq6+4zxTOT5RmdWZyc46gdizf9+dQW3wZ9iBDjh4MtuYPvLlqr
0GRmsyrxgFxkwvVoXASNndS0NPcAADkAhYCxn+W2AAGvBoCAAgB/TQWascRn+wIAAAAABFla
"""

def firstrun(dbfile):
    """Bootstrap a fresh package DB by writing the embedded blank DB to dbfile."""
    with open(dbfile, 'wb') as f:
        f.write(lzma.decompress(base64.b64decode(blank_db)))
    return()

def main(connection, cursor):
    """Load and normalize the DB's config table into a runtime cfg dict.

    connection/cursor: open sqlite3 handles on the autopkg DB. The cursor is
    expected to yield sqlite3.Row objects (rows are indexed by column name) —
    the caller sets the row factory.

    Returns the cfg dict with: "True"/"False"/"None" strings coerced to
    singletons, JSON values parsed, path directives expanded and created,
    user:group directives resolved to numeric IDs, and the invoking user's
    identity (uid/gid/groups/umask/env) captured under 'orig_user' so
    privileges can be restored later.
    """
    cfg = {'orig_cwd': os.getcwd(),
           'pkgpaths': []}
    cursor.execute("SELECT directive, value FROM config")
    for r in cursor.fetchall():
        cfg[r['directive']] = r['value'].strip()
    for k in cfg:
        # Coerce the literal strings "True"/"False"/"None" to real singletons.
        for x in (True, False, None):
            if cfg[k] == str(x):
                cfg[k] = x
                break
        if k in json_vals:
            cfg[k] = json.loads(cfg[k])
        if k == 'path':
            # Colon-separated search path -> list of absolute paths.
            paths = []
            for i in cfg[k].split(':'):
                p = os.path.abspath(os.path.expanduser(i))
                paths.append(p)
            cfg[k] = paths
        if k in dirs:
            if cfg[k]:
                cfg[k] = os.path.abspath(os.path.expanduser(cfg[k]))
                os.makedirs(cfg[k], exist_ok = True)
        if k in u_g_pairs:
            # Accept "user:group" or "user.group"; missing halves default to
            # the effective user/group running this process.
            dflt = [pwd.getpwuid(os.geteuid()).pw_name, grp.getgrgid(os.getegid()).gr_name]
            l = re.split(':|\.', cfg[k])
            if len(l) == 1:
                l.append(None)
            for idx, i in enumerate(l[:]):
                if i in ('', None):
                    l[idx] = dflt[idx]
            cfg[k] = {}
            cfg[k]['uid'] = (int(l[0]) if l[0].isnumeric() else pwd.getpwnam(l[0]).pw_uid)
            cfg[k]['gid'] = (int(l[1]) if l[1].isnumeric() else grp.getgrnam(l[1]).gr_gid)
    cfg['orig_user'] = {'uid': os.geteuid(),
                        'gid': os.getegid()}
    # Ugh. https://orkus.wordpress.com/2011/04/17/python-getting-umask-without-change/
    cfg['orig_user']['umask'] = os.umask(0)
    os.umask(cfg['orig_user']['umask'])
    cfg['orig_user']['groups'] = os.getgroups()
    # chmod values arrive as octal strings (via the JSON directive); make ints.
    for i in cfg['chmod']:
        cfg['chmod'][i] = int(cfg['chmod'][i], 8)
    cfg['orig_user']['env'] = copy.deepcopy(dict(os.environ))
    # The build user must own the cache (makepkg runs there unprivileged);
    # the destination repo gets the configured 'chown' owner.
    os.chown(cfg['cache'], uid = cfg['build_user']['uid'], gid = cfg['build_user']['gid'])
    os.chown(cfg['dest'], uid = cfg['chown']['uid'], gid = cfg['chown']['gid'])
    return(cfg)

def GPG(cur, homedir = None, keyid = None):
    """Return (fingerprint, gpg.Context) for package signing.

    cur: sqlite3 cursor (Row factory) on the autopkg config DB.
    homedir: GnuPG home directory to use (None = gpg's default).
    keyid: existing signing key fingerprint; if None, a new RSA-4096 key is
           generated and its fingerprint (plus homedir) is persisted into the
           config table.
    """
    g = gpg.Context(home_dir = homedir)
    if not keyid:
        # We don't have a key specified, so we need to generate one and update the config.
        s = ('This signature and signing key were automatically generated using Autopkg from OpTools: '
             'https://git.square-r00t.net/OpTools/')
        g.sig_notation_add('automatically-generated@git.square-r00t.net', s, gpg.constants.sig.notation.HUMAN_READABLE)
        userid = 'Autopkg Signing Key ({0}@{1})'.format(os.getenv('SUDO_USER', os.environ['USER']), gethostname())
        params = {
            # 'algorithm': 'ed25519',
            'algorithm': 'rsa4096',
            'expires': False,
            'expires_in': 0,
            'sign': True,
            'passphrase': None
        }
        keyid = g.create_key(userid, **params).fpr
        # Persist the new fingerprint and homedir to the config table.
        # Native UPSERT needs a sufficiently new SQLite; otherwise emulate it.
        # NOTE(review): ON CONFLICT exists from SQLite 3.24.0 exactly, so the
        # '>' comparison skips 3.24.0 itself — confirm whether '>=' was meant.
        # https://stackoverflow.com/a/50718957
        q = {}
        for col in ('keyid', 'homedir'):
            if sqlite3.sqlite_version_info > (3, 24, 0):
                q[col] = ("INSERT INTO config (directive, value) "
                          "VALUES ('gpg_{0}', ?) "
                          "ON CONFLICT (directive) "
                          "DO UPDATE SET value = excluded.value").format(col)
            else:
                cur.execute("SELECT id FROM config WHERE directive = 'gpg_{0}'".format(col))
                row = cur.fetchone()
                if row:
                    q[col] = ("UPDATE config SET value = ? WHERE id = '{0}'").format(row['id'])
                else:
                    q[col] = ("INSERT INTO config (directive, value) VALUES ('gpg_{0}', ?)").format(col)
            # locals()[col] picks up the local variables named 'keyid'/'homedir'.
            cur.execute(q[col], (locals()[col], ))
    return(keyid, g)

@@ -132,3 +132,41 @@ ip.release()
----

###############################################################################


dropping privileges to non-root user (and restoring):
https://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
https://stackoverflow.com/questions/15705439/drop-root-privileges-for-certain-operations-in-python

NOTE: if you want to *remove the ability* to restore back to root privs, use os.setgid(running_gid) and os.setuid(running_uid) instead.

reference:
http://timetobleed.com/5-things-you-dont-know-about-user-ids-that-will-destroy-you/
https://nanopdf.com/download/setuid-demystified_pdf

----
import os, pwd, grp
import subprocess

my_uid = pwd.getpwuid(os.geteuid()).pw_uid
my_gid = grp.getgrgid(os.getegid()).gr_gid
my_grps = os.getgroups()
try:
os.remove('/tmp/usertest')
os.remove('/tmp/parenttest')
except:
pass
if os.getuid() != 0:
exit('need root')
# uid_name/gid_name are placeholders: the name of the unprivileged account
# and group to drop to, e.g. uid_name = 'nobody'; gid_name = 'nobody'
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
os.setgroups([])
os.setegid(running_gid)
os.seteuid(running_uid)
old_umask = os.umask(0o022)
subprocess.run(['touch', '/tmp/usertest'])
os.seteuid(my_uid)
os.setegid(my_gid)
os.setgroups(my_grps)
os.umask(old_umask)
subprocess.run(['touch', '/tmp/parenttest'])

Loading…
Cancel
Save