From c1d4b5318526861838f5bdf3e4b02e4368f45f32 Mon Sep 17 00:00:00 2001 From: brent s Date: Mon, 28 Aug 2017 10:49:11 -0400 Subject: [PATCH] i think the test cases are done... --- gpg/kant.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/gpg/kant.py b/gpg/kant.py index f890056..f8a8741 100755 --- a/gpg/kant.py +++ b/gpg/kant.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 import argparse +import datetime import email import os import re -from io import BytesIO import subprocess -import datetime +from io import BytesIO +from socket import * +from urllib.parse import urlparse import gpgme # non-stdlib; Arch package is "python-pygpgme" # TODO: @@ -31,6 +33,32 @@ import gpgme # non-stdlib; Arch package is "python-pygpgme" # #Thanks again! +def serverParser(uri): + # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples + # We need to make a mapping of the default ports. + server = {} + protos = {'hkp': [11371, ['tcp', 'udp']], + 'hkps': [443, ['tcp']], # Yes, same as https + 'http': [80, ['tcp']], + 'https': [443, ['tcp']], # SSL/TLS + 'ldap': [389, ['tcp', 'udp']], # includes TLS negotiation since it runs on the same port + 'ldaps': [636, ['tcp', 'udp']]} # SSL + urlobj = urlparse(uri) + server['proto'] = urlobj.scheme + lazy = False + if not server['proto']: + server['proto'] = 'hkp' # Default + server['server'] = urlobj.hostname + if not server['server']: + server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri) + lazy = True + server['port'] = urlobj.port + if not server['port']: + if lazy: + p = re.sub('.*:([0-9]+)$', '\g<1>', uri) + server['port'] = protos[server['proto']] # Default + return(server) + def parseArgs(): def getDefGPGDir(): try: @@ -97,7 +125,21 @@ def parseArgs(): '--keyservers', dest = 'keyservers', default = defkeyservers, - help = 'The comma-separated keyserver(s) to push to. If "None", don\'t push signatures. Default is \033[1m{0}\033[0m.'.format(','.join(defkeyservers))) + help = 'The comma-separated keyserver(s) to push to. If "None", don\'t push signatures. Default is \033[1m{0}\033[0m.'.format( + ','.join( + defkeyservers))) + args.add_argument('-n', + '--netproto', + dest = 'netproto', + action = 'store', + choices = ['4', '6'], + default = '4', + help = 'Whether to use (IPv)4 or (IPv)6. Default is to use IPv4.') + args.add_argument('-t', + '--testkeyservers', + dest = 'testkeyservers', + action = 'store_true', + help = 'If specified, initiate a test connection with each keyserver before anything else. Disabled by default.') return(args) def verifyArgs(args): @@ -107,6 +149,7 @@ def verifyArgs(args): #args['keyservers'] = [s.strip() for s in args['keyservers'].split(',')] args['keys'] = [re.sub('\s', '', k) for k in args['keys'].split(',')] args['keyservers'] = [re.sub('\s', '', s) for s in args['keyservers'].split(',')] + args['keyservers'] = [serverParser(s) for s in args['keyservers']] ## Key(s) to sign args['rcpts'] = {} for k in args['keys']: @@ -145,15 +188,36 @@ def verifyArgs(args): if not sigkey.can_sign or True in (sigkey.revoked, sigkey.expired, sigkey.disabled): raise ValueError('{0} is not a valid candidate for signing'.format(args['sigkey'])) ## Keyservers - # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples - for s in args['keyservers']: - pass + if args['testkeyservers']: + for s in args['keyservers']: + # Test to make sure the keyserver is accessible. + # First we need to construct a way to use python's socket connector + # Great. Now we need to just quickly check to make sure it's accessible - if specified. + if args['netproto'] == '4': + nettype = AF_INET + elif args['netproto'] == '6': + nettype = AF_INET6 + for proto in s['port'][1]: + if proto == 'udp': + netproto = SOCK_DGRAM + elif proto == 'tcp': + netproto = SOCK_STREAM + sock = socket(nettype, netproto) + sock.settimeout(10) + tests = sock.connect_ex((s['server'], int(s['port'][0]))) + uristr = '{0}://{1}:{2} ({3})'.format(s['proto'], s['server'], s['port'][0], proto.upper()) + if not tests == 0: + raise RuntimeError('Keyserver {0} is not available'.format(uristr)) + else: + print('Keyserver {0} is accepting connections.'.format(uristr)) + sock.close() return(args) def main(): rawargs = parseArgs() args = verifyArgs(vars(rawargs.parse_args())) - print(args) + import pprint + pprint.pprint(args) if __name__ == '__main__': main()