From b2ba35504d76a91ea4ec1f79de7c05d63b1c8b6a Mon Sep 17 00:00:00 2001 From: brent s Date: Tue, 19 Sep 2017 05:09:58 -0400 Subject: [PATCH] and replacing. --- gpg/kant/kant.new.py | 943 --------------------------------- gpg/kant/kant.py | 1201 +++++++++++++++++++++++++++--------------- 2 files changed, 775 insertions(+), 1369 deletions(-) delete mode 100755 gpg/kant/kant.new.py diff --git a/gpg/kant/kant.new.py b/gpg/kant/kant.new.py deleted file mode 100755 index 26ffa72..0000000 --- a/gpg/kant/kant.new.py +++ /dev/null @@ -1,943 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import base64 -import csv -import datetime -import json -import lzma -import operator -import os -import re -import shutil -import smtplib -import subprocess -from email.message import Message -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from functools import reduce -from io import BytesIO -from socket import * -import urllib.parse -import jinja2 # non-stdlib; Arch package is python-jinja2 -import gpg # non-stdlib; Arch package is "python-gpgme" - see: -import gpg.constants # https://git.archlinux.org/svntogit/packages.git/tree/trunk/PKGBUILD?h=packages/gpgme and -import gpg.errors # https://gnupg.org/ftp/gcrypt/gpgme/ (incl. python bindings in build) -import pprint # development debug - - -class SigSession(object): # see docs/REFS.funcs.struct.txt - def __init__(self, args): - # These are the "stock" templates for emails. It's a PITA, but to save some space since we store them - # inline in here, they're XZ'd and base64'd. - self.email_tpl = {} - self.email_tpl['plain'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AWYAs1dACQZSZhvFgKNdKNXbSf05z0ZPvTvmdQ0mJQg' + - 'atgzhPVeLKxz22bhxedC813X5I8Gn2g9q9Do2jPPViyuLf1SFHDUx1m7SEFsSUyT7L/j71tuxRZi' + - 'xLyy6P3mHo3HeUhwgpgh6lvMYwlTf+zhj3558RJmhXprLteoKt/sY4NIMzz3TBa0ivsVo6EFA9/G' + - '2q1MpuHxg86uY1pA4tkFlmxuSklZq5EKuu7B5RSeUGB+SsjDPSfsdhoPngMET1EXTZfVSWezjJkH' + - 'REyn5SgqpD7vwmwvZWcwWua+V+e/rYYF1cx0Y0Wi1wAC6NzGDce3gbcr/k6M/PiMi8ekJPCmgMgP' + - 'R4GBtRi121wU374nml42WjdGjHee6Se6d0OGKscJjB5Z1eSOho63OMn5Ayu4Lvl05L/mB88Hnk5e' + - 'MayYmXqQdhx3ualWiD/TdHwLf+79t43wfjs6Z/aq/lku67SGdUpjuYRV52nls6WPTuBmo2oCbzpP' + - 'MojIXiYHR3cWebI2CdnVTTHHz7el4NAWIlPKtZkPR6VYj2DkND4kmkO92I258hUqLARWnNaywx87' + - 'hFzhaGN9oKZdozKSZpEDyZUsymWuhQnnY76EImeha67LJwSsXLpBxuViCNlqv7ATT9iffzDmjm0i' + - '2MeLix3rBRMWp4MmWC8bP1ZAKOEq4M3hwjt1q68fH4QvtmTyic/7DBW2KsgZRu21RjK7tHtKTxwy' + - '2UN3pGT6uZRL8vNRNvD70UaeD5MW7lFBPehIeFoByaEKGFfeS8dKc1VFauDmkRlhOboLkPqqwbV+' + - 'tUmU8UvTftKIx1RwTm3FKqjKCYrdqp0fL5wsA93YhRJqHfkvtjCjzRc0czszURkJMHfPyttQg2jb' + - 'UQVIg40XMgew2EUdCrJC3wTvXm9tBxMqXAFBn6S4ihqiJ5PTUDKCx7EnAjK3kUwbWvDno8h1M9u9' + - 'teC5CEWhcAAAAAAwG5UqO9bdswAB6QWZCwAAKFDj9rHEZ/sCAAAAAARZWg==') - self.email_tpl['html'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AXiAt1dAB4aCobvStaHNqdVBn1LjZcL+G+98rmZ7eGZ' + - 'Wqx+3LjENIv37L1aGPICZDRBrsBugzVSasRBHkdHMrWW7SsPfRzw6btQfASTHLr48auPJlJgXTnb' + - 'vgDd2ELrs6p5kXZwE7+pedhgffAcekwr0eyqVNzzdUWJpvZcDBGtp+yvIwSAcrKkUxUd5AFBigd3' + - '4IW1XEK3eQ+LSSEIquUugrd3xiFB4rkSSDGbAuVLqy4Sq3w5c8RRAKavhfSn154/H/D+3RhqFHc/' + - '/x51rgXFvgJ/DwYrr9g7JV9EQB15JvJJxazgnWftZE0W0u05Z7QNrIZQMG6LjcSjf0ep1zNYrJkf' + - 'UYOfHNjXGfpmIfG0Y/cNhU1Zqv3ohv6EGWk4B7CdLjXEeDYqkJgIGA6Q4FQXM6PF7blXFQJ3papF' + - 'lH0iO+ElK3LZVcql+QcVt2Ci+hwiKzsUvV5ydnHezyViTYTppjlZzju5SxxddQg+7CwGzX9O8ys1' + - '8dlbMFHD2ruPd4Zig9B1TEKHSdmQQGwITNufbSGixbuOZAbfMXP1oQqSzYkbbx2ye8ddISOr/753' + - 'deLOwaQpy6tK9nb1wYwsfhpmZriWYDcKRfjkgr0srxnC2iDlMB0Do+GCLVVlmju9qcxeObWoxUaX' + - 'TqecRW4fbpa9xAIH0tZlOpIyPgGfm3CXkiGOs/J/QJ4C4spqNpwppoXg6EAig7Y9GStQyEsHXZrj' + - 'vLAefyaseybuMC+9okhx8VYM8esuE2GVTbCbWhn8ZTi8posQ+zabXvk7KE5zwGHDcvSGg0bYctJj' + - 'V6pExLCp1vCUdP3iP06OCFMINDnGR7ZP4Da/atBUuB/F0LN//x+HfwhEUpVTG52L7f6Qjd/LhvU2' + - 'f/zVfMKlw5xXwTjBu2X1oRYfhyYFhgnDECEi9iuRiVwwtnUU39r2XoaGcnMTPnZe62oy2jqTp3p+' + - 'Y+klB9jUwPUg2t5IxptZ0D/H5flD+pEAAAAAYczECM+Nfu0AAfkF4wsAAEsSt/GxxGf7AgAAAAAE' + - 'WVo=') - # Set up a dict of some constants and mappings - self.maps = {} - # Keylist modes - self.maps['keylist'] = {'local': gpg.constants.KEYLIST_MODE_LOCAL, # local keyring - 'remote': gpg.constants.KEYLIST_MODE_EXTERN, # keyserver - # both - this is SUPPOSED to work, but doesn't seem to... it's unreliable at best? - 'both': gpg.constants.KEYLIST_MODE_LOCAL|gpg.constants.KEYLIST_MODE_EXTERN} - # Validity/trust levels - self.maps['trust'] = {-1: ['never', gpg.constants.VALIDITY_NEVER], # this is... probably? not ideal, but. Never trust the key. - 0: ['unknown', gpg.constants.VALIDITY_UNKNOWN], # The key's trust is unknown - typically because it hasn't been set yet. - 1: ['untrusted', gpg.constants.VALIDITY_UNDEFINED], # The key is explicitly set to a blank trust - 2: ['marginal', gpg.constants.VALIDITY_MARGINAL], # Trust a little. - 3: ['full', gpg.constants.VALIDITY_FULL], # This is going to be the default for verified key ownership. - 4: ['ultimate', gpg.constants.VALIDITY_ULTIMATE]} # This should probably only be reserved for keys you directly control. - # Validity/trust reverse mappings - see self.maps['trust'] for the meanings of these - # Used for fetching display/feedback - self.maps['rtrust'] = {gpg.constants.VALIDITY_NEVER: 'Never', - gpg.constants.VALIDITY_UNKNOWN: 'Unknown', - gpg.constants.VALIDITY_UNDEFINED: 'Untrusted', - gpg.constants.VALIDITY_MARGINAL: 'Marginal', - gpg.constants.VALIDITY_FULL: 'Full', - gpg.constants.VALIDITY_ULTIMATE: 'Ultimate'} - # Local signature and other binary (True/False) mappings - self.maps['binmap'] = {0: ['no', False], - 1: ['yes', True]} - # Level of care taken when checking key ownership/valid identity - self.maps['check'] = {0: ['unknown', 0], - 1: ['none', 1], - 2: ['casual', 2], - 3: ['careful', 3]} - # Default protocol/port mappings for keyservers - self.maps['proto'] = {'hkp': [11371, ['tcp', 'udp']], # Standard HKP protocol - 'hkps': [443, ['tcp']], # Yes, same as https - 'http': [80, ['tcp']], # HTTP (plaintext) - 'https': [443, ['tcp']], # SSL/TLS - 'ldap': [389, ['tcp', 'udp']], # Includes TLS negotiation since it runs on the same port - 'ldaps': [636, ['tcp', 'udp']]} # SSL - self.maps['hashalgos'] = {gpg.constants.MD_MD5: 'md5', - gpg.constants.MD_SHA1: 'sha1', - gpg.constants.MD_RMD160: 'ripemd160', - gpg.constants.MD_MD2: 'md2', - gpg.constants.MD_TIGER: 'tiger192', - gpg.constants.MD_HAVAL: 'haval', - gpg.constants.MD_SHA256: 'sha256', - gpg.constants.MD_SHA384: 'sha384', - gpg.constants.MD_SHA512: 'sha512', - gpg.constants.MD_SHA224: 'sha224', - gpg.constants.MD_MD4: 'md4', - gpg.constants.MD_CRC32: 'crc32', - gpg.constants.MD_CRC32_RFC1510: 'crc32rfc1510', - gpg.constants.MD_CRC24_RFC2440: 'crc24rfc2440'} - # Now that all the static data's set up, we can continue. - self.args = self.verifyArgs(args) # Make the args accessible to all functions in the class - see docs/REF.args.struct.txt - # Get the GPGME context - try: - os.environ['GNUPGHOME'] = self.args['gpgdir'] - self.ctx = gpg.Context() - except: - raise RuntimeError('Could not use {0} as a GnuPG home'.format(self.args['gpgdir'])) - self.cfgdir = os.path.join(os.environ['HOME'], '.kant') - if not os.path.isdir(self.cfgdir): - print('No KANT configuration directory found; creating one at {0}...'.format(self.cfgdir)) - os.makedirs(self.cfgdir, exist_ok = True) - self.keys = {} # See docs/REF.keys.struct.txt - self.mykey = {} # "" - self.tpls = {} # Email templates will go here - self.getTpls() # Build out self.tpls - return(None) - - def getEditPrompt(self, key): # "key" should be the FPR of the primary key - # This mapping defines the default "answers" to the gpgme key editing. - # https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py - # https://searchcode.com/codesearch/view/20535820/ - # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS - # You can get the prompt identifiers and status indicators without grokking the source - # by first interactively performing the type of edit(s) you want to do with this command: - # gpg --status-fd 2 --command-fd 2 --edit-key - if key['trust'] >= gpg.constants.VALIDITY_FULL: # For tsigning, it only prompts for two trust levels: - _loctrust = 2 # "I trust fully" - else: - _loctrust = 1 # "I trust marginally" - # TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here. - # "Full" trust is a pretty big thing. - if key['trust'] >= gpg.constants.VALIDITY_FULL: - _locdepth = 2 # Allow +1 level of trust extension - else: - _locdepth = 1 # Only trust this key - _map = {'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list', # Valid commands - 'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref', - 'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save', - 'quit'], - 'prompts': {'edit_ownertrust': {'value': str(key['trust']), # Pulled at time of call - 'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes - 'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted - 'pklist': {'user_id': {'enter': key['pkey']['email']}}, # Prompt for a user ID - can we change this to key ID? - 'sign_uid': {'class': str(key['check']), # The certification/"check" level - 'okay': 'yes'}, # Are you sure that you want to sign this key with your key..." - 'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above - 'trust_depth': str(_locdepth), # The "depth" of the trust signature. - 'trust_regexp': None}, # We can "Restrict" trust to certain domains, but this isn't really necessary. - 'keyedit': {'prompt': 'trust', # Initiate trust editing - 'save': {'okay': 'yes'}}}} # Save if prompted - return(_map) - - def getTpls(self): - for t in ('plain', 'html'): - _tpl_file = os.path.join(self.cfgdir, 'email.{0}.j2'.format(t)) - if os.path.isfile(_tpl_file): - with open(_tpl_file, 'r') as f: - self.tpls[t] = f.read() - else: - self.tpls[t] = lzma.decompress(base64.b64decode(email_tpl[t]), - format = lzma.FORMAT_XZ, - memlimit = None, - filters = None).decode('utf-8') - with open(_tpl_file, 'w') as f: - f.write('{0}'.format(self.tpls[t])) - print('Created: {0}'.format(tpl_file)) - return(self.tpls) - - def modifyDirmngr(self, op): - if not self.args['keyservers']: - return() - _pid = str(os.getpid()) - _activecfg = os.path.join(self.args['gpgdir'], 'dirmngr.conf') - _activegpgconf = os.path.join(self.args['gpgdir'], 'gpg.conf') - _bakcfg = '{0}.{1}'.format(_activecfg, _pid) - _bakgpgconf = '{0}.{1}'.format(_activegpgconf, _pid) - ## Modify files - if op in ('new', 'start', 'replace'): - # Replace the keyservers - if os.path.lexists(_activecfg): - shutil.copy2(_activecfg, _bakcfg) - with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: - for line in read: - if not line.startswith('keyserver '): - write.write(line) - with open(_activecfg, 'a') as f: - for s in self.args['keyservers']: - _uri = '{0}://{1}:{2}'.format(s['proto'], - s['server'], - s['port'][0]) - f.write('keyserver {0}\n'.format(_uri)) - # Use stronger ciphers, etc. and prompt for check/certification levels - if os.path.lexists(_activegpgconf): - shutil.copy2(_activegpgconf, _bakgpgconf) - with open(_activegpgconf, 'w') as f: - f.write('cipher-algo AES256\ndigest-algo SHA512\ncert-digest-algo SHA512\ncompress-algo BZIP2\nask-cert-level\n') - ## Restore files - if op in ('old', 'stop', 'restore'): - # Restore the keyservers - if os.path.lexists(_bakcfg): - with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: - for line in read: - write.write(line) - os.remove(_bakcfg) - else: - os.remove(_activecfg) - # Restore GPG settings - if os.path.lexists(_bakgpgconf): - with open(_bakgpgconf, 'r') as read, open(_activegpgconf, 'w') as write: - for line in read: - write.write(line) - os.remove(_bakgpgconf) - else: - os.remove(_activegpgconf) - subprocess.run(['gpgconf', '--reload', 'dirmngr']) # I *really* wish we could do this via GPGME. - return() - - def getKeys(self): - _keyids = [] - _keys = {} - # Do we have the key already? If not, fetch. - for r in list(self.args['rcpts'].keys()): - if self.args['rcpts'][r]['type'] == 'fpr': - _keyids.append(r) - self.ctx.set_keylist_mode(self.maps['keylist']['remote']) - try: - _k = self.ctx.get_key(r) - except: - print('{0}: We could not find this key on the keyserver.'.format(r)) # Key not on server - del(self.args['rcpts'][r]) - _keyids.remove(r) - continue - self.ctx.set_keylist_mode(self.maps['keylist']['local']) - _keys[r] = {'fpr': r, - 'obj': _k, - 'created': _k.subkeys[0].timestamp} - if 'T' in str(_keys[r]['created']): - _keys[r]['created'] = int(datetime.datetime.strptime(_keys[r]['created'], - '%Y%m%dT%H%M%S').timestamp()) - if self.args['rcpts'][r]['type'] == 'email': - # We need to actually do a lookup on the email address. - _keytmp = [] - for k in self.ctx.keylist(r, mode = self.maps['keylist']['remote']): - _keytmp.append(k) - for k in _keytmp: - _keys[k.fpr] = {'fpr': k.fpr, - 'obj': k, - 'created': k.subkeys[0].timestamp, - 'uids': {}} - # Per the docs (/docs/DETAILS, "*** Field 6 - Creation date"), - # they may change this to ISO 8601... - if 'T' in str(_keys[k.fpr]['created']): - _keys[k.fpr]['created'] = int(datetime.datetime.strptime(_keys[k.fpr]['created'], - '%Y%m%dT%H%M%S').timestamp()) - for s in k.uids: - _keys[k.fpr]['uids'][s.email] = {'comment': s.comment, - 'updated': s.last_update} - if len(_keytmp) > 1: # Print the keys and prompt for a selection. - print('\nWe found the following keys for {0} <{1}>...\n\nKEY ID:'.format(r, _orig_r)) - for s in _keys[r]: - print(s, _keys[r]) - print('{0}\n{1:6}(Generated at {2}) UIDs:'.format(k, - '', - datetime.datetime.utcfromtimestamp(s['updated']))) - for email in _keys[k]['uids']: - print('{0:42}(Generated {3}) <{2}> {1}'.format('', - s['uids'][email]['comment'], - email, - datetime.datetime.utcfromtimestamp(s['uids'][email]['updated']))) - print() - while True: - key = input('Please enter the (full) appropriate key: ') - if key not in keys.keys(): - print('Please enter a full key ID from the list above or hit ctrl-d to exit.') - else: - _keyids.append(key) - break - else: - if len(_keytmp) == 0: - print('Could not find {0}!'.format(r)) - del(self.args['rcpts'][r]) - continue - _keyids.append(_keys[k.fpr]['fpr']) - print('\nFound key {0} for {1} (Generated at {2}):'.format(_keys[k.fpr]['fpr'], - r, - datetime.datetime.utcfromtimestamp(_keys[k.fpr]['created']))) - for email in _keys[k.fpr]['uids']: - print('\t(Generated {2}) {0} <{1}>'.format(_keys[k.fpr]['uids'][email]['comment'], - email, - datetime.datetime.utcfromtimestamp(_keys[k.fpr]['uids'][email]['updated']))) - print() - ## And now we can (FINALLY) fetch the key(s). - # TODO: replace with gpg.keylist_mode(gpgme.KEYLIST_MODE_EXTERN) and internal mechanisms? - for g in _keyids: - try: - self.ctx.op_import_keys([_keys[g]['obj']]) - except gpg.errors.GPGMEError: - print('Key {0} could not be found on the keyserver'.format(g)) # The key isn't on the keyserver - self.ctx.set_keylist_mode(self.maps['keylist']['local']) - for k in _keys: - _key = _keys[k]['obj'] - self.keys[k] = {'pkey': {'email': _key.uids[0].email, - 'name': _key.uids[0].name, - 'creation': datetime.datetime.utcfromtimestamp(_keys[k]['created']), - 'key': _key}, - 'trust': self.args['trustlevel'], # Not set yet; we'll modify this later in buildKeys(). - 'local': self.args['local'], # Not set yet; we'll modify this later in buildKeys(). - 'notify': self.args['notify'], # Same... - 'sign': True, # We don't need to prompt for this since we detect if we need to sign or not - 'change': None, # "" - 'status': None} # Same. - # And we add the subkeys in yet another loop. - self.keys[k]['subkeys'] = {} - self.keys[k]['uids'] = {} - for s in _key.subkeys: - self.keys[k]['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) - for u in _key.uids: - self.keys[k]['uids'][u.email] = {'name': u.name, - 'comment': u.comment, - 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} - del(_keys) - return() - - def buildKeys(self): - self.getKeys() - # Before anything else, let's set up our own key info. - _key = self.ctx.get_key(self.args['sigkey'], secret = True) - self.mykey = {'pkey': {'email': _key.uids[0].email, - 'name': _key.uids[0].name, - 'creation': datetime.datetime.utcfromtimestamp(_key.subkeys[0].timestamp), - 'key': _key}, - 'trust': 'ultimate', # No duh. This is our own key. - 'local': False, # We keep our own key array separate, so we don't push it anyways. - 'notify': False, # "" - 'check': None, # "" - 'change': False, # "" - 'status': None, # "" - 'sign': False} # "" - self.mykey['subkeys'] = {} - self.mykey['uids'] = {} - for s in _key.subkeys: - self.mykey['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) - for u in _key.uids: - self.mykey['uids'][u.email] = {'name': u.name, - 'comment': u.comment, - 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} - # Now let's set up our trusts. - if self.args['batch']: - self.batchParse() - else: - for k in list(self.keys.keys()): - self.promptTrust(k) - self.promptCheck(k) - self.promptLocal(k) - self.promptNotify(k) - # In case we removed any keys, we have to run this outside of the loops - for k in list(self.keys.keys()): - for t in ('trust', 'local', 'check', 'notify'): - self.keysCleanup(k, t) - # TODO: populate self.keys[key]['change']; we use this for trust (but not sigs) - return() - - def batchParse(self): - # First we grab the info from CSV - csvlines = csv.reader(self.csvraw, delimiter = ',', quotechar = '"') - for row in csvlines: - row[0] = row[0].replace('<', '').replace('>', '') - try: - if self.args['rcpts'][row[0]]['type'] == 'fpr': - k = row[0] - else: # It's an email. - key_set = False - while not key_set: - for i in list(self.keys.keys()): - if row[0] in list(self.keys[i]['uids'].keys()): - k = i - key_set = True - self.keys[k]['trust'] = row[1].lower().strip() - self.keys[k]['local'] = row[2].lower().strip() - self.keys[k]['check'] = row[3].lower().strip() - self.keys[k]['notify'] = row[4].lower().strip() - except KeyError: - continue # It was deemed to be an invalid key earlier - return() - - def promptTrust(self, k): - if 'trust' not in self.keys[k].keys() or not self.keys[k]['trust']: - trust_in = input(('\nWhat trust level should we assign to {0}? (The default is '+ - 'Marginal.)\n\t\t\t\t ({1} <{2}>)' + - '\n\n\t\033[1m-1 = Never\n\t 0 = Unknown\n\t 1 = Untrusted\n\t 2 = Marginal\n\t 3 = Full' + - '\n\t 4 = Ultimate\033[0m\nTrust: ').format(k, - self.keys[k]['pkey']['name'], - self.keys[k]['pkey']['email'])) - if trust_in == '': - trust_in = 'marginal' # Has to be a str, so we can "pretend" it was entered - self.keys[k]['trust'] = trust_in - return() - - def promptCheck(self, k): - if 'check' not in self.keys[k].keys() or self.keys[k]['check'] == None: - check_in = input(('\nHow carefully have you checked {0}\'s validity of identity/ownership of the key? ' + - '(Default is Unknown.)\n' + - '\n\t\033[1m0 = Unknown\n\t1 = None\n\t2 = Casual\n\t3 = Careful\033[0m\nCheck level: ').format(k)) - if check_in == '': - check_in == 'unknown' - self.keys[k]['check'] = check_in - return() - - def promptLocal(self, k): - if 'local' not in self.keys[k].keys() or self.keys[k]['local'] == None: - if self.args['keyservers']: - local_in = input(('\nShould we locally sign {0} '+ - '(if yes, the signature will be non-exportable; if no, we will be able to push to a keyserver) ' + - '(Yes/\033[1mNO\033[0m)? ').format(k)) - if local_in == '': - local_in = False - self.keys[k]['local'] = local_in - return() - - def promptNotify(self, k): - if 'notify' not in self.keys[k].keys() or self.keys[k]['notify'] == None: - notify_in = input(('\nShould we notify {0} (via <{1}>) (\033[1mYES\033[0m/No)? ').format(k, - self.keys[k]['pkey']['email'])) - if notify_in == '': - notify_in = True - self.keys[k]['local'] = local_in - return() - - def keysCleanup(self, k, t): # At some point, this WHOLE thing would probably be cleaner with bitwise flags... - s = t - _errs = {'trust': 'trust level', - 'local': 'local signature option', - 'check': 'check level', - 'notify': 'notify flag'} - if k not in self.keys.keys(): - return() # It was deleted already. - if t in ('local', 'notify'): # these use a binary mapping - t = 'binmap' - # We can do some basic stuff right here. - if str(self.keys[k][s]).lower() in ('n', 'no', 'false'): - self.keys[k][s] = False - return() - elif str(self.keys[k][s]).lower() in ('y', 'yes', 'true'): - self.keys[k][s] = True - return() - # Make sure we have a known value. These will ALWAYS be str's, either from the CLI or CSV. - value_in = str(self.keys[k][s]).lower().strip() - for dictk, dictv in self.maps[t].items(): - if value_in == dictv[0]: - self.keys[k][s] = int(dictk) - elif value_in == str(dictk): - self.keys[k][s] = int(dictk) - if not isinstance(self.keys[k][s], int): # It didn't get set - print('{0}: "{1}" is not a valid {2}; skipping. Run kant again to fix.'.format(k, self.keys[k][s], _errs[s])) - del(self.keys[k]) - return() - return() - - def sigKeys(self): # The More Business-End(TM) - # NOTE: If the trust level is anything but 2 (the default), we should use op_interact() instead and do a tsign. - self.ctx.keylist_mode = gpg.constants.KEYLIST_MODE_SIGS - _mkey = self.mykey['pkey']['key'] - self.ctx.signers = [_mkey] - for k in list(self.keys.keys()): - key = self.keys[k]['pkey']['key'] - for uid in key.uids: - for s in uid.signatures: - try: - signerkey = ctx.get_key(s.keyid).subkeys[0].fpr - if signerkey == mkey.subkeys[0].fpr: - self.trusts[k]['sign'] = False # We already signed this key - except gpgme.GpgError: - pass # usually if we get this it means we don't have a signer's key in our keyring - # And again, we loop. ALLLLL that buildup for one line. - for k in list(self.keys.keys()): - # TODO: configure to allow for user-entered expiration? - if self.keys[k]['sign']: - self.ctx.key_sign(self.keys[k]['pkey']['key'], local = self.keys[k]['local']) - return() - - class KeyEditor(object): - def __init__(self, optmap): - self.replied_once = False # This is used to handle the first prompt vs. the last - self.optmap = optmap - return(None) - - def editKey(self, status, args, out): - _result = None - out.seek(0, 0) - def mapDict(m, d): - return(reduce(operator.getitem, m, d)) - if args == 'keyedit.prompt' and self.replied_once: - _result = 'quit' - elif status == 'KEY_CONSIDERED': - _result = None - self.replied_once = False - elif status == 'GET_LINE': - self.replied_once = True - _ilist = args.split('.') - _result = mapDict(_ilist, self.optmap['prompts']) - if not _result: - _result = None - return(_result) - - def trustKeys(self): # The Son of Business-End(TM) - # TODO: add check for change - for k in list(self.keys.keys()): - _key = self.keys[k] - _map = self.getEditPrompt(_key) - out = gpg.Data() - self.ctx.interact(_key['pkey']['key'], self.KeyEditor(_map).editKey, sink = out, fnc_value = out) - out.seek(0, 0) - return() - - def pushKeys(self): # The Last Business-End(TM) - for k in list(self.keys.keys()): - if not self.keys[k]['local'] and self.keys[k]['sign']: - self.ctx.op_export(k, gpg.constants.EXPORT_MODE_EXTERN, None) - return() - - class Mailer(object): # I lied; The Return of the Business-End(TM) - def __init__(self): - _homeconf = os.path.join(os.environ['HOME'], '.msmtprc') - _sysconf = '/etc/msmtprc' - self.msmtp = {'path': None} - if not os.path.isfile(_homeconf): - if not os.path.isfile(_sysconf): - self.msmtp['conf'] = False - else: - self.msmtp['conf'] = _sysconf - else: - self.msmtp['conf'] = _homeconf - if os.path.isfile(self.msmtp['conf']): - for p in (os.environ['PATH']).split(':'): - if os.path.isfile(os.path.join(p, 'msmtp')): - self.msmtp['path'] = os.path.join(p, 'msmtp') - if self.msmtp['path']: - # Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary. - # Now we need to parse the config. - self.msmtp['cfg'] = self.getCfg() - return(None) - - def getCfg(self): - cfg = {'default': None, 'defaults': {}} - _defaults = False - _acct = None - with open(self.msmtp['conf'], 'r') as f: - _cfg_raw = f.read() - for l in _cfg_raw.splitlines(): - if re.match('^\s?(#.*|)$', l): - continue # Skip over blank and commented lines - _line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)] - if _line[0] == 'account': - if re.match('^default\s?:\s?', _line[1]): # it's the default account specifier - cfg['default'] = _line[1].split(':', maxsplit = 1)[1].strip() - else: - if _line[1] not in cfg.keys(): # it's a new account definition - cfg[_line[1]] = {} - _acct = _line[1] - _defaults = False - elif _line[0] == 'defaults': # it's the defaults - _acct = 'defaults' - else: # it's a config directive - cfg[_acct][_line[0]] = _line[1] - for a in list(cfg): - if a != 'default': - for k, v in cfg['defaults'].items(): - if k not in cfg[a].keys(): - cfg[a][k] = v - del(cfg['defaults']) - return(cfg) - - def sendEmail(self, msg, key, profile): # This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc. - if 'tls-starttls' in self.msmtp['cfg'][profile].keys() and self.msmtp['cfg'][profile]['tls-starttls'] == 'on': - smtpserver = smtplib.SMTP(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) - smtpserver.ehlo() - smtpserver.starttls() - # we need to EHLO twice with a STARTTLS because email is weird. - elif self.msmtp['cfg'][profile]['tls'] == 'on': - smtpserver = smtplib.SMTP_SSL(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) - smtpserver.ehlo() - smtpserver.login(self.msmtp['cfg'][profile]['user'], self.msmtp['cfg'][profile]['password']) - smtpserver.sendmail(self.msmtp['cfg'][profile]['user'], key['pkey']['email'], msg.as_string()) - smtpserver.close() - return() - - def postalWorker(self): - m = self.Mailer() - if 'KANT' in m.msmtp['cfg'].keys(): - _profile = 'KANT' - else: - _profile = m.msmtp['cfg']['default'] # TODO: let this be specified on the CLI args? - if 'user' not in m.msmtp['cfg'][_profile].keys() or not m.msmtp['cfg'][_profile]['user']: - return() # We don't have MSMTP configured. - # Reconstruct the keyserver list. - _keyservers = [] - for k in self.args['keyservers']: - _keyservers.append('{0}://{1}:{2}'.format(k['proto'], k['server'], k['port'][0])) - # Export our key so we can attach it. - _pubkeys = {} - for e in ('asc', 'gpg'): - if e == 'asc': - self.ctx.armor = True - else: - self.ctx.armor = False - _pubkeys[e] = gpg.Data() # This is a data buffer to store your ASCII-armored pubkeys - self.ctx.op_export_keys([self.mykey['pkey']['key']], 0, _pubkeys[e]) - _pubkeys[e].seek(0, 0) # Read with e.g. _sigs['asc'].read() - for k in list(self.keys.keys()): - if self.keys[k]['notify']: - _body = {} - for t in list(self.tpls.keys()): - # There's gotta be a more efficient way of doing this... - #_tplenv = jinja2.Environment(loader = jinja2.BaseLoader()).from_string(self.tpls[t]) - _tplenv = jinja2.Environment().from_string(self.tpls[t]) - _body[t] = _tplenv.render(key = self.keys[k], - mykey = self.mykey, - keyservers = _keyservers) - b = MIMEMultipart('alternative') # Set up a body - for c in _body.keys(): - b.attach(MIMEText(_body[c], c)) - bmsg = MIMEMultipart() - bmsg.attach(b) - for s in _pubkeys.keys(): - _attchmnt = MIMEApplication(_pubkeys[s].read(), '{0}.{1}'.format(self.mykey['pkey']['key'].fpr, s)) - _attchmnt['Content-Disposition'] = 'attachment; filename="{0}.{1}"'.format(self.mykey['pkey']['key'].fpr, s) - bmsg.attach(_attchmnt) - # Now we sign the body. This incomprehensible bit monkey-formats bmsg to be a multi-RFC-compatible - # string, which is then passed to our gpgme instance's signing mechanishm, and the output of that is - # returned as plaintext. Whew. - self.ctx.armor = True - - _sig = self.ctx.sign((bmsg.as_string().replace('\n', '\r\n')).encode('utf-8'), - mode = gpg.constants.SIG_MODE_DETACH) - imsg = Message() # Build yet another intermediate message... - imsg['Content-Type'] = 'application/pgp-signature; name="signature.asc"' - imsg['Content-Description'] = 'OpenPGP digital signature' - imsg.set_payload(_sig[0].decode('utf-8')) - msg = MIMEMultipart(_subtype = 'signed', - micalg = "pgp-{0}".format(self.maps['hashalgos'][_sig[1].signatures[0].hash_algo]), - protocol = 'application/pgp-signature') - msg.attach(bmsg) # Attach the body (plaintext, html, pubkey attachmants) - msg.attach(imsg) # Attach the isignature - msg['To'] = self.keys[k]['pkey']['email'] - if 'from' in m.msmtp['cfg'][_profile].keys(): - msg['From'] = m.msmtp['cfg'][_profile]['from'] - else: - msg['From'] = self.mykey['pkey']['email'] - msg['Subject'] = 'Your GnuPG/PGP key has been signed' - msg['Openpgp'] = 'id={0}'.format(self.mykey['pkey']['key'].fpr) - msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') - msg['User-Agent'] = 'KANT (part of the r00t^2 OpTools suite: https://git.square-r00t.net/OpTools)' - m.sendEmail(msg, self.keys[k], _profile) # Send the email - for d in (msg, imsg, bmsg, b, _body, _tplenv): # Not necessary, but it pays to be paranoid; we do NOT want leaks. - del(d) - del(m) - return() - - def saveResults(self): - _cachedir = os.path.join(self.cfgdir, 'cache', datetime.datetime.utcnow().strftime('%Y.%m.%d_%H.%M.%S')) - os.makedirs(_cachedir, exist_ok = True) - for k in self.keys.keys(): - _keyout = self.keys[k] - # We need to normalize the datetime objects and gpg objects to strings - _keyout['pkey']['creation'] = str(self.keys[k]['pkey']['creation']) - _keyout['pkey']['key'] = '' - for u in list(_keyout['uids'].keys()): - _keyout['uids'][u]['updated'] = str(self.keys[k]['uids'][u]['updated']) - for s in list(_keyout['subkeys'].keys()): - _keyout['subkeys'][s] = str(self.keys[k]['subkeys'][s]) - _fname = os.path.join(_cachedir, '{0}.json'.format(k)) - with open(_fname, 'a') as f: - f.write('{0}\n'.format(json.dumps(_keyout, sort_keys = True, indent = 4))) - del(_keyout) - # And let's grab a copy of our key in the state that it exists in currently - _mykey = self.mykey - # We need to normalize the datetime objects and gpg objects to strings again - _mykey['pkey']['creation'] = str(_mykey['pkey']['creation']) - _mykey['pkey']['key'] = '' - for u in list(_mykey['uids'].keys()): - _mykey['uids'][u]['updated'] = str(self.mykey['uids'][u]['updated']) - for s in list(_mykey['subkeys'].keys()): - _mykey['subkeys'][s] = str(self.mykey['subkeys'][s]) - with open(os.path.join(_cachedir, '_SIGKEY.json'), 'w') as f: - f.write('{0}\n'.format(json.dumps(_mykey, sort_keys = True, indent = 4))) - return() - - def serverParser(self, uri): - # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples - _server = {} - _urlobj = urllib.parse.urlparse(uri) - _server['proto'] = _urlobj.scheme - _lazy = False - if not _server['proto']: - _server['proto'] = 'hkp' # Default - _server['server'] = _urlobj.hostname - if not _server['server']: - _server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri, re.MULTILINE) - _lazy = True - _server['port'] = _urlobj.port - if not _server['port']: - if _lazy: - _p = re.sub('.*:([0-9]+)$', '\g<1>', uri, re.MULTILINE) - _server['port'] = self.maps['proto'][_server['proto']] # Default - return(_server) - - def verifyArgs(self, locargs): - ## Some pythonization... - if not locargs['batch']: - locargs['keys'] = [re.sub('\s', '', k) for k in locargs['keys'].split(',')] - else: - ## Batch file - _batchfilepath = os.path.abspath(os.path.expanduser(locargs['keys'])) - if not os.path.isfile(_batchfilepath): - raise ValueError('{0} does not exist or is not a regular file.'.format(_batchfilepath)) - else: - with open(_batchfilepath, 'r') as f: - self.csvraw = f.readlines() - locargs['keys'] = _batchfilepath - locargs['keyservers'] = [re.sub('\s', '', s) for s in locargs['keyservers'].split(',')] - locargs['keyservers'] = [self.serverParser(s) for s in locargs['keyservers']] - ## Key(s) to sign - locargs['rcpts'] = {} - if not locargs['batch']: - _keyiter = locargs['keys'] - else: - _keyiter = [] - for row in csv.reader(self.csvraw, delimiter = ',', quotechar = '"'): - _keyiter.append(row[0]) - for k in _keyiter: - locargs['rcpts'][k] = {} - try: - int(k, 16) - _ktype = 'fpr' - except: # If it isn't a valid key ID... - if not re.match('^?$', k): # is it an email address? - raise ValueError('{0} is not a valid email address'.format(k)) - else: - r = k.replace('<', '').replace('>', '') - locargs['rcpts'][r] = locargs['rcpts'][k] - del(locargs['rcpts'][k]) - k = r - _ktype = 'email' - locargs['rcpts'][k]['type'] = _ktype - # Security is important. We don't want users getting collisions, so we don't allow shortened key IDs. - if _ktype == 'fpr' and not len(k) == 40: - raise ValueError('{0} is not a full 40-char key ID or key fingerprint'.format(k)) - ## Signing key - if not locargs['sigkey']: - raise ValueError('A key for signing is required') # We need a key we can sign with. - else: - if not os.path.lexists(locargs['gpgdir']): - raise FileNotFoundError('{0} does not exist'.format(locargs['gpgdir'])) - elif os.path.isfile(locargs['gpgdir']): - raise NotADirectoryError('{0} is not a directory'.format(locargs['gpgdir'])) - # Now we need to verify that the private key exists... - try: - _ctx = gpg.Context() - _sigkey = _ctx.get_key(locargs['sigkey'], True) - except gpg.errors.GPGMEError or gpg.errors.KeyNotFound: - raise ValueError('Cannot use key {0}'.format(locargs['sigkey'])) - # And that it is an eligible candidate to use to sign. - if not _sigkey.can_sign or True in (_sigkey.revoked, _sigkey.expired, _sigkey.disabled): - raise ValueError('{0} is not a valid candidate for signing'.format(locargs['sigkey'])) - ## Keyservers - if locargs['testkeyservers']: - for s in locargs['keyservers']: - # Test to make sure the keyserver is accessible. - _v6test = socket(AF_INET6, SOCK_DGRAM) - try: - _v6test.connect(('ipv6.square-r00t.net', 0)) - _nettype = AF_INET6 # We have IPv6 intarwebz - except: - _nettype = AF_INET # No IPv6, default to IPv4 - for _proto in locargs['keyservers'][s]['port'][1]: - if _proto == 'udp': - _netproto = SOCK_DGRAM - elif _proto == 'tcp': - _netproto = SOCK_STREAM - _sock = socket(nettype, netproto) - _sock.settimeout(10) - _tests = _sock.connect_ex((locargs['keyservers'][s]['server'], - int(locargs['keyservers'][s]['port'][0]))) - _uristr = '{0}://{1}:{2} ({3})'.format(locargs['keyservers'][s]['proto'], - locargs['keyservers'][s]['server'], - locargs['keyservers'][s]['port'][0], - _proto.upper()) - if not tests == 0: - raise OSError('Keyserver {0} is not available'.format(_uristr)) - else: - print('Keyserver {0} is accepting connections.'.format(_uristr)) - sock.close() - return(locargs) - -def parseArgs(): - def getDefGPGDir(): - try: - gpgdir = os.environ['GNUPGHOME'] - except KeyError: - try: - homedir = os.environ['HOME'] - gpgdchk = os.path.join(homedir, '.gnupg') - except KeyError: - # There is no reason that this should ever get this far, but... edge cases be crazy. - gpgdchk = os.path.join(os.path.expanduser('~'), '.gnupg') - if os.path.isdir(gpgdchk): - gpgdir = gpgdchk - else: - gpgdir = None - return(gpgdir) - def getDefKey(defgpgdir): - os.environ['GNUPGHOME'] = defgpgdir - if not defgpgdir: - return(None) - defkey = None - ctx = gpg.Context() - for k in ctx.keylist(None, secret = True): # "None" is query string; this grabs all keys in the private keyring - if k.can_sign and True not in (k.revoked, k.expired, k.disabled): - defkey = k.subkeys[0].fpr - break # We'll just use the first primary key we find that's valid as the default. - return(defkey) - def getDefKeyservers(defgpgdir): - srvlst = [None] - # We don't need these since we use the gpg agent. Requires GPG 2.1 and above, probably. - #if os.path.isfile(os.path.join(defgpgdir, 'dirmngr.conf')): - # pass - dirmgr_out = subprocess.run(['gpg-connect-agent', '--dirmngr', 'keyserver', '/bye'], stdout = subprocess.PIPE) - for l in dirmgr_out.stdout.decode('utf-8').splitlines(): - #if len(l) == 3 and l.lower().startswith('s keyserver'): # It's a keyserver line - if l.lower().startswith('s keyserver'): # It's a keyserver line - s = l.split()[2] - if len(srvlst) == 1 and srvlst[0] == None: - srvlst = [s] - else: - srvlst.append(s) - return(','.join(srvlst)) - defgpgdir = getDefGPGDir() - defkey = getDefKey(defgpgdir) - defkeyservers = getDefKeyservers(defgpgdir) - args = argparse.ArgumentParser(description = 'Keysigning Assistance and Notifying Tool (KANT)', - epilog = 'brent s. || 2017 || https://square-r00t.net') - args.add_argument('-k', - '--keys', - dest = 'keys', - metavar = 'KEYS | /path/to/batchfile', - required = True, - help = 'A single/comma-separated list of keys to sign, ' + - 'trust, & notify. Can also be an email address. ' + - 'If -b/--batch is specified, this should instead be ' + - 'a path to the batch file. See the man page for more info.') - args.add_argument('-K', - '--sigkey', - dest = 'sigkey', - default = defkey, - help = 'The key to use when signing other keys. Default is \033[1m{0}\033[0m.'.format(defkey)) - args.add_argument('-t', - '--trust', - dest = 'trustlevel', - default = None, - help = 'The trust level to automatically apply to all keys ' + - '(if not specified, kant will prompt for each key). ' + - 'See BATCHFILE/TRUSTLEVEL in the man page for trust ' + - 'level notations.') - args.add_argument('-c', - '--check', - dest = 'checklevel', - default = None, - help = 'The level of checking done (if not specified, kant will ' + - 'prompt for each key). See -b/--batch for check level notations.') - args.add_argument('-l', - '--local', - dest = 'local', - default = None, - help = 'Make the signature(s) local-only (i.e. don\'t push to a keyserver).') - args.add_argument('-n', - '--no-notify', - dest = 'notify', - action = 'store_false', - help = 'If specified, do NOT notify any key recipients that you\'ve signed ' + - 'their key, even if KANT is able to.') - args.add_argument('-s', - '--keyservers', - dest = 'keyservers', - default = defkeyservers, - help = 'The comma-separated keyserver(s) to push to.\n' + - 'Default keyserver list is: \n\n\t\033[1m{0}\033[0m\n\n'.format(re.sub(',', - '\n\t', - defkeyservers))) - args.add_argument('-b', - '--batch', - dest = 'batch', - action = 'store_true', - help = 'If specified, -k/--keys is a CSV file to use as a ' + - 'batch run. See the BATCHFILE section in the man page for more info.') - args.add_argument('-D', - '--gpgdir', - dest = 'gpgdir', - default = defgpgdir, - help = 'The GnuPG configuration directory to use (containing\n' + - 'your keys, etc.); default is \033[1m{0}\033[0m.'.format(defgpgdir)) - args.add_argument('-T', - '--testkeyservers', - dest = 'testkeyservers', - action = 'store_true', - help = 'If specified, initiate a test connection with each\n' - 'set keyserver before anything else. Disabled by default.') - return(args) - - - - - -def main(): - # This could be cleaner-looking, but we do it this way so the class can be used externally - # with a dict instead of an argparser result. - args = vars(parseArgs().parse_args()) - sess = SigSession(args) - sess.modifyDirmngr('new') - sess.buildKeys() - sess.sigKeys() - sess.trustKeys() - sess.pushKeys() - sess.postalWorker() - sess.saveResults() - sess.modifyDirmngr('old') - -if __name__ == '__main__': - main() diff --git a/gpg/kant/kant.py b/gpg/kant/kant.py index eaaa68d..26ffa72 100755 --- a/gpg/kant/kant.py +++ b/gpg/kant/kant.py @@ -1,389 +1,812 @@ #!/usr/bin/env python3 import argparse +import base64 import csv import datetime -import email -import jinja2 +import json +import lzma +import operator import os import re import shutil +import smtplib import subprocess +from email.message import Message +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from functools import reduce from io import BytesIO from socket import * import urllib.parse -import gpg # non-stdlib; Arch package is "python-gpgme" - see - # https://git.archlinux.org/svntogit/packages.git/tree/trunk/PKGBUILD?h=packages/gpgme and - # https://pypi.python.org/pypi/gpg +import jinja2 # non-stdlib; Arch package is python-jinja2 +import gpg # non-stdlib; Arch package is "python-gpgme" - see: +import gpg.constants # https://git.archlinux.org/svntogit/packages.git/tree/trunk/PKGBUILD?h=packages/gpgme and +import gpg.errors # https://gnupg.org/ftp/gcrypt/gpgme/ (incl. python bindings in build) +import pprint # development debug -# TODO: -# - http://tanguy.ortolo.eu/blog/article9/pgp-signature-infos edit certification level- possible with pygpgme? -# -attach pubkey when sending below email -# mail to first email address in key with signed message: -#Subj: Your GPG key has been signed -# -#Hello! Thank you for participating in a keysigning party and exchanging keys. -# -#I have signed your key (KEYID) with trust level "TRUSTLEVEL" because: -# -#* You have presented sufficient proof of identity -# -#The signatures have been pushed to KEYSERVERS. -# -#I have taken the liberty of attaching my public key in the event you've not signed it yet and were unable to find it. -#Please feel free to push to hkps://sks.mirror.square-r00t.net:11371 (you can do this with "gpg --keyserver ...) or any other SKS-pooled keyserver[0]. -# -#As a reminder, my key ID, Keybase.io username, and verification/proof of identity can all be found at: -# -#https://square-r00t.net/gpg-info -# -#Thanks again! -#[0] https://mirror.square-r00t.net/#svcs-sks -class sigsession(object): +class SigSession(object): # see docs/REFS.funcs.struct.txt def __init__(self, args): - self.args = args - self.keyids = [] + # These are the "stock" templates for emails. It's a PITA, but to save some space since we store them + # inline in here, they're XZ'd and base64'd. + self.email_tpl = {} + self.email_tpl['plain'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AWYAs1dACQZSZhvFgKNdKNXbSf05z0ZPvTvmdQ0mJQg' + + 'atgzhPVeLKxz22bhxedC813X5I8Gn2g9q9Do2jPPViyuLf1SFHDUx1m7SEFsSUyT7L/j71tuxRZi' + + 'xLyy6P3mHo3HeUhwgpgh6lvMYwlTf+zhj3558RJmhXprLteoKt/sY4NIMzz3TBa0ivsVo6EFA9/G' + + '2q1MpuHxg86uY1pA4tkFlmxuSklZq5EKuu7B5RSeUGB+SsjDPSfsdhoPngMET1EXTZfVSWezjJkH' + + 'REyn5SgqpD7vwmwvZWcwWua+V+e/rYYF1cx0Y0Wi1wAC6NzGDce3gbcr/k6M/PiMi8ekJPCmgMgP' + + 'R4GBtRi121wU374nml42WjdGjHee6Se6d0OGKscJjB5Z1eSOho63OMn5Ayu4Lvl05L/mB88Hnk5e' + + 'MayYmXqQdhx3ualWiD/TdHwLf+79t43wfjs6Z/aq/lku67SGdUpjuYRV52nls6WPTuBmo2oCbzpP' + + 'MojIXiYHR3cWebI2CdnVTTHHz7el4NAWIlPKtZkPR6VYj2DkND4kmkO92I258hUqLARWnNaywx87' + + 'hFzhaGN9oKZdozKSZpEDyZUsymWuhQnnY76EImeha67LJwSsXLpBxuViCNlqv7ATT9iffzDmjm0i' + + '2MeLix3rBRMWp4MmWC8bP1ZAKOEq4M3hwjt1q68fH4QvtmTyic/7DBW2KsgZRu21RjK7tHtKTxwy' + + '2UN3pGT6uZRL8vNRNvD70UaeD5MW7lFBPehIeFoByaEKGFfeS8dKc1VFauDmkRlhOboLkPqqwbV+' + + 'tUmU8UvTftKIx1RwTm3FKqjKCYrdqp0fL5wsA93YhRJqHfkvtjCjzRc0czszURkJMHfPyttQg2jb' + + 'UQVIg40XMgew2EUdCrJC3wTvXm9tBxMqXAFBn6S4ihqiJ5PTUDKCx7EnAjK3kUwbWvDno8h1M9u9' + + 'teC5CEWhcAAAAAAwG5UqO9bdswAB6QWZCwAAKFDj9rHEZ/sCAAAAAARZWg==') + self.email_tpl['html'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AXiAt1dAB4aCobvStaHNqdVBn1LjZcL+G+98rmZ7eGZ' + + 'Wqx+3LjENIv37L1aGPICZDRBrsBugzVSasRBHkdHMrWW7SsPfRzw6btQfASTHLr48auPJlJgXTnb' + + 'vgDd2ELrs6p5kXZwE7+pedhgffAcekwr0eyqVNzzdUWJpvZcDBGtp+yvIwSAcrKkUxUd5AFBigd3' + + '4IW1XEK3eQ+LSSEIquUugrd3xiFB4rkSSDGbAuVLqy4Sq3w5c8RRAKavhfSn154/H/D+3RhqFHc/' + + '/x51rgXFvgJ/DwYrr9g7JV9EQB15JvJJxazgnWftZE0W0u05Z7QNrIZQMG6LjcSjf0ep1zNYrJkf' + + 'UYOfHNjXGfpmIfG0Y/cNhU1Zqv3ohv6EGWk4B7CdLjXEeDYqkJgIGA6Q4FQXM6PF7blXFQJ3papF' + + 'lH0iO+ElK3LZVcql+QcVt2Ci+hwiKzsUvV5ydnHezyViTYTppjlZzju5SxxddQg+7CwGzX9O8ys1' + + '8dlbMFHD2ruPd4Zig9B1TEKHSdmQQGwITNufbSGixbuOZAbfMXP1oQqSzYkbbx2ye8ddISOr/753' + + 'deLOwaQpy6tK9nb1wYwsfhpmZriWYDcKRfjkgr0srxnC2iDlMB0Do+GCLVVlmju9qcxeObWoxUaX' + + 'TqecRW4fbpa9xAIH0tZlOpIyPgGfm3CXkiGOs/J/QJ4C4spqNpwppoXg6EAig7Y9GStQyEsHXZrj' + + 'vLAefyaseybuMC+9okhx8VYM8esuE2GVTbCbWhn8ZTi8posQ+zabXvk7KE5zwGHDcvSGg0bYctJj' + + 'V6pExLCp1vCUdP3iP06OCFMINDnGR7ZP4Da/atBUuB/F0LN//x+HfwhEUpVTG52L7f6Qjd/LhvU2' + + 'f/zVfMKlw5xXwTjBu2X1oRYfhyYFhgnDECEi9iuRiVwwtnUU39r2XoaGcnMTPnZe62oy2jqTp3p+' + + 'Y+klB9jUwPUg2t5IxptZ0D/H5flD+pEAAAAAYczECM+Nfu0AAfkF4wsAAEsSt/GxxGf7AgAAAAAE' + + 'WVo=') + # Set up a dict of some constants and mappings + self.maps = {} + # Keylist modes + self.maps['keylist'] = {'local': gpg.constants.KEYLIST_MODE_LOCAL, # local keyring + 'remote': gpg.constants.KEYLIST_MODE_EXTERN, # keyserver + # both - this is SUPPOSED to work, but doesn't seem to... it's unreliable at best? + 'both': gpg.constants.KEYLIST_MODE_LOCAL|gpg.constants.KEYLIST_MODE_EXTERN} + # Validity/trust levels + self.maps['trust'] = {-1: ['never', gpg.constants.VALIDITY_NEVER], # this is... probably? not ideal, but. Never trust the key. + 0: ['unknown', gpg.constants.VALIDITY_UNKNOWN], # The key's trust is unknown - typically because it hasn't been set yet. + 1: ['untrusted', gpg.constants.VALIDITY_UNDEFINED], # The key is explicitly set to a blank trust + 2: ['marginal', gpg.constants.VALIDITY_MARGINAL], # Trust a little. + 3: ['full', gpg.constants.VALIDITY_FULL], # This is going to be the default for verified key ownership. + 4: ['ultimate', gpg.constants.VALIDITY_ULTIMATE]} # This should probably only be reserved for keys you directly control. + # Validity/trust reverse mappings - see self.maps['trust'] for the meanings of these + # Used for fetching display/feedback + self.maps['rtrust'] = {gpg.constants.VALIDITY_NEVER: 'Never', + gpg.constants.VALIDITY_UNKNOWN: 'Unknown', + gpg.constants.VALIDITY_UNDEFINED: 'Untrusted', + gpg.constants.VALIDITY_MARGINAL: 'Marginal', + gpg.constants.VALIDITY_FULL: 'Full', + gpg.constants.VALIDITY_ULTIMATE: 'Ultimate'} + # Local signature and other binary (True/False) mappings + self.maps['binmap'] = {0: ['no', False], + 1: ['yes', True]} + # Level of care taken when checking key ownership/valid identity + self.maps['check'] = {0: ['unknown', 0], + 1: ['none', 1], + 2: ['casual', 2], + 3: ['careful', 3]} + # Default protocol/port mappings for keyservers + self.maps['proto'] = {'hkp': [11371, ['tcp', 'udp']], # Standard HKP protocol + 'hkps': [443, ['tcp']], # Yes, same as https + 'http': [80, ['tcp']], # HTTP (plaintext) + 'https': [443, ['tcp']], # SSL/TLS + 'ldap': [389, ['tcp', 'udp']], # Includes TLS negotiation since it runs on the same port + 'ldaps': [636, ['tcp', 'udp']]} # SSL + self.maps['hashalgos'] = {gpg.constants.MD_MD5: 'md5', + gpg.constants.MD_SHA1: 'sha1', + gpg.constants.MD_RMD160: 'ripemd160', + gpg.constants.MD_MD2: 'md2', + gpg.constants.MD_TIGER: 'tiger192', + gpg.constants.MD_HAVAL: 'haval', + gpg.constants.MD_SHA256: 'sha256', + gpg.constants.MD_SHA384: 'sha384', + gpg.constants.MD_SHA512: 'sha512', + gpg.constants.MD_SHA224: 'sha224', + gpg.constants.MD_MD4: 'md4', + gpg.constants.MD_CRC32: 'crc32', + gpg.constants.MD_CRC32_RFC1510: 'crc32rfc1510', + gpg.constants.MD_CRC24_RFC2440: 'crc24rfc2440'} + # Now that all the static data's set up, we can continue. + self.args = self.verifyArgs(args) # Make the args accessible to all functions in the class - see docs/REF.args.struct.txt + # Get the GPGME context + try: + os.environ['GNUPGHOME'] = self.args['gpgdir'] + self.ctx = gpg.Context() + except: + raise RuntimeError('Could not use {0} as a GnuPG home'.format(self.args['gpgdir'])) + self.cfgdir = os.path.join(os.environ['HOME'], '.kant') + if not os.path.isdir(self.cfgdir): + print('No KANT configuration directory found; creating one at {0}...'.format(self.cfgdir)) + os.makedirs(self.cfgdir, exist_ok = True) + self.keys = {} # See docs/REF.keys.struct.txt + self.mykey = {} # "" + self.tpls = {} # Email templates will go here + self.getTpls() # Build out self.tpls + return(None) + + def getEditPrompt(self, key): # "key" should be the FPR of the primary key + # This mapping defines the default "answers" to the gpgme key editing. + # https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py + # https://searchcode.com/codesearch/view/20535820/ + # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS + # You can get the prompt identifiers and status indicators without grokking the source + # by first interactively performing the type of edit(s) you want to do with this command: + # gpg --status-fd 2 --command-fd 2 --edit-key + if key['trust'] >= gpg.constants.VALIDITY_FULL: # For tsigning, it only prompts for two trust levels: + _loctrust = 2 # "I trust fully" + else: + _loctrust = 1 # "I trust marginally" + # TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here. + # "Full" trust is a pretty big thing. + if key['trust'] >= gpg.constants.VALIDITY_FULL: + _locdepth = 2 # Allow +1 level of trust extension + else: + _locdepth = 1 # Only trust this key + _map = {'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list', # Valid commands + 'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref', + 'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save', + 'quit'], + 'prompts': {'edit_ownertrust': {'value': str(key['trust']), # Pulled at time of call + 'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes + 'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted + 'pklist': {'user_id': {'enter': key['pkey']['email']}}, # Prompt for a user ID - can we change this to key ID? + 'sign_uid': {'class': str(key['check']), # The certification/"check" level + 'okay': 'yes'}, # Are you sure that you want to sign this key with your key..." + 'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above + 'trust_depth': str(_locdepth), # The "depth" of the trust signature. + 'trust_regexp': None}, # We can "Restrict" trust to certain domains, but this isn't really necessary. + 'keyedit': {'prompt': 'trust', # Initiate trust editing + 'save': {'okay': 'yes'}}}} # Save if prompted + return(_map) + + def getTpls(self): + for t in ('plain', 'html'): + _tpl_file = os.path.join(self.cfgdir, 'email.{0}.j2'.format(t)) + if os.path.isfile(_tpl_file): + with open(_tpl_file, 'r') as f: + self.tpls[t] = f.read() + else: + self.tpls[t] = lzma.decompress(base64.b64decode(email_tpl[t]), + format = lzma.FORMAT_XZ, + memlimit = None, + filters = None).decode('utf-8') + with open(_tpl_file, 'w') as f: + f.write('{0}'.format(self.tpls[t])) + print('Created: {0}'.format(tpl_file)) + return(self.tpls) + + def modifyDirmngr(self, op): + if not self.args['keyservers']: + return() + _pid = str(os.getpid()) + _activecfg = os.path.join(self.args['gpgdir'], 'dirmngr.conf') + _activegpgconf = os.path.join(self.args['gpgdir'], 'gpg.conf') + _bakcfg = '{0}.{1}'.format(_activecfg, _pid) + _bakgpgconf = '{0}.{1}'.format(_activegpgconf, _pid) + ## Modify files + if op in ('new', 'start', 'replace'): + # Replace the keyservers + if os.path.lexists(_activecfg): + shutil.copy2(_activecfg, _bakcfg) + with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: + for line in read: + if not line.startswith('keyserver '): + write.write(line) + with open(_activecfg, 'a') as f: + for s in self.args['keyservers']: + _uri = '{0}://{1}:{2}'.format(s['proto'], + s['server'], + s['port'][0]) + f.write('keyserver {0}\n'.format(_uri)) + # Use stronger ciphers, etc. and prompt for check/certification levels + if os.path.lexists(_activegpgconf): + shutil.copy2(_activegpgconf, _bakgpgconf) + with open(_activegpgconf, 'w') as f: + f.write('cipher-algo AES256\ndigest-algo SHA512\ncert-digest-algo SHA512\ncompress-algo BZIP2\nask-cert-level\n') + ## Restore files + if op in ('old', 'stop', 'restore'): + # Restore the keyservers + if os.path.lexists(_bakcfg): + with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: + for line in read: + write.write(line) + os.remove(_bakcfg) + else: + os.remove(_activecfg) + # Restore GPG settings + if os.path.lexists(_bakgpgconf): + with open(_bakgpgconf, 'r') as read, open(_activegpgconf, 'w') as write: + for line in read: + write.write(line) + os.remove(_bakgpgconf) + else: + os.remove(_activegpgconf) + subprocess.run(['gpgconf', '--reload', 'dirmngr']) # I *really* wish we could do this via GPGME. + return() def getKeys(self): - # Get our context - os.environ['GNUPGHOME'] = self.args['gpgdir'] - ctx = gpg.Context() - keys = {} + _keyids = [] + _keys = {} # Do we have the key already? If not, fetch. - for k in list(self.args['rcpts']): - if self.args['rcpts'][k]['type'] == 'fpr': - self.keyids.append(k) - if self.args['rcpts'][k]['type'] == 'email': + for r in list(self.args['rcpts'].keys()): + if self.args['rcpts'][r]['type'] == 'fpr': + _keyids.append(r) + self.ctx.set_keylist_mode(self.maps['keylist']['remote']) + try: + _k = self.ctx.get_key(r) + except: + print('{0}: We could not find this key on the keyserver.'.format(r)) # Key not on server + del(self.args['rcpts'][r]) + _keyids.remove(r) + continue + self.ctx.set_keylist_mode(self.maps['keylist']['local']) + _keys[r] = {'fpr': r, + 'obj': _k, + 'created': _k.subkeys[0].timestamp} + if 'T' in str(_keys[r]['created']): + _keys[r]['created'] = int(datetime.datetime.strptime(_keys[r]['created'], + '%Y%m%dT%H%M%S').timestamp()) + if self.args['rcpts'][r]['type'] == 'email': # We need to actually do a lookup on the email address. - with open(os.devnull, 'w') as f: - # TODO: replace with gpg.keylist_mode(gpgme.KEYLIST_MODE_EXTERN) and internal mechanisms? - keyout = subprocess.run(['gpg2', - '--search-keys', - '--with-colons', - '--batch', - k], - stdout = subprocess.PIPE, - stderr = f) - keyout = keyout.stdout.decode('utf-8').splitlines() - for line in keyout: - if line.startswith('pub:'): - key = line.split(':')[1] - keys[key] = {} - keys[key]['uids'] = {} - keys[key]['time'] = int(line.split(':')[4]) - elif line.startswith('uid:'): - uid = re.split('<(.*)>', urllib.parse.unquote(line.split(':')[1].strip())) - uid.remove('') - uid = [u.strip() for u in uid] - keys[key]['uids'][uid[1]] = {} - keys[key]['uids'][uid[1]]['comment'] = uid[0] - keys[key]['uids'][uid[1]]['time'] = int(line.split(':')[2]) - if len(keys) > 1: # Print the keys and prompt for a selection. - print('\nWe found the following keys for <{0}>...\n\nKEY ID:'.format(k)) - for k in keys: - print('{0}\n{1:6}(Generated at {2}) UIDs:'.format(k, '', datetime.datetime.utcfromtimestamp(keys[k]['time']))) - for email in keys[k]['uids']: + _keytmp = [] + for k in self.ctx.keylist(r, mode = self.maps['keylist']['remote']): + _keytmp.append(k) + for k in _keytmp: + _keys[k.fpr] = {'fpr': k.fpr, + 'obj': k, + 'created': k.subkeys[0].timestamp, + 'uids': {}} + # Per the docs (/docs/DETAILS, "*** Field 6 - Creation date"), + # they may change this to ISO 8601... + if 'T' in str(_keys[k.fpr]['created']): + _keys[k.fpr]['created'] = int(datetime.datetime.strptime(_keys[k.fpr]['created'], + '%Y%m%dT%H%M%S').timestamp()) + for s in k.uids: + _keys[k.fpr]['uids'][s.email] = {'comment': s.comment, + 'updated': s.last_update} + if len(_keytmp) > 1: # Print the keys and prompt for a selection. + print('\nWe found the following keys for {0} <{1}>...\n\nKEY ID:'.format(r, _orig_r)) + for s in _keys[r]: + print(s, _keys[r]) + print('{0}\n{1:6}(Generated at {2}) UIDs:'.format(k, + '', + datetime.datetime.utcfromtimestamp(s['updated']))) + for email in _keys[k]['uids']: print('{0:42}(Generated {3}) <{2}> {1}'.format('', - keys[k]['uids'][email]['comment'], + s['uids'][email]['comment'], email, - datetime.datetime.utcfromtimestamp(keys[k]['uids'][email]['time']))) + datetime.datetime.utcfromtimestamp(s['uids'][email]['updated']))) print() while True: key = input('Please enter the (full) appropriate key: ') if key not in keys.keys(): print('Please enter a full key ID from the list above or hit ctrl-d to exit.') else: - self.keyids.append(key) + _keyids.append(key) break else: - if not len(keys.keys()) >= 1: - print('Could not find {0}!'.format(k)) - del(self.args['rcpts'][k]) + if len(_keytmp) == 0: + print('Could not find {0}!'.format(r)) + del(self.args['rcpts'][r]) continue - key = list(keys.keys())[0] - print('\nFound key {0} for {1} (Generated at {2}):'.format(key, k, datetime.datetime.utcfromtimestamp(keys[key]['time']))) - for email in keys[key]['uids']: - print('\t(Generated {2}) {0} <{1}>'.format(keys[key]['uids'][email]['comment'], + _keyids.append(_keys[k.fpr]['fpr']) + print('\nFound key {0} for {1} (Generated at {2}):'.format(_keys[k.fpr]['fpr'], + r, + datetime.datetime.utcfromtimestamp(_keys[k.fpr]['created']))) + for email in _keys[k.fpr]['uids']: + print('\t(Generated {2}) {0} <{1}>'.format(_keys[k.fpr]['uids'][email]['comment'], email, - datetime.datetime.utcfromtimestamp(keys[key]['uids'][email]['time']))) - self.keyids.append(key) + datetime.datetime.utcfromtimestamp(_keys[k.fpr]['uids'][email]['updated']))) print() ## And now we can (FINALLY) fetch the key(s). # TODO: replace with gpg.keylist_mode(gpgme.KEYLIST_MODE_EXTERN) and internal mechanisms? - recvcmd = ['gpg2', '--recv-keys', '--batch', '--yes'] # We'll add the keys onto the end of this next. - recvcmd.extend(self.keyids) - with open(os.devnull, 'w') as f: - fetchout = subprocess.run(recvcmd, stdout = f, stderr = f) # We hide stderr because gpg, for some unknown reason, spits non-errors to stderr. - return(self.keyids) - - def trustKeys(self): - # Map the trust levels to "human" equivalent - trustmap = {-1: ['never', gpgme.VALIDITY_NEVER], # this is... probably? not ideal, but. - 0: ['unknown', gpgme.VALIDITY_UNKNOWN], - 1: ['untrusted', gpgme.VALIDITY_UNDEFINED], - 2: ['marginal', gpgme.VALIDITY_MARGINAL], - 3: ['full', gpgme.VALIDITY_FULL], - 4: ['ultimate', gpgme.VALIDITY_ULTIMATE]} - locmap = {0: ['no', False], - 1: ['yes', True]} - def promptTrust(kinfo): - for k in list(kinfo): - if 'trust' not in kinfo[k].keys(): - trust_lvl = None - trust_in = input(('\nWhat trust level should we assign to {0}?\n\t\t\t\t ({1} <{2}>)' + - '\n\n\t\033[1m-1 = Never\n\t 0 = Unknown\n\t 1 = Untrusted\n\t 2 = Marginal\n\t 3 = Full' + - '\n\t 4 = Ultimate\033[0m\nTrust: ').format(k, kinfo[k]['name'], kinfo[k]['email'])) - for dictk, dictv in trustmap.items(): - if trust_in.lower().strip() == dictv[0]: - trust_lvl = int(dictk) - elif trust_in == str(dictk): - trust_lvl = int(dictk) - if not trust_lvl: - print('Not a valid trust level; skipping. Run kant again to fix.') - continue - kinfo[k]['trust'] = trustmap[trust_lvl][1] - if 'local' not in kinfo[k].keys(): - local = False - if args['keyservers']: - local_in = input('\nShould we push {0} to the keyserver(s) (\033[1mYES\033[0m/No)? '.format(k)) - if local_in.lower().startswith('n'): - local = True - kinfo[k]['local'] = local - return(kinfo) - os.environ['GNUPGHOME'] = args['gpgdir'] - gpg = gpgme.Context() - # Build out some info about keys - kinfo = {} - for k in self.keyids: - if k not in kinfo.keys(): - kinfo[k] = {} - else: - continue # The key was already parsed; don't waste time adding the info + for g in _keyids: try: - kobj = gpg.get_key(k) - kinfo[k]['name'] = kobj.uids[0].name - kinfo[k]['email'] = kobj.uids[0].email - except gpgme.GpgmeError: - print('Can\'t get information about key {0}; skipping.'.format(k)) - del(kinfo[k]) - if not args['batch']: - if not args['trustlevel']: - self.trusts = promptTrust(kinfo) - else: - for k in list(kinfo): - local = False - if 'trust' not in kinfo[k].keys(): - for dictk, dictv in trustmap.items(): - if args['trustlevel'].lower().strip() == dictv[0]: - trust_lvl = int(dictk) - elif args['trustlevel'] == str(dictk): - trust_lvl = int(dictk) - if not trust_lvl: - print('Not a valid trust level; skipping. Run kant again to fix.') - continue - if 'local' not in kinfo[k].keys(): - if args['local']: - local = True - kinfo[k]['local'] = local - kinfo[k]['trust'] = trustmap[trust_lvl][1] - self.trusts = kinfo + self.ctx.op_import_keys([_keys[g]['obj']]) + except gpg.errors.GPGMEError: + print('Key {0} could not be found on the keyserver'.format(g)) # The key isn't on the keyserver + self.ctx.set_keylist_mode(self.maps['keylist']['local']) + for k in _keys: + _key = _keys[k]['obj'] + self.keys[k] = {'pkey': {'email': _key.uids[0].email, + 'name': _key.uids[0].name, + 'creation': datetime.datetime.utcfromtimestamp(_keys[k]['created']), + 'key': _key}, + 'trust': self.args['trustlevel'], # Not set yet; we'll modify this later in buildKeys(). + 'local': self.args['local'], # Not set yet; we'll modify this later in buildKeys(). + 'notify': self.args['notify'], # Same... + 'sign': True, # We don't need to prompt for this since we detect if we need to sign or not + 'change': None, # "" + 'status': None} # Same. + # And we add the subkeys in yet another loop. + self.keys[k]['subkeys'] = {} + self.keys[k]['uids'] = {} + for s in _key.subkeys: + self.keys[k]['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) + for u in _key.uids: + self.keys[k]['uids'][u.email] = {'name': u.name, + 'comment': u.comment, + 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} + del(_keys) + return() + + def buildKeys(self): + self.getKeys() + # Before anything else, let's set up our own key info. + _key = self.ctx.get_key(self.args['sigkey'], secret = True) + self.mykey = {'pkey': {'email': _key.uids[0].email, + 'name': _key.uids[0].name, + 'creation': datetime.datetime.utcfromtimestamp(_key.subkeys[0].timestamp), + 'key': _key}, + 'trust': 'ultimate', # No duh. This is our own key. + 'local': False, # We keep our own key array separate, so we don't push it anyways. + 'notify': False, # "" + 'check': None, # "" + 'change': False, # "" + 'status': None, # "" + 'sign': False} # "" + self.mykey['subkeys'] = {} + self.mykey['uids'] = {} + for s in _key.subkeys: + self.mykey['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) + for u in _key.uids: + self.mykey['uids'][u.email] = {'name': u.name, + 'comment': u.comment, + 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} + # Now let's set up our trusts. + if self.args['batch']: + self.batchParse() else: - self.trusts = {} - csvd = {} # We import the CSV into a totally separate dict so we can do some validation loops - with open(self.args['keys'], 'r') as f: - for row in csv.reader(f, delimiter = ',', quotechar = '"'): - csvd[row[0]] = {'trust': row[1].strip(), - 'local': row[2].strip(), - 'check': row[3].strip(), - 'notify': row[4].strip()} - for k in list(csvd): - if re.match('^?$', k): # is it an email address? - fullkey = gpg.get_key(k) - csvd[fullkey.subkeys[0].fpr] = csvd[k] - del(csvd[k]) - k = fullkey.subkeys[0].fpr - if k not in trusts.keys(): - self.trusts[k] = {} - if 'trust' not in trusts[k].keys(): - # Properly index the trust - strval = str(csvd[k]['trust']).lower().strip() - if strval == 'true': - self.trusts[k]['trust'] = True - elif strval == 'false': - self.trusts[k]['trust'] = False - elif strval == 'none': - self.trusts[k]['trust'] = None - else: - for dictk, dictv in trustmap.items(): - if strval == dictv[0]: - self.trusts[k]['trust'] = trustmap[dictk][1] - elif strval == str(dictk): - self.trusts[k]['trust'] = trustmap[dictk][1] - if 'trust' not in self.trusts[k].keys(): # yes, again. we make sure it was set. otherwise, we need to skip this key. - print('Key {0}: trust level "{1}" is invalid; skipping.'.format(k, csvd[k]['trust'])) - del(self.trusts[k]) - continue - # Now we need to index whether we push or not. - if 'local' not in self.trusts[k].keys(): - strval = str(csvd[k]['local']).lower().strip() - if strval == 'true': - self.trusts[k]['local'] = True - elif strval == 'false': - self.trusts[k]['local'] = False - else: - for dictk, dictv in locmap.items(): - if strval in dictv[0]: - self.trusts[k]['local'] = locmap[dictk][1] - elif strval == str(dictk): - self.trusts[k]['local'] = locmap[dictk][1] - if 'local' not in self.trusts[k].keys(): # yep. double-check - print('Key {0}: local option "{1}" is invalid; skipping.'.format(k, csvd[k]['local'])) - del(self.trusts[k]) - continue - # WHEW. THAT'S A LOT OF VALIDATIONS. Now the Business-End(TM) - # Reverse mapping of constants to human-readable - rmap = {gpgme.VALIDITY_NEVER: 'Never', - gpgme.VALIDITY_UNKNOWN: 'Unknown', - gpgme.VALIDITY_UNDEFINED: 'Untrusted', - gpgme.VALIDITY_MARGINAL: 'Marginal', - gpgme.VALIDITY_FULL: 'Full', - gpgme.VALIDITY_ULTIMATE: 'Ultimate'} - mykey = gpg.get_key(args['sigkey']) - for k in list(self.trusts): - keystat = None - try: - tkey = gpg.get_key(k) - except gpgme.GpgmeError: - print('Cannot find {0} in keyring at all; skipping.'.format(k)) - del(self.trusts[k]) - continue - curtrust = rmap[tkey.owner_trust] - newtrust = rmap[self.trusts[k]['trust']] - if tkey.owner_trust == trusts[k]['trust']: - self.trusts[k]['change'] = False - continue # Don't bother; we aren't changing the trust level, it's the same (OR we haven't trusted yet) - elif tkey.owner_trust == gpgme.VALIDITY_UNKNOWN: - keystat = 'a NEW TRUST' - elif tkey.owner_trust > trusts[k]['trust']: - keystat = 'a DOWNGRADE' - elif tkey.owner_trust < trusts[k]['trust']: - keystat = 'an UPGRADE' - print(('\nKey {0} [{1} ({2})]:\n' + - '\tThis trust level ({3}) is {4} from the current trust level ({5}).').format(k, - kinfo[k]['name'], - kinfo[k]['email'], - newtrust, - keystat, - curtrust)) - tchk = input('Continue? (yes/\033[1mNO\033[0m) ') - if tchk.lower().startswith('y'): - self.trusts[k]['change'] = True - else: - self.trusts[k]['change'] = False - for k in list(self.trusts): - if self.trusts[k]['change']: - print(k) - gpg.editutil.edit_trust(ctx, ctx.get_key(k), self.trusts[k]['trust']) - print() - return(self.trusts) + for k in list(self.keys.keys()): + self.promptTrust(k) + self.promptCheck(k) + self.promptLocal(k) + self.promptNotify(k) + # In case we removed any keys, we have to run this outside of the loops + for k in list(self.keys.keys()): + for t in ('trust', 'local', 'check', 'notify'): + self.keysCleanup(k, t) + # TODO: populate self.keys[key]['change']; we use this for trust (but not sigs) + return() + def batchParse(self): + # First we grab the info from CSV + csvlines = csv.reader(self.csvraw, delimiter = ',', quotechar = '"') + for row in csvlines: + row[0] = row[0].replace('<', '').replace('>', '') + try: + if self.args['rcpts'][row[0]]['type'] == 'fpr': + k = row[0] + else: # It's an email. + key_set = False + while not key_set: + for i in list(self.keys.keys()): + if row[0] in list(self.keys[i]['uids'].keys()): + k = i + key_set = True + self.keys[k]['trust'] = row[1].lower().strip() + self.keys[k]['local'] = row[2].lower().strip() + self.keys[k]['check'] = row[3].lower().strip() + self.keys[k]['notify'] = row[4].lower().strip() + except KeyError: + continue # It was deemed to be an invalid key earlier + return() + + def promptTrust(self, k): + if 'trust' not in self.keys[k].keys() or not self.keys[k]['trust']: + trust_in = input(('\nWhat trust level should we assign to {0}? (The default is '+ + 'Marginal.)\n\t\t\t\t ({1} <{2}>)' + + '\n\n\t\033[1m-1 = Never\n\t 0 = Unknown\n\t 1 = Untrusted\n\t 2 = Marginal\n\t 3 = Full' + + '\n\t 4 = Ultimate\033[0m\nTrust: ').format(k, + self.keys[k]['pkey']['name'], + self.keys[k]['pkey']['email'])) + if trust_in == '': + trust_in = 'marginal' # Has to be a str, so we can "pretend" it was entered + self.keys[k]['trust'] = trust_in + return() + + def promptCheck(self, k): + if 'check' not in self.keys[k].keys() or self.keys[k]['check'] == None: + check_in = input(('\nHow carefully have you checked {0}\'s validity of identity/ownership of the key? ' + + '(Default is Unknown.)\n' + + '\n\t\033[1m0 = Unknown\n\t1 = None\n\t2 = Casual\n\t3 = Careful\033[0m\nCheck level: ').format(k)) + if check_in == '': + check_in == 'unknown' + self.keys[k]['check'] = check_in + return() + + def promptLocal(self, k): + if 'local' not in self.keys[k].keys() or self.keys[k]['local'] == None: + if self.args['keyservers']: + local_in = input(('\nShould we locally sign {0} '+ + '(if yes, the signature will be non-exportable; if no, we will be able to push to a keyserver) ' + + '(Yes/\033[1mNO\033[0m)? ').format(k)) + if local_in == '': + local_in = False + self.keys[k]['local'] = local_in + return() + + def promptNotify(self, k): + if 'notify' not in self.keys[k].keys() or self.keys[k]['notify'] == None: + notify_in = input(('\nShould we notify {0} (via <{1}>) (\033[1mYES\033[0m/No)? ').format(k, + self.keys[k]['pkey']['email'])) + if notify_in == '': + notify_in = True + self.keys[k]['local'] = local_in + return() + + def keysCleanup(self, k, t): # At some point, this WHOLE thing would probably be cleaner with bitwise flags... + s = t + _errs = {'trust': 'trust level', + 'local': 'local signature option', + 'check': 'check level', + 'notify': 'notify flag'} + if k not in self.keys.keys(): + return() # It was deleted already. + if t in ('local', 'notify'): # these use a binary mapping + t = 'binmap' + # We can do some basic stuff right here. + if str(self.keys[k][s]).lower() in ('n', 'no', 'false'): + self.keys[k][s] = False + return() + elif str(self.keys[k][s]).lower() in ('y', 'yes', 'true'): + self.keys[k][s] = True + return() + # Make sure we have a known value. These will ALWAYS be str's, either from the CLI or CSV. + value_in = str(self.keys[k][s]).lower().strip() + for dictk, dictv in self.maps[t].items(): + if value_in == dictv[0]: + self.keys[k][s] = int(dictk) + elif value_in == str(dictk): + self.keys[k][s] = int(dictk) + if not isinstance(self.keys[k][s], int): # It didn't get set + print('{0}: "{1}" is not a valid {2}; skipping. Run kant again to fix.'.format(k, self.keys[k][s], _errs[s])) + del(self.keys[k]) + return() + return() + def sigKeys(self): # The More Business-End(TM) - os.environ['GNUPGHOME'] = args['gpgdir'] - ctx = gpg.Context() - ctx.keylist_mode = gpg.KEYLIST_MODE_SIGS - mkey = ctx.get_key(args['sigkey']) - ctx.signers = [mkey] - global_policy = {} - for k in list(self.trusts): - sign = True - key = ctx.get_key(k) + # NOTE: If the trust level is anything but 2 (the default), we should use op_interact() instead and do a tsign. + self.ctx.keylist_mode = gpg.constants.KEYLIST_MODE_SIGS + _mkey = self.mykey['pkey']['key'] + self.ctx.signers = [_mkey] + for k in list(self.keys.keys()): + key = self.keys[k]['pkey']['key'] for uid in key.uids: for s in uid.signatures: try: signerkey = ctx.get_key(s.keyid).subkeys[0].fpr if signerkey == mkey.subkeys[0].fpr: - sign = False # We already signed this key + self.trusts[k]['sign'] = False # We already signed this key except gpgme.GpgError: pass # usually if we get this it means we don't have a signer's key in our keyring - self.trusts[k]['sign'] = sign - import pprint - pprint.pprint(self.trusts) - # edit_sign(ctx, key, index=0, local=False, norevoke=False, expire=True, check=0) - # index: the index of the user ID to sign, starting at 1. Sign all - # user IDs if set to 0. - # local: make a local signature - # norevoke: make a non-revokable signature - # command: the type of signature. One of sign, lsign, tsign or nrsign. - # expire: whether the signature should expire with the key. - # check: Amount of checking performed. One of: - # 0 - no answer - # 1 - no checking - # 2 - casual checking - # 3 - careful checking - - #gpgme.editutil.edit_sign(gpg, k, index = 0, lo - - - def pushKeys(): # The Last Business-End(TM) - pass - - def modifyDirmngr(self, op): - if not self.args['keyservers']: - return() - pid = str(os.getpid()) - activecfg = os.path.join(self.args['gpgdir'], 'dirmngr.conf') - bakcfg = '{0}.{1}'.format(activecfg, pid) - if op in ('new', 'start'): - if os.path.lexists(activecfg): - shutil.copy2(activecfg, bakcfg) - with open(bakcfg, 'r') as read, open(activecfg, 'w') as write: - for line in read: - if not line.startswith('keyserver '): - write.write(line) - with open(activecfg, 'a') as f: - for s in self.args['keyservers']: - uri = '{0}://{1}:{2}'.format(s['proto'], s['server'], s['port'][0]) - f.write('keyserver {0}\n'.format(uri)) - if op in ('old', 'stop'): - if os.path.lexists(bakcfg): - with open(bakcfg, 'r') as read, open(activecfg, 'w') as write: - for line in read: - write.write(line) - os.remove(bakcfg) - else: - os.remove(activecfg) - subprocess.run(['gpgconf', - '--reload', - 'dirmngr']) + # And again, we loop. ALLLLL that buildup for one line. + for k in list(self.keys.keys()): + # TODO: configure to allow for user-entered expiration? + if self.keys[k]['sign']: + self.ctx.key_sign(self.keys[k]['pkey']['key'], local = self.keys[k]['local']) return() -def serverParser(uri): - # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples - # We need to make a mapping of the default ports. - server = {} - protos = {'hkp': [11371, ['tcp', 'udp']], - 'hkps': [443, ['tcp']], # Yes, same as https - 'http': [80, ['tcp']], - 'https': [443, ['tcp']], # SSL/TLS - 'ldap': [389, ['tcp', 'udp']], # includes TLS negotiation since it runs on the same port - 'ldaps': [636, ['tcp', 'udp']]} # SSL - urlobj = urllib.parse.urlparse(uri) - server['proto'] = urlobj.scheme - lazy = False - if not server['proto']: - server['proto'] = 'hkp' # Default - server['server'] = urlobj.hostname - if not server['server']: - server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri) - lazy = True - server['port'] = urlobj.port - if not server['port']: - if lazy: - p = re.sub('.*:([0-9]+)$', '\g<1>', uri) - server['port'] = protos[server['proto']] # Default - return(server) + class KeyEditor(object): + def __init__(self, optmap): + self.replied_once = False # This is used to handle the first prompt vs. the last + self.optmap = optmap + return(None) + def editKey(self, status, args, out): + _result = None + out.seek(0, 0) + def mapDict(m, d): + return(reduce(operator.getitem, m, d)) + if args == 'keyedit.prompt' and self.replied_once: + _result = 'quit' + elif status == 'KEY_CONSIDERED': + _result = None + self.replied_once = False + elif status == 'GET_LINE': + self.replied_once = True + _ilist = args.split('.') + _result = mapDict(_ilist, self.optmap['prompts']) + if not _result: + _result = None + return(_result) + + def trustKeys(self): # The Son of Business-End(TM) + # TODO: add check for change + for k in list(self.keys.keys()): + _key = self.keys[k] + _map = self.getEditPrompt(_key) + out = gpg.Data() + self.ctx.interact(_key['pkey']['key'], self.KeyEditor(_map).editKey, sink = out, fnc_value = out) + out.seek(0, 0) + return() + + def pushKeys(self): # The Last Business-End(TM) + for k in list(self.keys.keys()): + if not self.keys[k]['local'] and self.keys[k]['sign']: + self.ctx.op_export(k, gpg.constants.EXPORT_MODE_EXTERN, None) + return() + + class Mailer(object): # I lied; The Return of the Business-End(TM) + def __init__(self): + _homeconf = os.path.join(os.environ['HOME'], '.msmtprc') + _sysconf = '/etc/msmtprc' + self.msmtp = {'path': None} + if not os.path.isfile(_homeconf): + if not os.path.isfile(_sysconf): + self.msmtp['conf'] = False + else: + self.msmtp['conf'] = _sysconf + else: + self.msmtp['conf'] = _homeconf + if os.path.isfile(self.msmtp['conf']): + for p in (os.environ['PATH']).split(':'): + if os.path.isfile(os.path.join(p, 'msmtp')): + self.msmtp['path'] = os.path.join(p, 'msmtp') + if self.msmtp['path']: + # Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary. + # Now we need to parse the config. + self.msmtp['cfg'] = self.getCfg() + return(None) + + def getCfg(self): + cfg = {'default': None, 'defaults': {}} + _defaults = False + _acct = None + with open(self.msmtp['conf'], 'r') as f: + _cfg_raw = f.read() + for l in _cfg_raw.splitlines(): + if re.match('^\s?(#.*|)$', l): + continue # Skip over blank and commented lines + _line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)] + if _line[0] == 'account': + if re.match('^default\s?:\s?', _line[1]): # it's the default account specifier + cfg['default'] = _line[1].split(':', maxsplit = 1)[1].strip() + else: + if _line[1] not in cfg.keys(): # it's a new account definition + cfg[_line[1]] = {} + _acct = _line[1] + _defaults = False + elif _line[0] == 'defaults': # it's the defaults + _acct = 'defaults' + else: # it's a config directive + cfg[_acct][_line[0]] = _line[1] + for a in list(cfg): + if a != 'default': + for k, v in cfg['defaults'].items(): + if k not in cfg[a].keys(): + cfg[a][k] = v + del(cfg['defaults']) + return(cfg) + + def sendEmail(self, msg, key, profile): # This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc. + if 'tls-starttls' in self.msmtp['cfg'][profile].keys() and self.msmtp['cfg'][profile]['tls-starttls'] == 'on': + smtpserver = smtplib.SMTP(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.starttls() + # we need to EHLO twice with a STARTTLS because email is weird. + elif self.msmtp['cfg'][profile]['tls'] == 'on': + smtpserver = smtplib.SMTP_SSL(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.login(self.msmtp['cfg'][profile]['user'], self.msmtp['cfg'][profile]['password']) + smtpserver.sendmail(self.msmtp['cfg'][profile]['user'], key['pkey']['email'], msg.as_string()) + smtpserver.close() + return() + + def postalWorker(self): + m = self.Mailer() + if 'KANT' in m.msmtp['cfg'].keys(): + _profile = 'KANT' + else: + _profile = m.msmtp['cfg']['default'] # TODO: let this be specified on the CLI args? + if 'user' not in m.msmtp['cfg'][_profile].keys() or not m.msmtp['cfg'][_profile]['user']: + return() # We don't have MSMTP configured. + # Reconstruct the keyserver list. + _keyservers = [] + for k in self.args['keyservers']: + _keyservers.append('{0}://{1}:{2}'.format(k['proto'], k['server'], k['port'][0])) + # Export our key so we can attach it. + _pubkeys = {} + for e in ('asc', 'gpg'): + if e == 'asc': + self.ctx.armor = True + else: + self.ctx.armor = False + _pubkeys[e] = gpg.Data() # This is a data buffer to store your ASCII-armored pubkeys + self.ctx.op_export_keys([self.mykey['pkey']['key']], 0, _pubkeys[e]) + _pubkeys[e].seek(0, 0) # Read with e.g. _sigs['asc'].read() + for k in list(self.keys.keys()): + if self.keys[k]['notify']: + _body = {} + for t in list(self.tpls.keys()): + # There's gotta be a more efficient way of doing this... + #_tplenv = jinja2.Environment(loader = jinja2.BaseLoader()).from_string(self.tpls[t]) + _tplenv = jinja2.Environment().from_string(self.tpls[t]) + _body[t] = _tplenv.render(key = self.keys[k], + mykey = self.mykey, + keyservers = _keyservers) + b = MIMEMultipart('alternative') # Set up a body + for c in _body.keys(): + b.attach(MIMEText(_body[c], c)) + bmsg = MIMEMultipart() + bmsg.attach(b) + for s in _pubkeys.keys(): + _attchmnt = MIMEApplication(_pubkeys[s].read(), '{0}.{1}'.format(self.mykey['pkey']['key'].fpr, s)) + _attchmnt['Content-Disposition'] = 'attachment; filename="{0}.{1}"'.format(self.mykey['pkey']['key'].fpr, s) + bmsg.attach(_attchmnt) + # Now we sign the body. This incomprehensible bit monkey-formats bmsg to be a multi-RFC-compatible + # string, which is then passed to our gpgme instance's signing mechanishm, and the output of that is + # returned as plaintext. Whew. + self.ctx.armor = True + + _sig = self.ctx.sign((bmsg.as_string().replace('\n', '\r\n')).encode('utf-8'), + mode = gpg.constants.SIG_MODE_DETACH) + imsg = Message() # Build yet another intermediate message... + imsg['Content-Type'] = 'application/pgp-signature; name="signature.asc"' + imsg['Content-Description'] = 'OpenPGP digital signature' + imsg.set_payload(_sig[0].decode('utf-8')) + msg = MIMEMultipart(_subtype = 'signed', + micalg = "pgp-{0}".format(self.maps['hashalgos'][_sig[1].signatures[0].hash_algo]), + protocol = 'application/pgp-signature') + msg.attach(bmsg) # Attach the body (plaintext, html, pubkey attachmants) + msg.attach(imsg) # Attach the isignature + msg['To'] = self.keys[k]['pkey']['email'] + if 'from' in m.msmtp['cfg'][_profile].keys(): + msg['From'] = m.msmtp['cfg'][_profile]['from'] + else: + msg['From'] = self.mykey['pkey']['email'] + msg['Subject'] = 'Your GnuPG/PGP key has been signed' + msg['Openpgp'] = 'id={0}'.format(self.mykey['pkey']['key'].fpr) + msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') + msg['User-Agent'] = 'KANT (part of the r00t^2 OpTools suite: https://git.square-r00t.net/OpTools)' + m.sendEmail(msg, self.keys[k], _profile) # Send the email + for d in (msg, imsg, bmsg, b, _body, _tplenv): # Not necessary, but it pays to be paranoid; we do NOT want leaks. + del(d) + del(m) + return() + + def saveResults(self): + _cachedir = os.path.join(self.cfgdir, 'cache', datetime.datetime.utcnow().strftime('%Y.%m.%d_%H.%M.%S')) + os.makedirs(_cachedir, exist_ok = True) + for k in self.keys.keys(): + _keyout = self.keys[k] + # We need to normalize the datetime objects and gpg objects to strings + _keyout['pkey']['creation'] = str(self.keys[k]['pkey']['creation']) + _keyout['pkey']['key'] = '' + for u in list(_keyout['uids'].keys()): + _keyout['uids'][u]['updated'] = str(self.keys[k]['uids'][u]['updated']) + for s in list(_keyout['subkeys'].keys()): + _keyout['subkeys'][s] = str(self.keys[k]['subkeys'][s]) + _fname = os.path.join(_cachedir, '{0}.json'.format(k)) + with open(_fname, 'a') as f: + f.write('{0}\n'.format(json.dumps(_keyout, sort_keys = True, indent = 4))) + del(_keyout) + # And let's grab a copy of our key in the state that it exists in currently + _mykey = self.mykey + # We need to normalize the datetime objects and gpg objects to strings again + _mykey['pkey']['creation'] = str(_mykey['pkey']['creation']) + _mykey['pkey']['key'] = '' + for u in list(_mykey['uids'].keys()): + _mykey['uids'][u]['updated'] = str(self.mykey['uids'][u]['updated']) + for s in list(_mykey['subkeys'].keys()): + _mykey['subkeys'][s] = str(self.mykey['subkeys'][s]) + with open(os.path.join(_cachedir, '_SIGKEY.json'), 'w') as f: + f.write('{0}\n'.format(json.dumps(_mykey, sort_keys = True, indent = 4))) + return() + + def serverParser(self, uri): + # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples + _server = {} + _urlobj = urllib.parse.urlparse(uri) + _server['proto'] = _urlobj.scheme + _lazy = False + if not _server['proto']: + _server['proto'] = 'hkp' # Default + _server['server'] = _urlobj.hostname + if not _server['server']: + _server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri, re.MULTILINE) + _lazy = True + _server['port'] = _urlobj.port + if not _server['port']: + if _lazy: + _p = re.sub('.*:([0-9]+)$', '\g<1>', uri, re.MULTILINE) + _server['port'] = self.maps['proto'][_server['proto']] # Default + return(_server) + + def verifyArgs(self, locargs): + ## Some pythonization... + if not locargs['batch']: + locargs['keys'] = [re.sub('\s', '', k) for k in locargs['keys'].split(',')] + else: + ## Batch file + _batchfilepath = os.path.abspath(os.path.expanduser(locargs['keys'])) + if not os.path.isfile(_batchfilepath): + raise ValueError('{0} does not exist or is not a regular file.'.format(_batchfilepath)) + else: + with open(_batchfilepath, 'r') as f: + self.csvraw = f.readlines() + locargs['keys'] = _batchfilepath + locargs['keyservers'] = [re.sub('\s', '', s) for s in locargs['keyservers'].split(',')] + locargs['keyservers'] = [self.serverParser(s) for s in locargs['keyservers']] + ## Key(s) to sign + locargs['rcpts'] = {} + if not locargs['batch']: + _keyiter = locargs['keys'] + else: + _keyiter = [] + for row in csv.reader(self.csvraw, delimiter = ',', quotechar = '"'): + _keyiter.append(row[0]) + for k in _keyiter: + locargs['rcpts'][k] = {} + try: + int(k, 16) + _ktype = 'fpr' + except: # If it isn't a valid key ID... + if not re.match('^?$', k): # is it an email address? + raise ValueError('{0} is not a valid email address'.format(k)) + else: + r = k.replace('<', '').replace('>', '') + locargs['rcpts'][r] = locargs['rcpts'][k] + del(locargs['rcpts'][k]) + k = r + _ktype = 'email' + locargs['rcpts'][k]['type'] = _ktype + # Security is important. We don't want users getting collisions, so we don't allow shortened key IDs. + if _ktype == 'fpr' and not len(k) == 40: + raise ValueError('{0} is not a full 40-char key ID or key fingerprint'.format(k)) + ## Signing key + if not locargs['sigkey']: + raise ValueError('A key for signing is required') # We need a key we can sign with. + else: + if not os.path.lexists(locargs['gpgdir']): + raise FileNotFoundError('{0} does not exist'.format(locargs['gpgdir'])) + elif os.path.isfile(locargs['gpgdir']): + raise NotADirectoryError('{0} is not a directory'.format(locargs['gpgdir'])) + # Now we need to verify that the private key exists... + try: + _ctx = gpg.Context() + _sigkey = _ctx.get_key(locargs['sigkey'], True) + except gpg.errors.GPGMEError or gpg.errors.KeyNotFound: + raise ValueError('Cannot use key {0}'.format(locargs['sigkey'])) + # And that it is an eligible candidate to use to sign. + if not _sigkey.can_sign or True in (_sigkey.revoked, _sigkey.expired, _sigkey.disabled): + raise ValueError('{0} is not a valid candidate for signing'.format(locargs['sigkey'])) + ## Keyservers + if locargs['testkeyservers']: + for s in locargs['keyservers']: + # Test to make sure the keyserver is accessible. + _v6test = socket(AF_INET6, SOCK_DGRAM) + try: + _v6test.connect(('ipv6.square-r00t.net', 0)) + _nettype = AF_INET6 # We have IPv6 intarwebz + except: + _nettype = AF_INET # No IPv6, default to IPv4 + for _proto in locargs['keyservers'][s]['port'][1]: + if _proto == 'udp': + _netproto = SOCK_DGRAM + elif _proto == 'tcp': + _netproto = SOCK_STREAM + _sock = socket(nettype, netproto) + _sock.settimeout(10) + _tests = _sock.connect_ex((locargs['keyservers'][s]['server'], + int(locargs['keyservers'][s]['port'][0]))) + _uristr = '{0}://{1}:{2} ({3})'.format(locargs['keyservers'][s]['proto'], + locargs['keyservers'][s]['server'], + locargs['keyservers'][s]['port'][0], + _proto.upper()) + if not tests == 0: + raise OSError('Keyserver {0} is not available'.format(_uristr)) + else: + print('Keyserver {0} is accepting connections.'.format(_uristr)) + sock.close() + return(locargs) + def parseArgs(): def getDefGPGDir(): try: @@ -406,7 +829,7 @@ def parseArgs(): return(None) defkey = None ctx = gpg.Context() - for k in ctx.keylist(None, True): # params are query and secret keyring, respectively + for k in ctx.keylist(None, secret = True): # "None" is query string; this grabs all keys in the private keyring if k.can_sign and True not in (k.revoked, k.expired, k.disabled): defkey = k.subkeys[0].fpr break # We'll just use the first primary key we find that's valid as the default. @@ -462,7 +885,7 @@ def parseArgs(): args.add_argument('-l', '--local', dest = 'local', - default = 'false', + default = None, help = 'Make the signature(s) local-only (i.e. don\'t push to a keyserver).') args.add_argument('-n', '--no-notify', @@ -498,96 +921,22 @@ def parseArgs(): 'set keyserver before anything else. Disabled by default.') return(args) -def verifyArgs(args): - ## Some pythonization... - if not args['batch']: - args['keys'] = [re.sub('\s', '', k) for k in args['keys'].split(',')] - else: - ## Batch file - batchfilepath = os.path.abspath(os.path.expanduser(args['keys'])) - if not os.path.isfile(batchfilepath): - raise ValueError('{0} does not exist or is not a regular file.'.format(batchfilepath)) - else: - args['keys'] = batchfilepath - args['keyservers'] = [re.sub('\s', '', s) for s in args['keyservers'].split(',')] - args['keyservers'] = [serverParser(s) for s in args['keyservers']] - ## Key(s) to sign - args['rcpts'] = {} - if not args['batch']: - keyiter = args['keys'] - else: - keyiter = [] - with open(args['keys'], 'r') as f: - for row in csv.reader(f, delimiter = ',', quotechar = '"'): - keyiter.append(row[0]) - for k in keyiter: - args['rcpts'][k] = {} - try: - int(k, 16) - ktype = 'fpr' - except: # If it isn't a valid key ID... - if not re.match('^?$', k): # is it an email address? - raise ValueError('{0} is not a valid email address'.format(k)) - else: - ktype = 'email' - args['rcpts'][k]['type'] = ktype - if ktype == 'fpr' and not len(k) == 40: # Security is important. We don't want users getting collisions, so we don't allow shortened key IDs. - raise ValueError('{0} is not a full 40-char key ID or key fingerprint'.format(k)) - ## Signing key - if not args['sigkey']: - raise ValueError('A key for signing is required') # We need a key we can sign with. - else: - if not os.path.lexists(args['gpgdir']): - raise FileNotFoundError('{0} does not exist'.format(args['gpgdir'])) - elif os.path.isfile(args['gpgdir']): - raise NotADirectoryError('{0} is not a directory'.format(args['gpgdir'])) - try: - os.environ['GNUPGHOME'] = args['gpgdir'] - ctx = gpg.Context() - except: - raise RuntimeError('Could not use {0} as a GnuPG home'.format(args['gpgdir'])) - # Now we need to verify that the private key exists... - try: - sigkey = ctx.get_key(args['sigkey'], True) - except GpgmeError: - raise ValueError('Cannot use key {0}'.format(args['sigkey'])) - # And that it is an eligible candidate to use to sign. - if not sigkey.can_sign or True in (sigkey.revoked, sigkey.expired, sigkey.disabled): - raise ValueError('{0} is not a valid candidate for signing'.format(args['sigkey'])) - ## Keyservers - if args['testkeyservers']: - for s in args['keyservers']: - # Test to make sure the keyserver is accessible. - v6test = socket(AF_INET6, SOCK_DGRAM) - try: - v6test.connect(('ipv6.square-r00t.net', 0)) - nettype = AF_INET6 # We have IPv6 intarwebz - except: - nettype = AF_INET # No IPv6, default to IPv4 - for proto in s['port'][1]: - if proto == 'udp': - netproto = SOCK_DGRAM - elif proto == 'tcp': - netproto = SOCK_STREAM - sock = socket(nettype, netproto) - sock.settimeout(10) - tests = sock.connect_ex((s['server'], int(s['port'][0]))) - uristr = '{0}://{1}:{2} ({3})'.format(s['proto'], s['server'], s['port'][0], proto.upper()) - if not tests == 0: - raise OSError('Keyserver {0} is not available'.format(uristr)) - else: - print('Keyserver {0} is accepting connections.'.format(uristr)) - sock.close() - return(args) + + + def main(): - rawargs = parseArgs() - args = verifyArgs(vars(rawargs.parse_args())) - sess = sigsession(args) + # This could be cleaner-looking, but we do it this way so the class can be used externally + # with a dict instead of an argparser result. + args = vars(parseArgs().parse_args()) + sess = SigSession(args) sess.modifyDirmngr('new') - sess.getKeys() - sess.trustKeys() + sess.buildKeys() sess.sigKeys() + sess.trustKeys() + sess.pushKeys() + sess.postalWorker() + sess.saveResults() sess.modifyDirmngr('old') if __name__ == '__main__':