bdisk/bdisk/bGPG.py

229 lines
8.9 KiB
Python
Executable File

import os
from io import BytesIO
import subprocess
import datetime
import jinja2
import gpgme
import psutil
def genGPG(conf):
# https://media.readthedocs.org/pdf/pygpgme/latest/pygpgme.pdf
build = conf['build']
dlpath = build['dlpath']
bdisk = conf['bdisk']
gpghome = conf['gpg']['mygpghome']
distkeys = []
gpgkeyserver = []
for a in conf['build']['arch']:
keysrv = conf['src'][a]['gpgkeyserver']
distkey = conf['src'][a]['gpgkey']
if keysrv and (keysrv not in gpgkeyserver):
gpgkeyserver.append(keysrv)
if distkey and(distkey not in distkeys):
distkeys.append(distkey)
templates_dir = '{0}/extra/templates'.format(build['basedir'])
mykey = False
pkeys = []
killStaleAgent(conf)
if conf['gpg']['mygpgkey'] != '':
mykey = conf['gpg']['mygpgkey']
if gpghome == '':
# Let's try the default.
gpghome = '{0}/.gnupg'.format(os.path.expanduser("~"))
else:
# No key ID was specified.
if gpghome == '':
# We'll generate a key if we can't find one here.
gpghome = build['dlpath'] + '/.gnupg'
killStaleAgent(conf)
os.environ['GNUPGHOME'] = gpghome
gpg = gpgme.Context()
# do we need to add a keyserver?
if len(gpgkeyserver) != 0:
dirmgr = '{0}/dirmngr.conf'.format(gpghome)
for s in gpgkeyserver:
if os.path.isfile(dirmgr):
with open(dirmgr, 'r+') as f:
findme = any(s in line for line in f)
if not findme:
f.seek(0, os.SEEK_END)
f.write("\n# Added by {0}.\nkeyserver {1}\n".format(
bdisk['pname'],
s))
if mykey:
try:
pkeys.append(gpg.get_key(mykey, True))
except:
exit('{0}: ERROR: You specified using {1} but we have no secret key for that ID!'.format(
datetime.datetime.now(),
mykey))
else:
for key in gpg.keylist(None, True):
if key.can_sign:
pkeys.append(key)
break
if len(pkeys) == 0:
print("{0}: [GPG] Generating a GPG key...".format(datetime.datetime.now()))
loader = jinja2.FileSystemLoader(templates_dir)
env = jinja2.Environment(loader = loader)
tpl = env.get_template('GPG.j2')
tpl_out = tpl.render(build = build, bdisk = bdisk)
privkey = gpg.get_key(gpg.genkey(tpl_out).fpr, True)
pkeys.append(privkey)
# do we need to add a keyserver? this is for the freshly-generated GNUPGHOME
if len(gpgkeyserver) != 0:
dirmgr = '{0}/dirmngr.conf'.format(gpghome)
for s in gpgkeyserver:
with open(dirmgr, 'r+') as f:
findme = any(s in line for line in f)
if not findme:
f.seek(0, os.SEEK_END)
f.write("\n# Added by {0}.\nkeyserver {1}\n".format(
bdisk['pname'],
s))
gpg.signers = pkeys
# Now we try to find and add the key for the base image.
gpg.keylist_mode = gpgme.KEYLIST_MODE_EXTERN # remote (keyserver)
if len(distkeys) > 0: # testing
for k in distkeys:
key = gpg.get_key(k)
importkey = key.subkeys[0].fpr
gpg.keylist_mode = gpgme.KEYLIST_MODE_LOCAL # local keyring (default)
DEVNULL = open(os.devnull, 'w')
print('{0}: [GPG] Importing {1} and signing it for verification purposes...'.format(
datetime.datetime.now(),
distkey))
cmd = ['/usr/bin/gpg',
'--recv-keys',
'--batch',
'--yes',
'0x{0}'.format(importkey)]
subprocess.call(cmd, stdout = DEVNULL, stderr = subprocess.STDOUT)
sigkeys = []
for i in gpg.get_key(importkey).subkeys:
sigkeys.append(i.fpr)
cmd = ['/usr/bin/gpg',
'--batch',
'--yes',
'--lsign-key',
'0x{0}'.format(importkey)]
subprocess.call(cmd, stdout = DEVNULL, stderr = subprocess.STDOUT)
# We need to expose this key to the chroots, too, so we need to export it.
with open('{0}/gpgkey.pub'.format(dlpath), 'wb') as f:
gpg.export(pkeys[0].subkeys[0].keyid, f)
return(gpg)
def killStaleAgent(conf):
# Kill off any stale GPG agents running.
# Probably not even needed, but good to have.
chrootdir = conf['build']['chrootdir']
gpgpath = conf['gpg']['mygpghome']
procs = psutil.process_iter()
plst = []
for p in procs:
if (p.name() in ('gpg-agent', 'dirmngr') and p.uids()[0] == os.getuid()):
pd = psutil.Process(p.pid).as_dict()
for d in (chrootdir, gpgpath):
if pd['cwd'].startswith('{0}'.format(d)):
plst.append(p.pid)
if len(plst) >= 1:
for p in plst:
psutil.Process(p).terminate()
def signIMG(path, conf):
if conf['build']['sign']:
# Do we want to kill off any stale gpg-agents? (So we spawn a new one)
# Requires further testing.
#killStaleAgent()
gpg = conf['gpgobj']
print('{0}: [GPG] Signing {1}...'.format(
datetime.datetime.now(),
path))
# May not be necessary; further testing necessary
#if os.getenv('GPG_AGENT_INFO'):
# del os.environ['GPG_AGENT_INFO']
gpg = conf['gpgobj']
# ASCII-armor (.asc)
gpg.armor = True
data_in = open(path, 'rb')
sigbuf = BytesIO()
sig = gpg.sign(data_in, sigbuf, gpgme.SIG_MODE_DETACH)
_ = sigbuf.seek(0)
_ = data_in.seek(0)
data_in.close()
with open('{0}.asc'.format(path), 'wb') as f:
f.write(sigbuf.read())
print('{0}: [GPG] Wrote {1}.asc (ASCII-armored signature).'.format(
datetime.datetime.now(),
path))
# Binary signature (.sig)
gpg.armor = False
data_in = open(path, 'rb')
sigbuf = BytesIO()
sig = gpg.sign(data_in, sigbuf, gpgme.SIG_MODE_DETACH)
_ = sigbuf.seek(0)
_ = data_in.seek(0)
data_in.close()
with open('{0}.sig'.format(path), 'wb') as f:
f.write(sigbuf.read())
print('{0}: [GPG] Wrote {1}.sig (binary signature).'.format(
datetime.datetime.now(),
path))
def gpgVerify(sigfile, datafile, conf):
gpg = conf['gpgobj']
fullkeys = []
print('{0}: [GPG] Verifying {1} with {2}...'.format(
datetime.datetime.now(),
datafile,
sigfile))
keylst = gpg.keylist()
for k in keylst:
fullkeys.append(k.subkeys[0].fpr)
with open(sigfile,'rb') as s:
with open(datafile, 'rb') as f:
sig = gpg.verify(s, f, None)
for x in sig:
if x.validity <= 1:
if not x.validity_reason:
reason = 'we require a signature trust of 2 or higher'
else:
reason = x.validity_reason
print('{0}: [GPG] Key {1} failed to verify: {2}'.format(
datetime.datetime.now(),
x.fpr,
reason))
verified = False
skeys = []
for k in sig:
skeys.append(k.fpr)
if k.fpr in fullkeys:
verified = True
break
else:
pass
if verified:
print('{0}: [GPG] {1} verified (success).'.format(
datetime.datetime.now(),
datafile))
else:
print('{0}: [GPG] {1} failed verification!'.format(
datetime.datetime.now(),
datafile))
return(verified)
def delTempKeys(conf):
# Create a config option to delete these.
# It's handy to keep these keys, but I'd understand if
# people didn't want to use them.
gpg = conf['gpgobj']
if conf['gpg']:
keys = []
if conf['gpgkey'] != '':
keys.append(gpg.get_key(conf['gpgkey']))
if conf['mygpghome'] == '':
keys.append(gpg.get_key(None, True)) # this is safe; we generated our own
for k in keys:
gpg.delete(k)
killStaleAgent(conf)