import copy
import io
import logging
import os
import shutil
import tempfile
##
import gpg
import gpg.errors


_logger = logging.getLogger(__name__)


def _log_extras(args, kwargs):
    """Debug-log any surplus positional/keyword arguments a method received.

    Positional values are passed through ``str()`` before joining so that a
    non-string argument cannot make the logging call itself raise TypeError.
    """
    if args:
        _logger.debug('args: {0}'.format(','.join(str(a) for a in args)))
    if kwargs:
        _logger.debug('kwargs: {0}'.format(kwargs))


def _redact_passphrase(mapping):
    """Return a copy of *mapping* whose 'passphrase' value (if set) is masked for logging."""
    return {name: ('(redacted)' if name == 'passphrase' and value is not None else value)
            for name, value in mapping.items()}


def _strip_payload(fnargs):
    """Return a copy of verify() arguments with the data/signature blobs masked for logging."""
    return {name: ('(stripped)' if name in ('signed_data', 'signature') else value)
            for name, value in fnargs.items()}


def _build_keylist_mode(flags):
    """OR together the keylist-mode constants whose switch is enabled.

    :param flags: iterable of ``(enabled, label, constant)`` tuples.
    :returns: the combined integer mode (0 if nothing was enabled).
    """
    mode = 0
    for enabled, label, constant in flags:
        if enabled:
            mode |= constant
            _logger.debug('Added "{0}" to filter; new filter value: {1}'.format(label, mode))
    return mode


class KeyEditor(object):
    """Stateful callback used with ``Context.interact()`` to assign owner trust to a key.

    ``trusted`` starts False and flips to True once the trust value has been
    supplied, after which every prompt is answered with "save".
    """

    def __init__(self):
        # Becomes True once the ownertrust value has been answered.
        self.trusted = False
        _logger.info('Key editor instantiated.')

    def truster(self, kw, arg, *args, **kwargs):
        """Answer one prompt of the interactive key editor.

        :param kw: the status keyword of the prompt; only 'GET_LINE' is answered.
        :param arg: the prompt identifier (e.g. 'keyedit.prompt').
        :returns: 'trust', '4' or 'save' for GET_LINE prompts, otherwise None.
        """
        _logger.debug('Key trust editor invoked:')
        _logger.debug('Command: {0}'.format(kw))
        _logger.debug('Argument: {0}'.format(arg))
        _log_extras(args, kwargs)
        if kw != 'GET_LINE':
            return None
        if arg == 'keyedit.prompt':
            # First visit to the main prompt starts the trust dialogue; later visits save.
            reply = 'save' if self.trusted else 'trust'
        elif arg == 'edit_ownertrust.value' and not self.trusted:
            self.trusted = True
            _logger.debug('Status changed to trusted')
            reply = '4'  # "Full"
        else:
            reply = 'save'
        _logger.debug('Returning: "{0}"'.format(reply))
        return reply


class GPG(object):
    """Wrapper around a ``gpg.Context`` bound to a (possibly temporary) GnuPG homedir.

    If no homedir is given, a temporary one is created along with a fresh
    signing/certifying key that becomes the primary key.
    """

    def __init__(self, homedir=None, primary_key=None, *args, **kwargs):
        """Record the configuration and initialise the GPG context.

        :param homedir: path to the GnuPG home directory; falsy means "create a temporary one".
        :param primary_key: identifier of the secret key to sign with (ignored/replaced
                            when a temporary homedir is created).
        :raises RuntimeError: propagated from :meth:`_initContext`.
        """
        self.homedir = homedir
        self.primary_key = primary_key
        self.temporary = None
        self.ctx = None
        self._imported_keys = []
        _logger.debug('Homedir: {0}'.format(self.homedir))
        _logger.debug('Primary key: {0}'.format(self.primary_key))
        _log_extras(args, kwargs)
        _logger.info('Instantiated GPG class.')
        self._initContext()

    def _initContext(self):
        """Create the homedir (if needed), the context, and resolve the primary signing key.

        :raises RuntimeError: if the primary key is not in the secret keyring, or if the
                              context's signer list is empty after assignment.
        """
        if not self.homedir:
            self.homedir = tempfile.mkdtemp(prefix='.aif.', suffix='.gpg')
            self.temporary = True
            _logger.debug('Set as temporary homedir.')
        self.homedir = os.path.abspath(os.path.expanduser(self.homedir))
        _logger.debug('Homedir finalized: {0}'.format(self.homedir))
        if not os.path.isdir(self.homedir):
            os.makedirs(self.homedir, exist_ok=True)
            os.chmod(self.homedir, 0o0700)
            _logger.info('Created {0}'.format(self.homedir))
        self.ctx = gpg.Context(home_dir=self.homedir)
        if self.temporary:
            # A throwaway homedir has no keys yet, so generate the signing key ourselves.
            self.primary_key = self.createKey('AIF-NG File Verification Key',
                                              sign=True,
                                              force=True,
                                              certify=True).fpr
        # Keep the requested identifier so a failed lookup can be reported meaningfully;
        # self.primary_key is replaced by the lookup result (None on failure).
        requested_key = self.primary_key
        self.primary_key = self.findKeyByID(requested_key, source='secret')
        if not self.primary_key:
            _logger.error('Could not find primary key in secret keyring: {0}'.format(requested_key))
            raise RuntimeError('Primary key not found in secret keyring')
        _logger.debug('Found primary key in secret keyring: {0}'.format(self.primary_key.fpr))
        self.ctx.signers = [self.primary_key]
        if not self.ctx.signers:
            # Logger methods return None, so the error must be logged and raised separately.
            _logger.error('Could not assign signing keys; signing set empty')
            raise RuntimeError('Could not assign signing keys; signing set empty')
        _logger.debug('Signers set to: {0}'.format(','.join([k.fpr for k in self.ctx.signers])))
        return None

    def clean(self):
        """Release the context and, for a temporary homedir, delete it.

        Safe to call more than once: the primary key may already be a fingerprint
        string or None, and the temporary directory may already be gone.
        """
        # This is mostly just to cleanup the stuff we did before.
        _logger.info('Cleaning GPG homedir.')
        # Collapse the key object to its fingerprint; leave None/str untouched.
        self.primary_key = getattr(self.primary_key, 'fpr', self.primary_key)
        if self.temporary:
            self.primary_key = None
            if os.path.isdir(self.homedir):
                shutil.rmtree(self.homedir)
                _logger.info('Deleted temporary GPG homedir: {0}'.format(self.homedir))
        self.ctx = None
        return None

    def createKey(self, userid, *args, **kwargs):
        """Create a new key in the context's keyring.

        Recognised keyword arguments (with the defaults applied here): ``algorithm``
        ('rsa4096'), ``expires_in`` (None), ``sign`` (True), ``encrypt`` (False),
        ``certify`` (False), ``authenticate`` (False), ``passphrase`` (None),
        ``force`` (None). A falsy ``expires_in`` creates a non-expiring key.

        :param userid: the user ID string for the new key.
        :returns: the result object of ``Context.create_key()`` (exposes ``.fpr``).
        """
        # algorithm=None, expires_in=0, expires=True, sign=False, encrypt=False, certify=False,
        # authenticate=False, passphrase=None, force=False
        keyinfo = {'userid': userid,
                   'algorithm': kwargs.get('algorithm', 'rsa4096'),
                   'expires_in': kwargs.get('expires_in'),
                   'sign': kwargs.get('sign', True),
                   'encrypt': kwargs.get('encrypt', False),
                   'certify': kwargs.get('certify', False),
                   'authenticate': kwargs.get('authenticate', False),
                   'passphrase': kwargs.get('passphrase'),
                   'force': kwargs.get('force')}
        # Never write the passphrase to the log in clear text.
        _logger.debug('Key creation parameters: {0}'.format(_redact_passphrase(keyinfo)))
        _log_extras(args, _redact_passphrase(kwargs))
        if not keyinfo['expires_in']:
            # No lifetime requested: drop the parameter and explicitly disable expiry.
            del keyinfo['expires_in']
            keyinfo['expires'] = False
        k = self.ctx.create_key(**keyinfo)
        _logger.info('Created key: {0}'.format(k.fpr))
        _logger.debug('Key info: {0}'.format(k))
        return k

    def findKey(self, searchstr, secret=False, local=True, remote=True,
                secret_only=False, keyring_import=False, *args, **kwargs):
        """Search for keys matching a pattern.

        :param searchstr: pattern handed to ``Context.keylist()``.
        :param secret: add KEYLIST_MODE_WITH_SECRET to the listing mode.
        :param local: add KEYLIST_MODE_LOCAL to the listing mode.
        :param remote: add KEYLIST_MODE_EXTERN to the listing mode.
        :param secret_only: passed as ``secret`` to ``Context.keylist()``.
        :param keyring_import: import (and locally sign/trust) any keys found.
        :returns: list of matching key objects (possibly empty).
        """
        fltr = _build_keylist_mode((
            (secret, 'secret', gpg.constants.KEYLIST_MODE_WITH_SECRET),
            (local, 'local', gpg.constants.KEYLIST_MODE_LOCAL),
            (remote, 'remote', gpg.constants.KEYLIST_MODE_EXTERN),
        ))
        _log_extras(args, kwargs)
        keys = list(self.ctx.keylist(pattern=searchstr, secret=secret_only, mode=fltr))
        _logger.info('Found {0} keys'.format(len(keys)))
        if keys:
            _logger.debug('Found keys: {0}'.format(keys))
        else:
            _logger.warning('Found no keys.')
        # Only attempt an import when there is something to import (as findKeyByID does).
        if keyring_import and keys:
            _logger.debug('Importing enabled; importing found keys.')
            self.importKeys(keys, native=True)
        return keys

    def findKeyByID(self, key_id, source='remote', keyring_import=False, *args, **kwargs):
        """Fetch a single key by ID from exactly one source.

        :param key_id: key identifier handed to ``Context.get_key()``.
        :param source: one of 'remote', 'local' or 'secret'.
        :param keyring_import: import (and locally sign/trust) the key if found.
        :returns: the key object, or None if ``gpg.errors.KeyNotFound`` was raised.
        :raises ValueError: if *source* is not a recognised name.
        """
        # So .get_key() CAN get a remote key from a keyserver... but you can't have ANY other keylist modes defined.
        # Ugh.
        sources = {'remote': gpg.constants.KEYLIST_MODE_EXTERN,
                   'local': gpg.constants.KEYLIST_MODE_LOCAL,
                   'secret': gpg.constants.KEYLIST_MODE_WITH_SECRET}
        if source not in sources:
            _logger.error('Invalid source parameter ({0}); must be one of: {1}'.format(source,
                                                                                      ', '.join(sources)))
            raise ValueError('Invalid source parameter')
        _log_extras(args, kwargs)
        orig_mode = self.ctx.get_keylist_mode()
        _logger.debug('Original keylist mode: {0}'.format(orig_mode))
        self.ctx.set_keylist_mode(sources[source])
        _logger.info('Set keylist mode: {0} ({1})'.format(source, sources[source]))
        _logger.debug('Searching for key ID: {0}'.format(key_id))
        try:
            key = self.ctx.get_key(key_id, secret=(source == 'secret'))
        except gpg.errors.KeyNotFound:
            key = None
            _logger.warning('Found no keys.')
        else:
            _logger.info('Found key object for {0}'.format(key_id))
            _logger.debug('Found key: {0}'.format(key))
        finally:
            # Always put the context back, even if the lookup failed unexpectedly.
            self.ctx.set_keylist_mode(orig_mode)
            _logger.info('Restored keylist mode ({0})'.format(orig_mode))
        if keyring_import and key:
            _logger.debug('Importing enabled; importing found keys.')
            self.importKeys(key, native=True)
        return key

    def getKey(self, key_id, secret=False, strict=False, *args, **kwargs):
        """Resolve a key identifier (or pass through a key object) to a key object.

        :param key_id: a key identifier, or an object that already has an ``fpr`` attribute.
        :param secret: passed as ``secret`` to ``Context.get_key()``.
        :param strict: when False, first try to find and import the key via
                       :meth:`findKeyByID` (extra keyword arguments are forwarded to it).
        :returns: the key object, or None if it is not in the keyring.
        """
        _log_extras(args, kwargs)
        if hasattr(key_id, 'fpr'):
            _logger.info('Key specified is already a native key object.')
            _logger.debug('Key: {0}'.format(key_id))
            return key_id
        if not strict:
            _logger.debug('Strict mode disabled; attempting import of {0} first.'.format(key_id))
            self.findKeyByID(key_id, keyring_import=True, **kwargs)
        key = None
        try:
            key = self.ctx.get_key(key_id, secret=secret)
        except gpg.errors.KeyNotFound:
            _logger.warning('Could not locate {0} in keyring'.format(key_id))
        else:
            _logger.info('Found {0}.'.format(key_id))
            _logger.debug('Key: {0}'.format(key))
        return key

    def getKeyData(self, keydata, keyring_import=False, *args, **kwargs):
        """List the keys contained in raw key data.

        :param keydata: key material as str (encoded to UTF-8 here) or bytes.
        :param keyring_import: import (and locally sign/trust) any keys found.
        :returns: tuple ``(keys, keydata)`` where *keydata* is the argument exactly as passed in.
        """
        orig_keydata = keydata
        _log_extras(args, kwargs)
        if isinstance(keydata, str):
            _logger.debug('String passed as keydata; converting to bytes.')
            keydata = keydata.encode('utf-8')
        # The context manager guarantees the buffer is closed even if listing fails.
        with io.BytesIO(keydata) as buf:
            _logger.info('Parsed {0} bytes; looking for key(s).'.format(buf.getbuffer().nbytes))
            keys = list(self.ctx.keylist(source=buf))
        _logger.info('Found {0} key(s) in data.'.format(len(keys)))
        if keys:
            _logger.debug('Keys found: {0}'.format(keys))
        else:
            _logger.warning('No keys found in data.')
        if keyring_import and keys:
            _logger.debug('Importing enabled; importing found keys.')
            self.importKeys(keys, native=True)
        return (keys, orig_keydata)

    def getKeyFile(self, keyfile, keyring_import=False, *args, **kwargs):
        """List the keys contained in a key file.

        :param keyfile: path to the file (``~`` is expanded, path made absolute).
        :param keyring_import: import (and locally sign/trust) any keys found.
        :returns: tuple ``(keys, raw_bytes)`` with the file's full contents.
        """
        _log_extras(args, kwargs)
        orig_keyfile = keyfile
        keyfile = os.path.abspath(os.path.expanduser(keyfile))
        _logger.info('Parsed absolute keyfile path: {0} => {1}'.format(orig_keyfile, keyfile))
        with open(keyfile, 'rb') as fh:
            rawkey_data = fh.read()
            # Rewind so the key listing reads the file from the start again.
            fh.seek(0, 0)
            _logger.debug('Parsed {0} bytes; looking for key(s).'.format(len(rawkey_data)))
            keys = list(self.ctx.keylist(source=fh))
        _logger.info('Found {0} key(s) in data.'.format(len(keys)))
        if keys:
            _logger.debug('Keys found: {0}'.format(keys))
        else:
            _logger.warning('No keys found in data.')
        if keyring_import and keys:
            _logger.debug('Importing enabled; importing found keys.')
            self.importKeys(keys, native=True)
        return (keys, rawkey_data)

    @staticmethod
    def _normalizeKeydata(keydata):
        """Flatten raw key material (str, bytes, or a list/tuple of those) into one bytes blob."""
        if isinstance(keydata, str):
            _logger.debug('Specified keydata was a string; converted to bytes.')
            return keydata.encode('utf-8')
        if isinstance(keydata, (bytes, bytearray)):
            _logger.debug('Specified keydata was already in bytes.')
            return keydata
        if isinstance(keydata, (list, tuple)):
            _logger.debug('Specified keydata was a list/list-like; iterating.')
            chunks = []
            for idx, entry in enumerate(keydata, start=1):
                _logger.debug('Parsing entry {0} of {1} entries.'.format(idx, len(keydata)))
                if isinstance(entry, str):
                    chunks.append(entry.encode('utf-8'))
                    _logger.debug('Keydata ({0}) was a string; converted to bytes.'.format(idx))
                else:
                    _logger.debug('Keydata ({0}) was already in bytes.'.format(idx))
                    chunks.append(entry)
            return b''.join(chunks)
        _logger.warning('Could not identify keydata reliably; unpredictable results ahead.')
        return keydata

    def importKeys(self, keydata, native=False, local=True, remote=True, *args, **kwargs):
        """Import keys into the keyring, then locally sign and trust each of them.

        :param keydata: raw key material (str, bytes, or a list of those) when *native*
                        is False; a key object or list of key objects when it is True.
        :param native: whether *keydata* is already key object(s).
        :param local: include KEYLIST_MODE_LOCAL in the keylist mode used for the import.
        :param remote: include KEYLIST_MODE_EXTERN in the keylist mode used for the import.
        :returns: None.
        """
        orig_km = None
        keys = []
        _log_extras(args, kwargs)
        fltr = _build_keylist_mode((
            (local, 'local', gpg.constants.KEYLIST_MODE_LOCAL),
            (remote, 'remote', gpg.constants.KEYLIST_MODE_EXTERN),
        ))
        current_km = self.ctx.get_keylist_mode()
        if current_km != fltr:
            orig_km = current_km
            self.ctx.set_keylist_mode(fltr)
            _logger.info(('Current keylist mode ({0}) doesn\'t match filter ({1}); '
                          'set to new mode.').format(orig_km, fltr))
        try:
            if not native:
                # It's raw key data (.gpg, .asc, etc.).
                _logger.info('Non-native keydata specified; parsing.')
                formatted_keys = self._normalizeKeydata(keydata)
                rslt = self.ctx.key_import(formatted_keys).imports
                _logger.debug('Imported keys: {0}'.format(rslt))
                for r in rslt:
                    try:
                        k = self.ctx.get_key(r.fpr)
                    except gpg.errors.KeyNotFound:
                        k = None
                    if not k:
                        # Skip it: there is nothing to sign or trust further down.
                        _logger.warning('Could not find key ID {0}.'.format(r.fpr))
                        continue
                    _logger.debug('Adding key to keylist: {0}'.format(k))
                    keys.append(k)
            else:
                # It's a native Key() object (or a list of them).
                _logger.info('Native keydata specified; parsing.')
                if not isinstance(keydata, (list, tuple)):
                    _logger.debug('Specified keydata was not a list/list-like; fixing.')
                    keydata = [keydata]
                keys = list(keydata)
                _logger.debug('Importing keys: {0}'.format(keys))
                self.ctx.op_import_keys(keys)
        finally:
            # Put the context's keylist mode back even if the import blew up.
            if orig_km is not None:
                self.ctx.set_keylist_mode(orig_km)
                _logger.info('Restored keylist mode to {0}'.format(orig_km))
        for k in keys:
            _logger.info('Signing {0} with a local signature.'.format(k.fpr))
            self.ctx.key_sign(k, local=True)
            _logger.debug('Adding trust for {0}.'.format(k.fpr))
            trusteditor = KeyEditor()
            self.ctx.interact(k, trusteditor.truster)
        return None

    def verifyData(self, data, keys=None, strict=False, detached=None, *args, **kwargs):
        """Verify signed data, optionally against specific keys and/or a detached signature.

        :param data: the signed data as str (encoded to UTF-8 here) or bytes.
        :param keys: a key / key identifier or a list of them; each is resolved through
                     :meth:`getKey` with ``source='local'``.
        :param strict: require the data to be signed by ALL specified keys (single
                       ``Context.verify()`` call; its exceptions propagate).
        :param detached: detached signature as str, bytes, or an object with ``read()``.
        :returns: dict of results. Strict mode or no keys: ``{None: <verify() result>}``.
                  Non-strict with keys: ``{fpr: (ok, result, reason)}`` per key, where
                  *reason* is None, 'Missing Signature' or 'Bad Signature'.
        :raises TypeError: if *detached* is neither str/bytes nor has a ``read`` attribute.
        """
        results = {}
        _log_extras(args, kwargs)
        if keys:
            _logger.info('Keys were specified.')
            if not isinstance(keys, list):
                keys = [keys]
            # NOTE(review): getKey() returns None for a key it cannot find; such an entry
            # will fail below on attribute access rather than being silently ignored.
            keys = [self.getKey(k, source='local') for k in keys]
            _logger.debug('Verifying against keys: {0}'.format(keys))
        if isinstance(data, str):
            data = data.encode('utf-8')
            _logger.debug('Specified data was a string; converted to bytes.')
        _logger.info('Verifying {0} bytes of data.'.format(len(data)))
        fnargs = {'signed_data': data}
        if detached:
            _logger.info('Specified a detached signature.')
            if isinstance(detached, str):
                detached = detached.encode('utf-8')
                _logger.debug('Specified signature was a string; converted to bytes.')
            if not isinstance(detached, bytes) and not hasattr(detached, 'read'):
                _logger.error('Detached signature was neither bytes nor a buffer-like object.')
                raise TypeError('detached must be bytes or buffer-like object')
            if isinstance(detached, bytes):
                _logger.info('Signature length: {0} bytes'.format(len(detached)))
            elif hasattr(detached, 'getbuffer'):
                _logger.info('Signature length: {0} bytes'.format(detached.getbuffer().nbytes))
            else:
                # Readable objects without getbuffer() (e.g. real files) have no cheap length.
                _logger.info('Signature length: unknown (buffer-like object)')
            fnargs['signature'] = detached
        if strict:
            _logger.debug('Strict mode enabled; data must be signed by ALL specified keys.')
            # verify() iterates this value, so never hand it None.
            fnargs['verify'] = keys or []
            _logger.debug(('Verifying with args (data-stripped): '
                           '{0}').format(_strip_payload(fnargs)))
            results[None] = self.ctx.verify(**fnargs)
        elif keys:
            _logger.debug('Keys were specified but running in non-strict; iterating over all.')
            for k in keys:
                # NOTE(review): deep copy presumably keeps a buffer-like signature from
                # being consumed by an earlier iteration - confirm before simplifying.
                _fnargs = copy.deepcopy(fnargs)
                _fnargs['verify'] = [k]
                _logger.info('Verifying against key {0}'.format(k.fpr))
                try:
                    _logger.debug(('Verifying with args (data-stripped): '
                                   '{0}').format(_strip_payload(_fnargs)))
                    sigchk = self.ctx.verify(**_fnargs)
                    _logger.info('Key {0} verification results: {1}'.format(k.fpr, sigchk))
                    results[k.fpr] = (True, sigchk[1], None)
                except gpg.errors.MissingSignatures as e:
                    _logger.warning('Key {0}: missing signature'.format(k.fpr))
                    _logger.debug('Key {0} results: {1}'.format(k.fpr, e.results))
                    results[k.fpr] = (False, e.results, 'Missing Signature')
                except gpg.errors.BadSignatures as e:
                    _logger.warning('Key {0}: bad signature'.format(k.fpr))
                    _logger.debug('Key {0} results: {1}'.format(k.fpr, e.results))
                    results[k.fpr] = (False, e.results, 'Bad Signature')
        else:
            _logger.debug('No keys specified but running in non-strict; accepting any signatures.')
            _logger.debug(('Verifying with args (data-stripped): '
                           '{0}').format(_strip_payload(fnargs)))
            results[None] = self.ctx.verify(**fnargs)
            _logger.debug('Results for any/all signatures: {0}'.format(results[None]))
        return results

    def verifyFile(self, filepath, *args, **kwargs):
        """Read a file and verify its contents via :meth:`verifyData`.

        :param filepath: path to the file (``~`` is expanded, path made absolute).
        :param kwargs: forwarded to :meth:`verifyData` (positional extras are only logged).
        :returns: whatever :meth:`verifyData` returns.
        """
        orig_filepath = filepath
        filepath = os.path.abspath(os.path.expanduser(filepath))
        _logger.debug('File verification invoked. Transformed filepath: {0} => {1}'.format(orig_filepath, filepath))
        _log_extras(args, kwargs)
        with open(filepath, 'rb') as fh:
            results = self.verifyData(fh.read(), **kwargs)
        return results