From 4da7afdeaf2cf69b624d9eada05bcd939a543312 Mon Sep 17 00:00:00 2001 From: brent s Date: Tue, 19 Sep 2017 05:09:33 -0400 Subject: [PATCH] adding the rewrite... --- gpg/kant/{ => docs}/README | 0 gpg/kant/docs/REF.args.struct.txt | 42 ++ gpg/kant/docs/REF.funcs.struct.txt | 33 + gpg/kant/docs/REF.keys.struct.txt | 44 ++ gpg/kant/{ => docs}/kant.1.adoc | 0 gpg/kant/kant.1 | 225 ------- gpg/kant/kant.new.py | 943 +++++++++++++++++++++++++++++ gpg/kant/kant.py | 5 +- gpg/kant/test.py | 154 ++++- 9 files changed, 1216 insertions(+), 230 deletions(-) rename gpg/kant/{ => docs}/README (100%) create mode 100644 gpg/kant/docs/REF.args.struct.txt create mode 100644 gpg/kant/docs/REF.funcs.struct.txt create mode 100644 gpg/kant/docs/REF.keys.struct.txt rename gpg/kant/{ => docs}/kant.1.adoc (100%) delete mode 100644 gpg/kant/kant.1 create mode 100755 gpg/kant/kant.new.py diff --git a/gpg/kant/README b/gpg/kant/docs/README similarity index 100% rename from gpg/kant/README rename to gpg/kant/docs/README diff --git a/gpg/kant/docs/REF.args.struct.txt b/gpg/kant/docs/REF.args.struct.txt new file mode 100644 index 0000000..f405308 --- /dev/null +++ b/gpg/kant/docs/REF.args.struct.txt @@ -0,0 +1,42 @@ +The __init__() function of kant.SigSession() takes a single argument: args. + +it should be a dict, structured like this: + +{'batch': False, + 'checklevel': None, + 'gpgdir': '/home/bts/.gnupg', + 'keys': 'EFD9413B17293AFDFE6EA6F1402A088DEDF104CB,admin@sysadministrivia.com', + 'keyservers': 'hkp://sks.mirror.square-r00t.net:11371,hkps://hkps.pool.sks-keyservers.net:443,http://pgp.mit.edu:80', + 'local': 'false', + 'notify': True, + 'sigkey': '748231EBCBD808A14F5E85D28C004C2F93481F6B', + 'testkeyservers': False, + 'trustlevel': None} + +The gpgdir, sigkey, and keyservers are set from system defaults in kant.parseArgs() if it's run interactively. +This *may* be reworked in the future to provide a mechanism for external calls to kant.SigSession() but for now, +it's up to you to provide all the data in the dict in the above format. + +It will then internally verify these items and do various conversions, so that self.args becomes this: + +{'batch': False, + 'checklevel': None, + 'gpgdir': '/home/bts/.gnupg', + 'keys': ['EFD9413B17293AFDFE6EA6F1402A088DEDF104CB', + 'admin@sysadministrivia.com'], + 'keyservers': [{'port': [11371, ['tcp', 'udp']], + 'proto': 'hkp', + 'server': 'sks.mirror.square-r00t.net'}, + {'port': [443, ['tcp']], + 'proto': 'hkps', + 'server': 'hkps.pool.sks-keyservers.net'}, + {'port': [80, ['tcp']], + 'proto': 'http', + 'server': 'pgp.mit.edu'}], + 'local': 'false', + 'notify': True, + 'rcpts': {'EFD9413B17293AFDFE6EA6F1402A088DEDF104CB': {'type': 'fpr'}, + 'admin@sysadministrivia.com': {'type': 'email'}}, + 'sigkey': '748231EBCBD808A14F5E85D28C004C2F93481F6B', + 'testkeyservers': False, + 'trustlevel': None} diff --git a/gpg/kant/docs/REF.funcs.struct.txt b/gpg/kant/docs/REF.funcs.struct.txt new file mode 100644 index 0000000..d200fa3 --- /dev/null +++ b/gpg/kant/docs/REF.funcs.struct.txt @@ -0,0 +1,33 @@ +The following functions are available within the SigSession() class: + +getTpls() +Get the user-specified templates if they exist, otherwise set up stock ones. + +modifyDirmngr(op) +*op* can be either: +new/start/replace - modify dirmngr to use the runtime-specified keyserver(s) +old/stop/restore - modify dirmngr back to the keyservers that were defined before modification + +buildKeys() +build out the keys dict (see REF.keys.struct.txt). + +getKeys() +fetch keys in the keys dict (see REF.keys.struct.txt) from a keyserver if they aren't found in the local keyring. + +trustKeys() +set up trusts for the keys in the keys dict (see REF.keys.struct.txt). prompts for each trust not found/specified at runtime. + +sigKeys() +sign keys in the keys dict (see REF.keys.struct.txt), either exportable or local depending on runtime specification. + +pushKeys() +push keys in the keys dict (see REF.keys.struct.txt) to the keyservers specified at runtime (as long as they weren't specified to be local/non-exportable signatures; then we don't bother). + +sendMails() +send emails to each of the recipients specified in the keys dict (see REF.keys.struct.txt). + +serverParser(uri) +returns a dict of a keyserver URI broken up into separate components easier for parsing. + +verifyArgs(locargs) +does some verifications, classifies certain data, calls serverParser(), etc. diff --git a/gpg/kant/docs/REF.keys.struct.txt b/gpg/kant/docs/REF.keys.struct.txt new file mode 100644 index 0000000..7177bc3 --- /dev/null +++ b/gpg/kant/docs/REF.keys.struct.txt @@ -0,0 +1,44 @@ +TYPES: +d = dict +l = list +s = string +i = int +b = binary (True/False) +o = object + +- pkey's dict key is the 40-char key ID of the primary key +- "==>" indicates the next item is a dict and the current item may contain one or more elements of the same format, + "++>" is a list, + "-->" is a "flat" item (string, object, int, etc.) +-"status" is one of "an UPGRADE", "a DOWNGRADE", or "a NEW TRUST". + +keys(d) ==> (40-char key ID)(s) ==> pkey(d) --> email(s) + --> name(s) + --> creation (o, datetime) + --> key(o, gpg) + --> trust(i) + --> check(i) + --> local(b) + --> notify(b) + ==> subkeys(d) ==> (40-char key ID)(s) --> creation + --> change(b) + --> sign(b) + --> status(s) + ==> uids(d) ==> email(s) --> name(s) + --> comment(s) + --> email(s) + --> updated(o, datetime) + +for email templates, they are looped over for each key dict as "key". +so for example, instead of specifying "keys['748231EBCBD808A14F5E85D28C004C2F93481F6B']['pkey']['name']", +you instead should specify "key['pkey']['name']". To get the name of e.g. the second uid, +you'd use "key['uids'][(uid email)]['name']. + +the same structure is available via the "mykey" dictionary. e.g. to get the key ID of *your* key, +you can use "mykey['subkeys'][0][0]". + +you also have the following variables/lists/etc. available for templates (via the Jinja2 templating syntax[0]): +- "keyservers", a list of keyservers set. + + +[0] http://jinja.pocoo.org/docs/2.9/templates/ diff --git a/gpg/kant/kant.1.adoc b/gpg/kant/docs/kant.1.adoc similarity index 100% rename from gpg/kant/kant.1.adoc rename to gpg/kant/docs/kant.1.adoc diff --git a/gpg/kant/kant.1 b/gpg/kant/kant.1 deleted file mode 100644 index 3cb714f..0000000 --- a/gpg/kant/kant.1 +++ /dev/null @@ -1,225 +0,0 @@ -'\" t -.\" Title: kant -.\" Author: Brent Saner -.\" Generator: Asciidoctor 1.5.5 -.\" Date: 2017-09-07 -.\" Manual: KANT - Keysigning and Notification Tool -.\" Source: KANT -.\" Language: English -.\" -.TH "KANT" "1" "2017-09-07" "KANT" "KANT \- Keysigning and Notification Tool" -.ie \n(.g .ds Aq \(aq -.el .ds Aq ' -.ss \n[.ss] 0 -.nh -.ad l -.de URL -\\$2 \(laURL: \\$1 \(ra\\$3 -.. -.if \n[.g] .mso www.tmac -.LINKSTYLE blue R < > -.SH "NAME" -kant \- Sign GnuPG/OpenPGP/PGP keys and notify the key owner(s) -.SH "SYNOPSIS" -.sp -\fBkant\fP [\fIOPTION\fP] \-k/\-\-key \fI\fP -.SH "OPTIONS" -.sp -Keysigning (and keysigning parties) can be a lot of fun, and can offer someone with new keys a way into the WoT (Web\-of\-Trust). -Unfortunately, they can be intimidating to those new to the experience. -This tool offers a simple and easy\-to\-use interface to sign public keys (normal, local\-only, and/or non\-exportable), -set owner trust, specify level of checking done, and push the signatures to a keyserver. It even supports batch operation via a CSV file. -.sp -\fB\-h\fP, \fB\-\-help\fP -.RS 4 -Display brief help/usage and exit. -.RE -.sp -\fB\-k\fP \fIKEY_IDS|BATCHFILE\fP, \fB\-\-key\fP \fIKEY_IDS|BATCHFILE\fP -.RS 4 -A single or comma\-separated list of key IDs (see \fBKEY ID FORMAT\fP) to sign, trust, and notify. Can also be an email address. -If \fB\-b\fP/\fB\-\-batch\fP is specified, this should instead be a path to the batch file (see \fBBATCHFILE/Format\fP). -.RE -.sp -\fB\-K\fP \fIKEY_ID\fP, \fB\-\-sigkey\fP \fIKEY_ID\fP -.RS 4 -The key to use when signing other keys (see \fBKEY ID FORMAT\fP). The default key is automatically determined at runtime -(it will be displayed in \fB\-h\fP/\fB\-\-help\fP output). -.RE -.sp -\fB\-t\fP \fITRUSTLEVEL\fP, \fB\-\-trust\fP \fITRUSTLEVEL\fP -.RS 4 -The trust level to automatically apply to all keys (if not specified, KANT will prompt for each key). -See \fBBATCHFILE/TRUSTLEVEL\fP for trust level notations. -.RE -.sp -\fB\-c\fP \fICHECKLEVEL\fP, \fB\-\-check\fP \fICHECKLEVEL\fP -.RS 4 -The level of checking that was done to confirm the validity of ownership for all keys being signed. If not specified, -the default is for KANT to prompt for each key we sign. See \fBBATCHFILE/CHECKLEVEL\fP for check level notations. -.RE -.sp -\fB\-l\fP \fILOCAL\fP, \fB\-\-local\fP \fILOCAL\fP -.RS 4 -If specified, make the signature(s) local\-only (i.e. non\-exportable, don\(cqt push to a keyserver). -See \fBBATCHFILE/LOCAL\fP for more information on local signatures. -.RE -.sp -\fB\-n\fP, \fB\-\-no\-notify\fP -.RS 4 -This requires some explanation. If you have MSMTP[1] installed and configured for the currently active user, -then we will send out emails to recipients letting them know we have signed their key. However, if MSMTP is installed and configured -but this flag is given, then we will NOT attempt to send emails. -.RE -.sp -\fB\-s\fP \fIKEYSERVER(S)\fP, \fB\-\-keyservers\fP \fIKEYSERVER(S)\fP -.RS 4 -The comma\-separated keyserver(s) to push to. The default keyserver list is automatically generated at runtime. -.RE -.sp -\fB\-b\fP, \fB\-\-batch\fP -.RS 4 -If specified, operate in batch mode. See \fBBATCHFILE\fP for more information. -.RE -.sp -\fB\-D\fP \fIGPGDIR\fP, \fB\-\-gpgdir\fP \fIGPGDIR\fP -.RS 4 -The GnuPG configuration directory to use (containing your keys, etc.). The default is automatically generated at runtime, -but will probably be \fB/home//.gnupg\fP or similar. -.RE -.sp -\fB\-T\fP, \fB\-\-testkeyservers\fP -.RS 4 -If specified, initiate a basic test connection with each set keyserver before anything else. Disabled by default. -.RE -.SH "KEY ID FORMAT" -.sp -Key IDs can be specified in one of two ways. The first (and preferred) way is to use the full 160\-bit (40\-character, hexadecimal) key ID. -A little known fact is the fingerprint of a key: -.sp -\fBDEAD BEEF DEAD BEEF DEAD BEEF DEAD BEEF DEAD BEEF\fP -.sp -is actually the full key ID of the primary key; i.e.: -.sp -\fBDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF\fP -.sp -The second way to specify a key, as far as KANT is concerned, is to use an email address. -Do note that if more than one key is found that matches the email address given (and they usually are), you will be prompted to select the specific -correct key ID anyways so it\(cqs usually a better idea to have the owner present their full key ID/fingerprint right from the get\-go. -.SH "BATCHFILE" -.SS "Format" -.sp -The batch file is a CSV\-formatted (comma\-delimited) file containing keys to sign and other information about them. It keeps the following format: -.sp -\fBKEY_ID,TRUSTLEVEL,LOCAL,CHECKLEVEL,NOTIFY\fP -.sp -For more information on each column, reference the appropriate sub\-section below. -.SS "KEY_ID" -.sp -See \fBKEY ID FORMAT\fP. -.SS "TRUSTLEVEL" -.sp -The \fITRUSTLEVEL\fP is specified by the following levels (you can use either the numeric or string representation): -.sp -.if n \{\ -.RS 4 -.\} -.nf -\fB\-1 = Never - 0 = Unknown - 1 = Untrusted - 2 = Marginal - 3 = Full - 4 = Ultimate\fP -.fi -.if n \{\ -.RE -.\} -.sp -It is how much trust to assign to a key, and the signatures that key makes on other keys.[2] -.SS "LOCAL" -.sp -Whether or not to push to a keyserver. It can be either the numeric or string representation of the following: -.sp -.if n \{\ -.RS 4 -.\} -.nf -\fB0 = False -1 = True\fP -.fi -.if n \{\ -.RE -.\} -.sp -If \fB1/True\fP, KANT will sign the key with a local signature (and the signature will not be pushed to a keyserver or be exportable).[3] -.SS "CHECKLEVEL" -.sp -The amount of checking that has been done to confirm that the owner of the key is who they say they are and that the key matches their provided information. -It can be either the numeric or string representation of the following: -.sp -.if n \{\ -.RS 4 -.\} -.nf -\fB0 = Unknown -1 = None -2 = Casual -3 = Careful\fP -.fi -.if n \{\ -.RE -.\} -.sp -It is up to you to determine the classification of the amount of checking you have done, but the following is recommended (it is the policy -the author follows): -.sp -.if n \{\ -.RS 4 -.\} -.nf -\fBUnknown:\fP The key is unknown and has not been reviewed - -\fBNone:\fP The key has been signed, but no confirmation of the - ownership of the key has been performed (typically - a local signature) - -\fBCasual:\fP The key has been presented and the owner is either - known to the signer or they have provided some form - of non\-government\-issued identification or other - proof (website, Keybase.io, etc.) - -\fBCareful:\fP The same as \fBCasual\fP requirements but they have - provided a government\-issued ID and all information - matches -.fi -.if n \{\ -.RE -.\} -.sp -It\(cqs important to check each key you sign carefully. Failure to do so may hurt others\(aq trust in your key.[4] -.SH "SEE ALSO" -.sp -gpg(1), gpgconf(1) -.SH "RESOURCES" -.sp -\fBAuthor\(cqs web site:\fP \c -.URL "https://square\-r00t.net/" "" "" -\fBAuthor\(cqs GPG information:\fP \c -.URL "https://square\-r00t.net/gpg\-info" "" "" -.SH "COPYING" -.sp -Copyright (C) 2017 Brent Saner. -.sp -Free use of this software is granted under the terms of the GPLv3 License. -.SH "NOTES" -1. http://msmtp.sourceforge.net/ -2. For more information on trust levels and the Web of Trust, see: https://www.gnupg.org/gph/en/manual/x334.html and https://www.gnupg.org/gph/en/manual/x547.html -3. For more information on pushing to keyservers and local signatures, see: https://www.gnupg.org/gph/en/manual/r899.html#LSIGN and https://lists.gnupg.org/pipermail/gnupg-users/2007-January/030242.html -4. GnuPG documentation refers to this as "validity"; see https://www.gnupg.org/gph/en/manual/x334.html -.SH "AUTHOR(S)" -.sp -\fBBrent Saner\fP -.RS 4 -Author(s). -.RE diff --git a/gpg/kant/kant.new.py b/gpg/kant/kant.new.py new file mode 100755 index 0000000..26ffa72 --- /dev/null +++ b/gpg/kant/kant.new.py @@ -0,0 +1,943 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +import csv +import datetime +import json +import lzma +import operator +import os +import re +import shutil +import smtplib +import subprocess +from email.message import Message +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from functools import reduce +from io import BytesIO +from socket import * +import urllib.parse +import jinja2 # non-stdlib; Arch package is python-jinja2 +import gpg # non-stdlib; Arch package is "python-gpgme" - see: +import gpg.constants # https://git.archlinux.org/svntogit/packages.git/tree/trunk/PKGBUILD?h=packages/gpgme and +import gpg.errors # https://gnupg.org/ftp/gcrypt/gpgme/ (incl. python bindings in build) +import pprint # development debug + + +class SigSession(object): # see docs/REFS.funcs.struct.txt + def __init__(self, args): + # These are the "stock" templates for emails. It's a PITA, but to save some space since we store them + # inline in here, they're XZ'd and base64'd. + self.email_tpl = {} + self.email_tpl['plain'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AWYAs1dACQZSZhvFgKNdKNXbSf05z0ZPvTvmdQ0mJQg' + + 'atgzhPVeLKxz22bhxedC813X5I8Gn2g9q9Do2jPPViyuLf1SFHDUx1m7SEFsSUyT7L/j71tuxRZi' + + 'xLyy6P3mHo3HeUhwgpgh6lvMYwlTf+zhj3558RJmhXprLteoKt/sY4NIMzz3TBa0ivsVo6EFA9/G' + + '2q1MpuHxg86uY1pA4tkFlmxuSklZq5EKuu7B5RSeUGB+SsjDPSfsdhoPngMET1EXTZfVSWezjJkH' + + 'REyn5SgqpD7vwmwvZWcwWua+V+e/rYYF1cx0Y0Wi1wAC6NzGDce3gbcr/k6M/PiMi8ekJPCmgMgP' + + 'R4GBtRi121wU374nml42WjdGjHee6Se6d0OGKscJjB5Z1eSOho63OMn5Ayu4Lvl05L/mB88Hnk5e' + + 'MayYmXqQdhx3ualWiD/TdHwLf+79t43wfjs6Z/aq/lku67SGdUpjuYRV52nls6WPTuBmo2oCbzpP' + + 'MojIXiYHR3cWebI2CdnVTTHHz7el4NAWIlPKtZkPR6VYj2DkND4kmkO92I258hUqLARWnNaywx87' + + 'hFzhaGN9oKZdozKSZpEDyZUsymWuhQnnY76EImeha67LJwSsXLpBxuViCNlqv7ATT9iffzDmjm0i' + + '2MeLix3rBRMWp4MmWC8bP1ZAKOEq4M3hwjt1q68fH4QvtmTyic/7DBW2KsgZRu21RjK7tHtKTxwy' + + '2UN3pGT6uZRL8vNRNvD70UaeD5MW7lFBPehIeFoByaEKGFfeS8dKc1VFauDmkRlhOboLkPqqwbV+' + + 'tUmU8UvTftKIx1RwTm3FKqjKCYrdqp0fL5wsA93YhRJqHfkvtjCjzRc0czszURkJMHfPyttQg2jb' + + 'UQVIg40XMgew2EUdCrJC3wTvXm9tBxMqXAFBn6S4ihqiJ5PTUDKCx7EnAjK3kUwbWvDno8h1M9u9' + + 'teC5CEWhcAAAAAAwG5UqO9bdswAB6QWZCwAAKFDj9rHEZ/sCAAAAAARZWg==') + self.email_tpl['html'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AXiAt1dAB4aCobvStaHNqdVBn1LjZcL+G+98rmZ7eGZ' + + 'Wqx+3LjENIv37L1aGPICZDRBrsBugzVSasRBHkdHMrWW7SsPfRzw6btQfASTHLr48auPJlJgXTnb' + + 'vgDd2ELrs6p5kXZwE7+pedhgffAcekwr0eyqVNzzdUWJpvZcDBGtp+yvIwSAcrKkUxUd5AFBigd3' + + '4IW1XEK3eQ+LSSEIquUugrd3xiFB4rkSSDGbAuVLqy4Sq3w5c8RRAKavhfSn154/H/D+3RhqFHc/' + + '/x51rgXFvgJ/DwYrr9g7JV9EQB15JvJJxazgnWftZE0W0u05Z7QNrIZQMG6LjcSjf0ep1zNYrJkf' + + 'UYOfHNjXGfpmIfG0Y/cNhU1Zqv3ohv6EGWk4B7CdLjXEeDYqkJgIGA6Q4FQXM6PF7blXFQJ3papF' + + 'lH0iO+ElK3LZVcql+QcVt2Ci+hwiKzsUvV5ydnHezyViTYTppjlZzju5SxxddQg+7CwGzX9O8ys1' + + '8dlbMFHD2ruPd4Zig9B1TEKHSdmQQGwITNufbSGixbuOZAbfMXP1oQqSzYkbbx2ye8ddISOr/753' + + 'deLOwaQpy6tK9nb1wYwsfhpmZriWYDcKRfjkgr0srxnC2iDlMB0Do+GCLVVlmju9qcxeObWoxUaX' + + 'TqecRW4fbpa9xAIH0tZlOpIyPgGfm3CXkiGOs/J/QJ4C4spqNpwppoXg6EAig7Y9GStQyEsHXZrj' + + 'vLAefyaseybuMC+9okhx8VYM8esuE2GVTbCbWhn8ZTi8posQ+zabXvk7KE5zwGHDcvSGg0bYctJj' + + 'V6pExLCp1vCUdP3iP06OCFMINDnGR7ZP4Da/atBUuB/F0LN//x+HfwhEUpVTG52L7f6Qjd/LhvU2' + + 'f/zVfMKlw5xXwTjBu2X1oRYfhyYFhgnDECEi9iuRiVwwtnUU39r2XoaGcnMTPnZe62oy2jqTp3p+' + + 'Y+klB9jUwPUg2t5IxptZ0D/H5flD+pEAAAAAYczECM+Nfu0AAfkF4wsAAEsSt/GxxGf7AgAAAAAE' + + 'WVo=') + # Set up a dict of some constants and mappings + self.maps = {} + # Keylist modes + self.maps['keylist'] = {'local': gpg.constants.KEYLIST_MODE_LOCAL, # local keyring + 'remote': gpg.constants.KEYLIST_MODE_EXTERN, # keyserver + # both - this is SUPPOSED to work, but doesn't seem to... it's unreliable at best? + 'both': gpg.constants.KEYLIST_MODE_LOCAL|gpg.constants.KEYLIST_MODE_EXTERN} + # Validity/trust levels + self.maps['trust'] = {-1: ['never', gpg.constants.VALIDITY_NEVER], # this is... probably? not ideal, but. Never trust the key. + 0: ['unknown', gpg.constants.VALIDITY_UNKNOWN], # The key's trust is unknown - typically because it hasn't been set yet. + 1: ['untrusted', gpg.constants.VALIDITY_UNDEFINED], # The key is explicitly set to a blank trust + 2: ['marginal', gpg.constants.VALIDITY_MARGINAL], # Trust a little. + 3: ['full', gpg.constants.VALIDITY_FULL], # This is going to be the default for verified key ownership. + 4: ['ultimate', gpg.constants.VALIDITY_ULTIMATE]} # This should probably only be reserved for keys you directly control. + # Validity/trust reverse mappings - see self.maps['trust'] for the meanings of these + # Used for fetching display/feedback + self.maps['rtrust'] = {gpg.constants.VALIDITY_NEVER: 'Never', + gpg.constants.VALIDITY_UNKNOWN: 'Unknown', + gpg.constants.VALIDITY_UNDEFINED: 'Untrusted', + gpg.constants.VALIDITY_MARGINAL: 'Marginal', + gpg.constants.VALIDITY_FULL: 'Full', + gpg.constants.VALIDITY_ULTIMATE: 'Ultimate'} + # Local signature and other binary (True/False) mappings + self.maps['binmap'] = {0: ['no', False], + 1: ['yes', True]} + # Level of care taken when checking key ownership/valid identity + self.maps['check'] = {0: ['unknown', 0], + 1: ['none', 1], + 2: ['casual', 2], + 3: ['careful', 3]} + # Default protocol/port mappings for keyservers + self.maps['proto'] = {'hkp': [11371, ['tcp', 'udp']], # Standard HKP protocol + 'hkps': [443, ['tcp']], # Yes, same as https + 'http': [80, ['tcp']], # HTTP (plaintext) + 'https': [443, ['tcp']], # SSL/TLS + 'ldap': [389, ['tcp', 'udp']], # Includes TLS negotiation since it runs on the same port + 'ldaps': [636, ['tcp', 'udp']]} # SSL + self.maps['hashalgos'] = {gpg.constants.MD_MD5: 'md5', + gpg.constants.MD_SHA1: 'sha1', + gpg.constants.MD_RMD160: 'ripemd160', + gpg.constants.MD_MD2: 'md2', + gpg.constants.MD_TIGER: 'tiger192', + gpg.constants.MD_HAVAL: 'haval', + gpg.constants.MD_SHA256: 'sha256', + gpg.constants.MD_SHA384: 'sha384', + gpg.constants.MD_SHA512: 'sha512', + gpg.constants.MD_SHA224: 'sha224', + gpg.constants.MD_MD4: 'md4', + gpg.constants.MD_CRC32: 'crc32', + gpg.constants.MD_CRC32_RFC1510: 'crc32rfc1510', + gpg.constants.MD_CRC24_RFC2440: 'crc24rfc2440'} + # Now that all the static data's set up, we can continue. + self.args = self.verifyArgs(args) # Make the args accessible to all functions in the class - see docs/REF.args.struct.txt + # Get the GPGME context + try: + os.environ['GNUPGHOME'] = self.args['gpgdir'] + self.ctx = gpg.Context() + except: + raise RuntimeError('Could not use {0} as a GnuPG home'.format(self.args['gpgdir'])) + self.cfgdir = os.path.join(os.environ['HOME'], '.kant') + if not os.path.isdir(self.cfgdir): + print('No KANT configuration directory found; creating one at {0}...'.format(self.cfgdir)) + os.makedirs(self.cfgdir, exist_ok = True) + self.keys = {} # See docs/REF.keys.struct.txt + self.mykey = {} # "" + self.tpls = {} # Email templates will go here + self.getTpls() # Build out self.tpls + return(None) + + def getEditPrompt(self, key): # "key" should be the FPR of the primary key + # This mapping defines the default "answers" to the gpgme key editing. + # https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py + # https://searchcode.com/codesearch/view/20535820/ + # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS + # You can get the prompt identifiers and status indicators without grokking the source + # by first interactively performing the type of edit(s) you want to do with this command: + # gpg --status-fd 2 --command-fd 2 --edit-key + if key['trust'] >= gpg.constants.VALIDITY_FULL: # For tsigning, it only prompts for two trust levels: + _loctrust = 2 # "I trust fully" + else: + _loctrust = 1 # "I trust marginally" + # TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here. + # "Full" trust is a pretty big thing. + if key['trust'] >= gpg.constants.VALIDITY_FULL: + _locdepth = 2 # Allow +1 level of trust extension + else: + _locdepth = 1 # Only trust this key + _map = {'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list', # Valid commands + 'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref', + 'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save', + 'quit'], + 'prompts': {'edit_ownertrust': {'value': str(key['trust']), # Pulled at time of call + 'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes + 'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted + 'pklist': {'user_id': {'enter': key['pkey']['email']}}, # Prompt for a user ID - can we change this to key ID? + 'sign_uid': {'class': str(key['check']), # The certification/"check" level + 'okay': 'yes'}, # Are you sure that you want to sign this key with your key..." + 'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above + 'trust_depth': str(_locdepth), # The "depth" of the trust signature. + 'trust_regexp': None}, # We can "Restrict" trust to certain domains, but this isn't really necessary. + 'keyedit': {'prompt': 'trust', # Initiate trust editing + 'save': {'okay': 'yes'}}}} # Save if prompted + return(_map) + + def getTpls(self): + for t in ('plain', 'html'): + _tpl_file = os.path.join(self.cfgdir, 'email.{0}.j2'.format(t)) + if os.path.isfile(_tpl_file): + with open(_tpl_file, 'r') as f: + self.tpls[t] = f.read() + else: + self.tpls[t] = lzma.decompress(base64.b64decode(email_tpl[t]), + format = lzma.FORMAT_XZ, + memlimit = None, + filters = None).decode('utf-8') + with open(_tpl_file, 'w') as f: + f.write('{0}'.format(self.tpls[t])) + print('Created: {0}'.format(tpl_file)) + return(self.tpls) + + def modifyDirmngr(self, op): + if not self.args['keyservers']: + return() + _pid = str(os.getpid()) + _activecfg = os.path.join(self.args['gpgdir'], 'dirmngr.conf') + _activegpgconf = os.path.join(self.args['gpgdir'], 'gpg.conf') + _bakcfg = '{0}.{1}'.format(_activecfg, _pid) + _bakgpgconf = '{0}.{1}'.format(_activegpgconf, _pid) + ## Modify files + if op in ('new', 'start', 'replace'): + # Replace the keyservers + if os.path.lexists(_activecfg): + shutil.copy2(_activecfg, _bakcfg) + with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: + for line in read: + if not line.startswith('keyserver '): + write.write(line) + with open(_activecfg, 'a') as f: + for s in self.args['keyservers']: + _uri = '{0}://{1}:{2}'.format(s['proto'], + s['server'], + s['port'][0]) + f.write('keyserver {0}\n'.format(_uri)) + # Use stronger ciphers, etc. and prompt for check/certification levels + if os.path.lexists(_activegpgconf): + shutil.copy2(_activegpgconf, _bakgpgconf) + with open(_activegpgconf, 'w') as f: + f.write('cipher-algo AES256\ndigest-algo SHA512\ncert-digest-algo SHA512\ncompress-algo BZIP2\nask-cert-level\n') + ## Restore files + if op in ('old', 'stop', 'restore'): + # Restore the keyservers + if os.path.lexists(_bakcfg): + with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write: + for line in read: + write.write(line) + os.remove(_bakcfg) + else: + os.remove(_activecfg) + # Restore GPG settings + if os.path.lexists(_bakgpgconf): + with open(_bakgpgconf, 'r') as read, open(_activegpgconf, 'w') as write: + for line in read: + write.write(line) + os.remove(_bakgpgconf) + else: + os.remove(_activegpgconf) + subprocess.run(['gpgconf', '--reload', 'dirmngr']) # I *really* wish we could do this via GPGME. + return() + + def getKeys(self): + _keyids = [] + _keys = {} + # Do we have the key already? If not, fetch. + for r in list(self.args['rcpts'].keys()): + if self.args['rcpts'][r]['type'] == 'fpr': + _keyids.append(r) + self.ctx.set_keylist_mode(self.maps['keylist']['remote']) + try: + _k = self.ctx.get_key(r) + except: + print('{0}: We could not find this key on the keyserver.'.format(r)) # Key not on server + del(self.args['rcpts'][r]) + _keyids.remove(r) + continue + self.ctx.set_keylist_mode(self.maps['keylist']['local']) + _keys[r] = {'fpr': r, + 'obj': _k, + 'created': _k.subkeys[0].timestamp} + if 'T' in str(_keys[r]['created']): + _keys[r]['created'] = int(datetime.datetime.strptime(_keys[r]['created'], + '%Y%m%dT%H%M%S').timestamp()) + if self.args['rcpts'][r]['type'] == 'email': + # We need to actually do a lookup on the email address. + _keytmp = [] + for k in self.ctx.keylist(r, mode = self.maps['keylist']['remote']): + _keytmp.append(k) + for k in _keytmp: + _keys[k.fpr] = {'fpr': k.fpr, + 'obj': k, + 'created': k.subkeys[0].timestamp, + 'uids': {}} + # Per the docs (/docs/DETAILS, "*** Field 6 - Creation date"), + # they may change this to ISO 8601... + if 'T' in str(_keys[k.fpr]['created']): + _keys[k.fpr]['created'] = int(datetime.datetime.strptime(_keys[k.fpr]['created'], + '%Y%m%dT%H%M%S').timestamp()) + for s in k.uids: + _keys[k.fpr]['uids'][s.email] = {'comment': s.comment, + 'updated': s.last_update} + if len(_keytmp) > 1: # Print the keys and prompt for a selection. + print('\nWe found the following keys for {0} <{1}>...\n\nKEY ID:'.format(r, _orig_r)) + for s in _keys[r]: + print(s, _keys[r]) + print('{0}\n{1:6}(Generated at {2}) UIDs:'.format(k, + '', + datetime.datetime.utcfromtimestamp(s['updated']))) + for email in _keys[k]['uids']: + print('{0:42}(Generated {3}) <{2}> {1}'.format('', + s['uids'][email]['comment'], + email, + datetime.datetime.utcfromtimestamp(s['uids'][email]['updated']))) + print() + while True: + key = input('Please enter the (full) appropriate key: ') + if key not in keys.keys(): + print('Please enter a full key ID from the list above or hit ctrl-d to exit.') + else: + _keyids.append(key) + break + else: + if len(_keytmp) == 0: + print('Could not find {0}!'.format(r)) + del(self.args['rcpts'][r]) + continue + _keyids.append(_keys[k.fpr]['fpr']) + print('\nFound key {0} for {1} (Generated at {2}):'.format(_keys[k.fpr]['fpr'], + r, + datetime.datetime.utcfromtimestamp(_keys[k.fpr]['created']))) + for email in _keys[k.fpr]['uids']: + print('\t(Generated {2}) {0} <{1}>'.format(_keys[k.fpr]['uids'][email]['comment'], + email, + datetime.datetime.utcfromtimestamp(_keys[k.fpr]['uids'][email]['updated']))) + print() + ## And now we can (FINALLY) fetch the key(s). + # TODO: replace with gpg.keylist_mode(gpgme.KEYLIST_MODE_EXTERN) and internal mechanisms? + for g in _keyids: + try: + self.ctx.op_import_keys([_keys[g]['obj']]) + except gpg.errors.GPGMEError: + print('Key {0} could not be found on the keyserver'.format(g)) # The key isn't on the keyserver + self.ctx.set_keylist_mode(self.maps['keylist']['local']) + for k in _keys: + _key = _keys[k]['obj'] + self.keys[k] = {'pkey': {'email': _key.uids[0].email, + 'name': _key.uids[0].name, + 'creation': datetime.datetime.utcfromtimestamp(_keys[k]['created']), + 'key': _key}, + 'trust': self.args['trustlevel'], # Not set yet; we'll modify this later in buildKeys(). + 'local': self.args['local'], # Not set yet; we'll modify this later in buildKeys(). + 'notify': self.args['notify'], # Same... + 'sign': True, # We don't need to prompt for this since we detect if we need to sign or not + 'change': None, # "" + 'status': None} # Same. + # And we add the subkeys in yet another loop. + self.keys[k]['subkeys'] = {} + self.keys[k]['uids'] = {} + for s in _key.subkeys: + self.keys[k]['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) + for u in _key.uids: + self.keys[k]['uids'][u.email] = {'name': u.name, + 'comment': u.comment, + 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} + del(_keys) + return() + + def buildKeys(self): + self.getKeys() + # Before anything else, let's set up our own key info. + _key = self.ctx.get_key(self.args['sigkey'], secret = True) + self.mykey = {'pkey': {'email': _key.uids[0].email, + 'name': _key.uids[0].name, + 'creation': datetime.datetime.utcfromtimestamp(_key.subkeys[0].timestamp), + 'key': _key}, + 'trust': 'ultimate', # No duh. This is our own key. + 'local': False, # We keep our own key array separate, so we don't push it anyways. + 'notify': False, # "" + 'check': None, # "" + 'change': False, # "" + 'status': None, # "" + 'sign': False} # "" + self.mykey['subkeys'] = {} + self.mykey['uids'] = {} + for s in _key.subkeys: + self.mykey['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp) + for u in _key.uids: + self.mykey['uids'][u.email] = {'name': u.name, + 'comment': u.comment, + 'updated': datetime.datetime.utcfromtimestamp(u.last_update)} + # Now let's set up our trusts. + if self.args['batch']: + self.batchParse() + else: + for k in list(self.keys.keys()): + self.promptTrust(k) + self.promptCheck(k) + self.promptLocal(k) + self.promptNotify(k) + # In case we removed any keys, we have to run this outside of the loops + for k in list(self.keys.keys()): + for t in ('trust', 'local', 'check', 'notify'): + self.keysCleanup(k, t) + # TODO: populate self.keys[key]['change']; we use this for trust (but not sigs) + return() + + def batchParse(self): + # First we grab the info from CSV + csvlines = csv.reader(self.csvraw, delimiter = ',', quotechar = '"') + for row in csvlines: + row[0] = row[0].replace('<', '').replace('>', '') + try: + if self.args['rcpts'][row[0]]['type'] == 'fpr': + k = row[0] + else: # It's an email. + key_set = False + while not key_set: + for i in list(self.keys.keys()): + if row[0] in list(self.keys[i]['uids'].keys()): + k = i + key_set = True + self.keys[k]['trust'] = row[1].lower().strip() + self.keys[k]['local'] = row[2].lower().strip() + self.keys[k]['check'] = row[3].lower().strip() + self.keys[k]['notify'] = row[4].lower().strip() + except KeyError: + continue # It was deemed to be an invalid key earlier + return() + + def promptTrust(self, k): + if 'trust' not in self.keys[k].keys() or not self.keys[k]['trust']: + trust_in = input(('\nWhat trust level should we assign to {0}? (The default is '+ + 'Marginal.)\n\t\t\t\t ({1} <{2}>)' + + '\n\n\t\033[1m-1 = Never\n\t 0 = Unknown\n\t 1 = Untrusted\n\t 2 = Marginal\n\t 3 = Full' + + '\n\t 4 = Ultimate\033[0m\nTrust: ').format(k, + self.keys[k]['pkey']['name'], + self.keys[k]['pkey']['email'])) + if trust_in == '': + trust_in = 'marginal' # Has to be a str, so we can "pretend" it was entered + self.keys[k]['trust'] = trust_in + return() + + def promptCheck(self, k): + if 'check' not in self.keys[k].keys() or self.keys[k]['check'] == None: + check_in = input(('\nHow carefully have you checked {0}\'s validity of identity/ownership of the key? ' + + '(Default is Unknown.)\n' + + '\n\t\033[1m0 = Unknown\n\t1 = None\n\t2 = Casual\n\t3 = Careful\033[0m\nCheck level: ').format(k)) + if check_in == '': + check_in == 'unknown' + self.keys[k]['check'] = check_in + return() + + def promptLocal(self, k): + if 'local' not in self.keys[k].keys() or self.keys[k]['local'] == None: + if self.args['keyservers']: + local_in = input(('\nShould we locally sign {0} '+ + '(if yes, the signature will be non-exportable; if no, we will be able to push to a keyserver) ' + + '(Yes/\033[1mNO\033[0m)? ').format(k)) + if local_in == '': + local_in = False + self.keys[k]['local'] = local_in + return() + + def promptNotify(self, k): + if 'notify' not in self.keys[k].keys() or self.keys[k]['notify'] == None: + notify_in = input(('\nShould we notify {0} (via <{1}>) (\033[1mYES\033[0m/No)? ').format(k, + self.keys[k]['pkey']['email'])) + if notify_in == '': + notify_in = True + self.keys[k]['local'] = local_in + return() + + def keysCleanup(self, k, t): # At some point, this WHOLE thing would probably be cleaner with bitwise flags... + s = t + _errs = {'trust': 'trust level', + 'local': 'local signature option', + 'check': 'check level', + 'notify': 'notify flag'} + if k not in self.keys.keys(): + return() # It was deleted already. + if t in ('local', 'notify'): # these use a binary mapping + t = 'binmap' + # We can do some basic stuff right here. + if str(self.keys[k][s]).lower() in ('n', 'no', 'false'): + self.keys[k][s] = False + return() + elif str(self.keys[k][s]).lower() in ('y', 'yes', 'true'): + self.keys[k][s] = True + return() + # Make sure we have a known value. These will ALWAYS be str's, either from the CLI or CSV. + value_in = str(self.keys[k][s]).lower().strip() + for dictk, dictv in self.maps[t].items(): + if value_in == dictv[0]: + self.keys[k][s] = int(dictk) + elif value_in == str(dictk): + self.keys[k][s] = int(dictk) + if not isinstance(self.keys[k][s], int): # It didn't get set + print('{0}: "{1}" is not a valid {2}; skipping. Run kant again to fix.'.format(k, self.keys[k][s], _errs[s])) + del(self.keys[k]) + return() + return() + + def sigKeys(self): # The More Business-End(TM) + # NOTE: If the trust level is anything but 2 (the default), we should use op_interact() instead and do a tsign. + self.ctx.keylist_mode = gpg.constants.KEYLIST_MODE_SIGS + _mkey = self.mykey['pkey']['key'] + self.ctx.signers = [_mkey] + for k in list(self.keys.keys()): + key = self.keys[k]['pkey']['key'] + for uid in key.uids: + for s in uid.signatures: + try: + signerkey = ctx.get_key(s.keyid).subkeys[0].fpr + if signerkey == mkey.subkeys[0].fpr: + self.trusts[k]['sign'] = False # We already signed this key + except gpgme.GpgError: + pass # usually if we get this it means we don't have a signer's key in our keyring + # And again, we loop. ALLLLL that buildup for one line. + for k in list(self.keys.keys()): + # TODO: configure to allow for user-entered expiration? + if self.keys[k]['sign']: + self.ctx.key_sign(self.keys[k]['pkey']['key'], local = self.keys[k]['local']) + return() + + class KeyEditor(object): + def __init__(self, optmap): + self.replied_once = False # This is used to handle the first prompt vs. the last + self.optmap = optmap + return(None) + + def editKey(self, status, args, out): + _result = None + out.seek(0, 0) + def mapDict(m, d): + return(reduce(operator.getitem, m, d)) + if args == 'keyedit.prompt' and self.replied_once: + _result = 'quit' + elif status == 'KEY_CONSIDERED': + _result = None + self.replied_once = False + elif status == 'GET_LINE': + self.replied_once = True + _ilist = args.split('.') + _result = mapDict(_ilist, self.optmap['prompts']) + if not _result: + _result = None + return(_result) + + def trustKeys(self): # The Son of Business-End(TM) + # TODO: add check for change + for k in list(self.keys.keys()): + _key = self.keys[k] + _map = self.getEditPrompt(_key) + out = gpg.Data() + self.ctx.interact(_key['pkey']['key'], self.KeyEditor(_map).editKey, sink = out, fnc_value = out) + out.seek(0, 0) + return() + + def pushKeys(self): # The Last Business-End(TM) + for k in list(self.keys.keys()): + if not self.keys[k]['local'] and self.keys[k]['sign']: + self.ctx.op_export(k, gpg.constants.EXPORT_MODE_EXTERN, None) + return() + + class Mailer(object): # I lied; The Return of the Business-End(TM) + def __init__(self): + _homeconf = os.path.join(os.environ['HOME'], '.msmtprc') + _sysconf = '/etc/msmtprc' + self.msmtp = {'path': None} + if not os.path.isfile(_homeconf): + if not os.path.isfile(_sysconf): + self.msmtp['conf'] = False + else: + self.msmtp['conf'] = _sysconf + else: + self.msmtp['conf'] = _homeconf + if os.path.isfile(self.msmtp['conf']): + for p in (os.environ['PATH']).split(':'): + if os.path.isfile(os.path.join(p, 'msmtp')): + self.msmtp['path'] = os.path.join(p, 'msmtp') + if self.msmtp['path']: + # Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary. + # Now we need to parse the config. + self.msmtp['cfg'] = self.getCfg() + return(None) + + def getCfg(self): + cfg = {'default': None, 'defaults': {}} + _defaults = False + _acct = None + with open(self.msmtp['conf'], 'r') as f: + _cfg_raw = f.read() + for l in _cfg_raw.splitlines(): + if re.match('^\s?(#.*|)$', l): + continue # Skip over blank and commented lines + _line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)] + if _line[0] == 'account': + if re.match('^default\s?:\s?', _line[1]): # it's the default account specifier + cfg['default'] = _line[1].split(':', maxsplit = 1)[1].strip() + else: + if _line[1] not in cfg.keys(): # it's a new account definition + cfg[_line[1]] = {} + _acct = _line[1] + _defaults = False + elif _line[0] == 'defaults': # it's the defaults + _acct = 'defaults' + else: # it's a config directive + cfg[_acct][_line[0]] = _line[1] + for a in list(cfg): + if a != 'default': + for k, v in cfg['defaults'].items(): + if k not in cfg[a].keys(): + cfg[a][k] = v + del(cfg['defaults']) + return(cfg) + + def sendEmail(self, msg, key, profile): # This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc. + if 'tls-starttls' in self.msmtp['cfg'][profile].keys() and self.msmtp['cfg'][profile]['tls-starttls'] == 'on': + smtpserver = smtplib.SMTP(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.starttls() + # we need to EHLO twice with a STARTTLS because email is weird. + elif self.msmtp['cfg'][profile]['tls'] == 'on': + smtpserver = smtplib.SMTP_SSL(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.login(self.msmtp['cfg'][profile]['user'], self.msmtp['cfg'][profile]['password']) + smtpserver.sendmail(self.msmtp['cfg'][profile]['user'], key['pkey']['email'], msg.as_string()) + smtpserver.close() + return() + + def postalWorker(self): + m = self.Mailer() + if 'KANT' in m.msmtp['cfg'].keys(): + _profile = 'KANT' + else: + _profile = m.msmtp['cfg']['default'] # TODO: let this be specified on the CLI args? + if 'user' not in m.msmtp['cfg'][_profile].keys() or not m.msmtp['cfg'][_profile]['user']: + return() # We don't have MSMTP configured. + # Reconstruct the keyserver list. + _keyservers = [] + for k in self.args['keyservers']: + _keyservers.append('{0}://{1}:{2}'.format(k['proto'], k['server'], k['port'][0])) + # Export our key so we can attach it. + _pubkeys = {} + for e in ('asc', 'gpg'): + if e == 'asc': + self.ctx.armor = True + else: + self.ctx.armor = False + _pubkeys[e] = gpg.Data() # This is a data buffer to store your ASCII-armored pubkeys + self.ctx.op_export_keys([self.mykey['pkey']['key']], 0, _pubkeys[e]) + _pubkeys[e].seek(0, 0) # Read with e.g. _sigs['asc'].read() + for k in list(self.keys.keys()): + if self.keys[k]['notify']: + _body = {} + for t in list(self.tpls.keys()): + # There's gotta be a more efficient way of doing this... + #_tplenv = jinja2.Environment(loader = jinja2.BaseLoader()).from_string(self.tpls[t]) + _tplenv = jinja2.Environment().from_string(self.tpls[t]) + _body[t] = _tplenv.render(key = self.keys[k], + mykey = self.mykey, + keyservers = _keyservers) + b = MIMEMultipart('alternative') # Set up a body + for c in _body.keys(): + b.attach(MIMEText(_body[c], c)) + bmsg = MIMEMultipart() + bmsg.attach(b) + for s in _pubkeys.keys(): + _attchmnt = MIMEApplication(_pubkeys[s].read(), '{0}.{1}'.format(self.mykey['pkey']['key'].fpr, s)) + _attchmnt['Content-Disposition'] = 'attachment; filename="{0}.{1}"'.format(self.mykey['pkey']['key'].fpr, s) + bmsg.attach(_attchmnt) + # Now we sign the body. This incomprehensible bit monkey-formats bmsg to be a multi-RFC-compatible + # string, which is then passed to our gpgme instance's signing mechanishm, and the output of that is + # returned as plaintext. Whew. + self.ctx.armor = True + + _sig = self.ctx.sign((bmsg.as_string().replace('\n', '\r\n')).encode('utf-8'), + mode = gpg.constants.SIG_MODE_DETACH) + imsg = Message() # Build yet another intermediate message... + imsg['Content-Type'] = 'application/pgp-signature; name="signature.asc"' + imsg['Content-Description'] = 'OpenPGP digital signature' + imsg.set_payload(_sig[0].decode('utf-8')) + msg = MIMEMultipart(_subtype = 'signed', + micalg = "pgp-{0}".format(self.maps['hashalgos'][_sig[1].signatures[0].hash_algo]), + protocol = 'application/pgp-signature') + msg.attach(bmsg) # Attach the body (plaintext, html, pubkey attachmants) + msg.attach(imsg) # Attach the isignature + msg['To'] = self.keys[k]['pkey']['email'] + if 'from' in m.msmtp['cfg'][_profile].keys(): + msg['From'] = m.msmtp['cfg'][_profile]['from'] + else: + msg['From'] = self.mykey['pkey']['email'] + msg['Subject'] = 'Your GnuPG/PGP key has been signed' + msg['Openpgp'] = 'id={0}'.format(self.mykey['pkey']['key'].fpr) + msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') + msg['User-Agent'] = 'KANT (part of the r00t^2 OpTools suite: https://git.square-r00t.net/OpTools)' + m.sendEmail(msg, self.keys[k], _profile) # Send the email + for d in (msg, imsg, bmsg, b, _body, _tplenv): # Not necessary, but it pays to be paranoid; we do NOT want leaks. + del(d) + del(m) + return() + + def saveResults(self): + _cachedir = os.path.join(self.cfgdir, 'cache', datetime.datetime.utcnow().strftime('%Y.%m.%d_%H.%M.%S')) + os.makedirs(_cachedir, exist_ok = True) + for k in self.keys.keys(): + _keyout = self.keys[k] + # We need to normalize the datetime objects and gpg objects to strings + _keyout['pkey']['creation'] = str(self.keys[k]['pkey']['creation']) + _keyout['pkey']['key'] = '' + for u in list(_keyout['uids'].keys()): + _keyout['uids'][u]['updated'] = str(self.keys[k]['uids'][u]['updated']) + for s in list(_keyout['subkeys'].keys()): + _keyout['subkeys'][s] = str(self.keys[k]['subkeys'][s]) + _fname = os.path.join(_cachedir, '{0}.json'.format(k)) + with open(_fname, 'a') as f: + f.write('{0}\n'.format(json.dumps(_keyout, sort_keys = True, indent = 4))) + del(_keyout) + # And let's grab a copy of our key in the state that it exists in currently + _mykey = self.mykey + # We need to normalize the datetime objects and gpg objects to strings again + _mykey['pkey']['creation'] = str(_mykey['pkey']['creation']) + _mykey['pkey']['key'] = '' + for u in list(_mykey['uids'].keys()): + _mykey['uids'][u]['updated'] = str(self.mykey['uids'][u]['updated']) + for s in list(_mykey['subkeys'].keys()): + _mykey['subkeys'][s] = str(self.mykey['subkeys'][s]) + with open(os.path.join(_cachedir, '_SIGKEY.json'), 'w') as f: + f.write('{0}\n'.format(json.dumps(_mykey, sort_keys = True, indent = 4))) + return() + + def serverParser(self, uri): + # https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples + _server = {} + _urlobj = urllib.parse.urlparse(uri) + _server['proto'] = _urlobj.scheme + _lazy = False + if not _server['proto']: + _server['proto'] = 'hkp' # Default + _server['server'] = _urlobj.hostname + if not _server['server']: + _server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri, re.MULTILINE) + _lazy = True + _server['port'] = _urlobj.port + if not _server['port']: + if _lazy: + _p = re.sub('.*:([0-9]+)$', '\g<1>', uri, re.MULTILINE) + _server['port'] = self.maps['proto'][_server['proto']] # Default + return(_server) + + def verifyArgs(self, locargs): + ## Some pythonization... + if not locargs['batch']: + locargs['keys'] = [re.sub('\s', '', k) for k in locargs['keys'].split(',')] + else: + ## Batch file + _batchfilepath = os.path.abspath(os.path.expanduser(locargs['keys'])) + if not os.path.isfile(_batchfilepath): + raise ValueError('{0} does not exist or is not a regular file.'.format(_batchfilepath)) + else: + with open(_batchfilepath, 'r') as f: + self.csvraw = f.readlines() + locargs['keys'] = _batchfilepath + locargs['keyservers'] = [re.sub('\s', '', s) for s in locargs['keyservers'].split(',')] + locargs['keyservers'] = [self.serverParser(s) for s in locargs['keyservers']] + ## Key(s) to sign + locargs['rcpts'] = {} + if not locargs['batch']: + _keyiter = locargs['keys'] + else: + _keyiter = [] + for row in csv.reader(self.csvraw, delimiter = ',', quotechar = '"'): + _keyiter.append(row[0]) + for k in _keyiter: + locargs['rcpts'][k] = {} + try: + int(k, 16) + _ktype = 'fpr' + except: # If it isn't a valid key ID... + if not re.match('^?$', k): # is it an email address? + raise ValueError('{0} is not a valid email address'.format(k)) + else: + r = k.replace('<', '').replace('>', '') + locargs['rcpts'][r] = locargs['rcpts'][k] + del(locargs['rcpts'][k]) + k = r + _ktype = 'email' + locargs['rcpts'][k]['type'] = _ktype + # Security is important. We don't want users getting collisions, so we don't allow shortened key IDs. + if _ktype == 'fpr' and not len(k) == 40: + raise ValueError('{0} is not a full 40-char key ID or key fingerprint'.format(k)) + ## Signing key + if not locargs['sigkey']: + raise ValueError('A key for signing is required') # We need a key we can sign with. + else: + if not os.path.lexists(locargs['gpgdir']): + raise FileNotFoundError('{0} does not exist'.format(locargs['gpgdir'])) + elif os.path.isfile(locargs['gpgdir']): + raise NotADirectoryError('{0} is not a directory'.format(locargs['gpgdir'])) + # Now we need to verify that the private key exists... + try: + _ctx = gpg.Context() + _sigkey = _ctx.get_key(locargs['sigkey'], True) + except gpg.errors.GPGMEError or gpg.errors.KeyNotFound: + raise ValueError('Cannot use key {0}'.format(locargs['sigkey'])) + # And that it is an eligible candidate to use to sign. + if not _sigkey.can_sign or True in (_sigkey.revoked, _sigkey.expired, _sigkey.disabled): + raise ValueError('{0} is not a valid candidate for signing'.format(locargs['sigkey'])) + ## Keyservers + if locargs['testkeyservers']: + for s in locargs['keyservers']: + # Test to make sure the keyserver is accessible. + _v6test = socket(AF_INET6, SOCK_DGRAM) + try: + _v6test.connect(('ipv6.square-r00t.net', 0)) + _nettype = AF_INET6 # We have IPv6 intarwebz + except: + _nettype = AF_INET # No IPv6, default to IPv4 + for _proto in locargs['keyservers'][s]['port'][1]: + if _proto == 'udp': + _netproto = SOCK_DGRAM + elif _proto == 'tcp': + _netproto = SOCK_STREAM + _sock = socket(nettype, netproto) + _sock.settimeout(10) + _tests = _sock.connect_ex((locargs['keyservers'][s]['server'], + int(locargs['keyservers'][s]['port'][0]))) + _uristr = '{0}://{1}:{2} ({3})'.format(locargs['keyservers'][s]['proto'], + locargs['keyservers'][s]['server'], + locargs['keyservers'][s]['port'][0], + _proto.upper()) + if not tests == 0: + raise OSError('Keyserver {0} is not available'.format(_uristr)) + else: + print('Keyserver {0} is accepting connections.'.format(_uristr)) + sock.close() + return(locargs) + +def parseArgs(): + def getDefGPGDir(): + try: + gpgdir = os.environ['GNUPGHOME'] + except KeyError: + try: + homedir = os.environ['HOME'] + gpgdchk = os.path.join(homedir, '.gnupg') + except KeyError: + # There is no reason that this should ever get this far, but... edge cases be crazy. + gpgdchk = os.path.join(os.path.expanduser('~'), '.gnupg') + if os.path.isdir(gpgdchk): + gpgdir = gpgdchk + else: + gpgdir = None + return(gpgdir) + def getDefKey(defgpgdir): + os.environ['GNUPGHOME'] = defgpgdir + if not defgpgdir: + return(None) + defkey = None + ctx = gpg.Context() + for k in ctx.keylist(None, secret = True): # "None" is query string; this grabs all keys in the private keyring + if k.can_sign and True not in (k.revoked, k.expired, k.disabled): + defkey = k.subkeys[0].fpr + break # We'll just use the first primary key we find that's valid as the default. + return(defkey) + def getDefKeyservers(defgpgdir): + srvlst = [None] + # We don't need these since we use the gpg agent. Requires GPG 2.1 and above, probably. + #if os.path.isfile(os.path.join(defgpgdir, 'dirmngr.conf')): + # pass + dirmgr_out = subprocess.run(['gpg-connect-agent', '--dirmngr', 'keyserver', '/bye'], stdout = subprocess.PIPE) + for l in dirmgr_out.stdout.decode('utf-8').splitlines(): + #if len(l) == 3 and l.lower().startswith('s keyserver'): # It's a keyserver line + if l.lower().startswith('s keyserver'): # It's a keyserver line + s = l.split()[2] + if len(srvlst) == 1 and srvlst[0] == None: + srvlst = [s] + else: + srvlst.append(s) + return(','.join(srvlst)) + defgpgdir = getDefGPGDir() + defkey = getDefKey(defgpgdir) + defkeyservers = getDefKeyservers(defgpgdir) + args = argparse.ArgumentParser(description = 'Keysigning Assistance and Notifying Tool (KANT)', + epilog = 'brent s. || 2017 || https://square-r00t.net') + args.add_argument('-k', + '--keys', + dest = 'keys', + metavar = 'KEYS | /path/to/batchfile', + required = True, + help = 'A single/comma-separated list of keys to sign, ' + + 'trust, & notify. Can also be an email address. ' + + 'If -b/--batch is specified, this should instead be ' + + 'a path to the batch file. See the man page for more info.') + args.add_argument('-K', + '--sigkey', + dest = 'sigkey', + default = defkey, + help = 'The key to use when signing other keys. Default is \033[1m{0}\033[0m.'.format(defkey)) + args.add_argument('-t', + '--trust', + dest = 'trustlevel', + default = None, + help = 'The trust level to automatically apply to all keys ' + + '(if not specified, kant will prompt for each key). ' + + 'See BATCHFILE/TRUSTLEVEL in the man page for trust ' + + 'level notations.') + args.add_argument('-c', + '--check', + dest = 'checklevel', + default = None, + help = 'The level of checking done (if not specified, kant will ' + + 'prompt for each key). See -b/--batch for check level notations.') + args.add_argument('-l', + '--local', + dest = 'local', + default = None, + help = 'Make the signature(s) local-only (i.e. don\'t push to a keyserver).') + args.add_argument('-n', + '--no-notify', + dest = 'notify', + action = 'store_false', + help = 'If specified, do NOT notify any key recipients that you\'ve signed ' + + 'their key, even if KANT is able to.') + args.add_argument('-s', + '--keyservers', + dest = 'keyservers', + default = defkeyservers, + help = 'The comma-separated keyserver(s) to push to.\n' + + 'Default keyserver list is: \n\n\t\033[1m{0}\033[0m\n\n'.format(re.sub(',', + '\n\t', + defkeyservers))) + args.add_argument('-b', + '--batch', + dest = 'batch', + action = 'store_true', + help = 'If specified, -k/--keys is a CSV file to use as a ' + + 'batch run. See the BATCHFILE section in the man page for more info.') + args.add_argument('-D', + '--gpgdir', + dest = 'gpgdir', + default = defgpgdir, + help = 'The GnuPG configuration directory to use (containing\n' + + 'your keys, etc.); default is \033[1m{0}\033[0m.'.format(defgpgdir)) + args.add_argument('-T', + '--testkeyservers', + dest = 'testkeyservers', + action = 'store_true', + help = 'If specified, initiate a test connection with each\n' + 'set keyserver before anything else. Disabled by default.') + return(args) + + + + + +def main(): + # This could be cleaner-looking, but we do it this way so the class can be used externally + # with a dict instead of an argparser result. + args = vars(parseArgs().parse_args()) + sess = SigSession(args) + sess.modifyDirmngr('new') + sess.buildKeys() + sess.sigKeys() + sess.trustKeys() + sess.pushKeys() + sess.postalWorker() + sess.saveResults() + sess.modifyDirmngr('old') + +if __name__ == '__main__': + main() diff --git a/gpg/kant/kant.py b/gpg/kant/kant.py index 0a7cd5e..eaaa68d 100755 --- a/gpg/kant/kant.py +++ b/gpg/kant/kant.py @@ -4,6 +4,7 @@ import argparse import csv import datetime import email +import jinja2 import os import re import shutil @@ -42,13 +43,13 @@ import gpg # non-stdlib; Arch package is "python-gpgme" - see class sigsession(object): def __init__(self, args): self.args = args + self.keyids = [] def getKeys(self): - # Get our concept + # Get our context os.environ['GNUPGHOME'] = self.args['gpgdir'] ctx = gpg.Context() keys = {} - self.keyids = [] # Do we have the key already? If not, fetch. for k in list(self.args['rcpts']): if self.args['rcpts'][k]['type'] == 'fpr': diff --git a/gpg/kant/test.py b/gpg/kant/test.py index f2f31c8..49a32f5 100755 --- a/gpg/kant/test.py +++ b/gpg/kant/test.py @@ -3,12 +3,18 @@ # This is less of a test suite and more of an active documentation on some python-gpgme (https://pypi.python.org/pypi/gpg) examples. # Because their only documentation for the python bindings is in pydoc, and the C API manual is kind of useless. +import datetime import gpg import gpg.constants import inspect +import jinja2 import os import pprint import re +import smtplib +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText import subprocess import operator from functools import reduce @@ -107,7 +113,7 @@ class KeyEditor(object): def edit_fnc(self, status, args, out): result = None - out.seek(0,0) + out.seek(0, 0) #print(status, args) #print(out.read().decode('utf-8')) #print('{0} ({1})'.format(status, args)) @@ -130,5 +136,147 @@ class KeyEditor(object): # Test setting trust out = gpg.Data() ctx.interact(tkey2, KeyEditor().edit_fnc, sink = out, fnc_value = out) -out.seek(0,0) -#print(out.read(), end = ' ') +out.seek(0, 0) +#print(out.read(), end = '\n\n') + +#Test sending to a keyserver +buf = gpg.Data() +ctx.op_export(tkey2.fpr, gpg.constants.EXPORT_MODE_EXTERN, None) + +# Test writing the pubkey out to a file +buf = gpg.Data() +ctx.op_export_keys([tkey2], 0, buf) # do i NEED to specify a mode? +buf.seek(0, 0) +with open('/tmp/pubkeytest.gpg', 'wb') as f: + f.write(buf.read()) +#del(buf) +# Let's also test writing out the ascii-armored.. +ctx.armor = True +#buf = gpg.Data() +buf.seek(0, 0) +ctx.op_export_keys([tkey2], 0, buf) # do i NEED to specify a mode? +buf.seek(0, 0) +#print(buf.read()) +#buf.seek(0, 0) +with open('/tmp/pubkeytest.asc', 'wb') as f: + f.write(buf.read()) +del(buf) + +# And lastly, let's test msmtprc +def getCfg(fname): + cfg = {'default': None, 'defaults': {}} + _defaults = False + _acct = None + with open(fname, 'r') as f: + cfg_raw = f.read() + for l in cfg_raw.splitlines(): + if re.match('^\s?(#.*|)$', l): + continue # skip over blank and commented lines + line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)] + if line[0] == 'account': + if re.match('^default\s?:\s?', line[1]): # it's the default account specifier + cfg['default'] = line[1].split(':', maxsplit = 1)[1].strip() + else: + if line[1] not in cfg.keys(): # it's a new account definition + cfg[line[1]] = {} + _acct = line[1] + _defaults = False + elif line[0] == 'defaults': # it's the defaults + _acct = 'defaults' + else: # it's a config directive + cfg[_acct][line[0]] = line[1] + for a in list(cfg): + if a != 'default': + for k, v in cfg['defaults'].items(): + if k not in cfg[a].keys(): + cfg[a][k] = v + del(cfg['defaults']) + return(cfg) +homeconf = os.path.join(os.environ['HOME'], '.msmtprc') +sysconf = '/etc/msmtprc' +msmtp = {'path': None} +if not os.path.isfile(homeconf): + if not os.path.isfile(sysconf): + msmtp['conf'] = False + else: + msmtp['conf'] = sysconf +else: + msmtp['conf'] = homeconf +if os.path.isfile(msmtp['conf']): + path = os.environ['PATH'] + for p in path.split(':'): + fullpath = os.path.join(p, 'msmtp') + if os.path.isfile(fullpath): + msmtp['path'] = fullpath + break # break out the first instance of it we find since the shell parses PATH first to last and so do we + if msmtp['path']: + # Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary. + # Now we need to parse the config. + msmtp['cfg'] = getCfg(msmtp['conf']) +pprint.pprint(msmtp) +if msmtp['path']: + # Get the appropriate MSMTP profile + profile = msmtp['cfg']['default'] + # Buuuut i use a different profile when i test, because i use msmtp for production-type stuff. + #if os.environ['USER'] == 'bts': + # profile = 'gmailtesting' + # Now we can try to send an email... yikes. + ## First we set up the message templates. + body_in = {'plain': None, 'html': None} + body_in['plain'] = """Hello, person! + + This is a test message. + + Thanks.""" + body_in['html'] = """\ + + + +

Hi there, person! This is a test email.

+

It supports fun things like HTML.

+

--
https://games.square-r00t.net
+ Admin: r00t^2 +

+ + """ + # Now, some attachments. + part = {} + ctx.armor = False + buf = gpg.Data() + ctx.op_export_keys([tkey2], 0, buf) + buf.seek(0, 0) + part['gpg'] = MIMEApplication(buf.read(), '{0}.gpg'.format(tkey2.fpr)) + part['gpg']['Content-Disposition'] = 'attachment; filename="{0}.gpg"'.format(tkey2.fpr) + ctx.armor = True + buf.seek(0, 0) + ctx.op_export_keys([tkey2], 0, buf) + buf.seek(0, 0) + part['asc'] = MIMEApplication(buf.read(), '{0}.asc'.format(tkey2.fpr)) + part['asc']['Content-Disposition'] = 'attachment; filename="{0}.asc"'.format(tkey2.fpr) + #msg = MIMEMultipart('alternative') + msg = MIMEMultipart() + msg['preamble'] = 'This is a multi-part message in MIME format.\n' + msg['From'] = msmtp['cfg'][profile]['from'] + msg['To'] = msmtp['cfg'][profile]['from'] # to send to more than one: ', '.join(somelist) + msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') + msg['Subject'] = 'TEST EMAIL VIA TEST.PY' + msg['epilogue'] = '' + body = MIMEMultipart('alternative') + body.attach(MIMEText(body_in['plain'], 'plain')) + body.attach(MIMEText(body_in['html'], 'html')) + msg.attach(body) + for f in part.keys(): + msg.attach(part[f]) + + # This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc. + if 'tls-starttls' in msmtp['cfg'][profile].keys() and msmtp['cfg'][profile]['tls-starttls'] == 'on': + smtpserver = smtplib.SMTP(msmtp['cfg'][profile]['host'], int(msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.starttls() + # we need to EHLO again after a STARTTLS because email is weird. + elif msmtp['cfg'][profile]['tls'] == 'on': + smtpserver = smtplib.SMTP_SSL(msmtp['cfg'][profile]['host'], int(msmtp['cfg'][profile]['port'])) + smtpserver.ehlo() + smtpserver.login(msmtp['cfg'][profile]['user'], msmtp['cfg'][profile]['password']) + smtpserver.sendmail(msmtp['cfg'][profile]['user'], msg['To'], msg.as_string()) + smtpserver.close()