certparser/certparser.py

371 lines
16 KiB
Python
Raw Permalink Normal View History

2018-12-18 04:14:17 -05:00
#!/usr/bin/env python3
# stdlib
import collections
import copy
import datetime
import hashlib
import importlib
import ipaddress
import json
import os
import pprint
import re
import ssl
from urllib import parse
# PyPi/PIP
2018-12-18 07:24:29 -05:00
import OpenSSL
2018-12-18 04:14:17 -05:00
class CertParse(object):
2018-12-18 07:24:29 -05:00
def __init__(self, target,
port = 443,
force = None,
cert_type = 'pem',
starttls = False,
extensions = False,
2018-12-18 04:14:17 -05:00
alt_names = False):
self.target = target
self.port = port
self.force_type = force
self.cert_type = cert_type
self.starttls = starttls
self.extensions = extensions
self.alt_names = alt_names
self.cert = None
self.certinfo = None
self.get_type()
def getCert(self):
if self.cert_type.lower() == 'pem':
self.cert_type = OpenSSL.crypto.FILETYPE_PEM
elif self.cert_type.lower() == 'asn1':
self.cert_type = OpenSSL.crypto.FILETYPE_ASN1
else:
raise ValueError(('{0} is not a valid cert type; must be either ' +
'"pem" or "asn1"').format(self.cert_type))
if not self.force_type in ('url', 'domain', 'ip'):
with open(self.target, 'rb') as f:
self.cert = OpenSSL.crypto.load_certificate(self.cert_type,
f.read())
else:
_cert = ssl.get_server_certificate((self.target, self.port))
self.cert = OpenSSL.crypto.load_certificate(self.cert_type,
_cert)
return()
def parseCert(self):
certinfo = collections.OrderedDict()
timefmt = '%Y%m%d%H%M%SZ'
certinfo['Subject'] = self.parse_name(self.cert.get_subject().\
get_components())
certinfo['EXPIRED'] = self.cert.has_expired()
certinfo['Issuer'] = self.parse_name(self.cert.get_issuer().\
get_components())
certinfo['Issued'] = str(datetime.datetime.strptime(
self.cert.get_notBefore().decode('utf-8'),
timefmt))
certinfo['Expires'] = str(datetime.datetime.strptime(
self.cert.get_notAfter().decode('utf-8'),
timefmt))
if self.extensions:
certinfo['Extensions'] = self.parse_ext()
elif self.alt_names:
certinfo['SANs'] = self.parse_ext_san_only()
certinfo['Pubkey'] = self.get_pubkey()
certinfo['Serial'] = int(self.cert.get_serial_number())
certinfo['Signature Algorithm'] = self.cert.get_signature_algorithm().\
decode('utf-8')
certinfo['Version'] = self.cert.get_version()
certinfo['Subject Name Hash'] = self.cert.subject_name_hash()
certinfo['Fingerprints'] = self.gen_hashes()
self.certinfo = certinfo
return()
def print(self, json_fmt = None):
if json_fmt is None:
json_fmt = self.json_fmt
if json_fmt:
output = json.dumps(self.certinfo, indent = 4)
else:
output = self.certinfo
if __name__ == '__main__':
if not json_fmt:
pprint.pprint(output, compact = False, width = cols)
else:
print(output)
return()
return(output)
def get_pubkey(self):
pubkey = {}
key = self.cert.get_pubkey()
pubkey['Bit Length'] = key.bits()
# I wish there was a more comfortable way of comparing these.
if key.type() == OpenSSL.crypto.TYPE_RSA:
pubkey['Algorithm'] = 'RSA'
elif key.type() == OpenSSL.crypto.TYPE_DSA:
pubkey['Algorithm'] = 'DSA'
return(pubkey)
def gen_hashes(self):
hashes = {}
# Note: MD2 is *so old* that they aren't even
# *supported in python 3*.
# If we NEED to implement, https://urchin.earth.li/~twic/md2.py
fpt_types = sorted([i.lower() for i in ['md2', 'md5', 'sha1', 'mdc2',
'ripemd160', 'blake2b512',
'blake2s256', 'sha224',
'sha256', 'sha384', 'sha512']])
supported_types = sorted([i.lower() for i in \
list(hashlib.algorithms_available)])
cert_hash_types = [i for i in fpt_types if i in supported_types]
for h in cert_hash_types:
hashes[h.upper()] = self.cert.digest(h).decode('utf-8')
return(hashes)
def parse_name(self, item):
component_map = {'C': 'Country',
'countryName': 'Country',
'ST': 'State/Province',
'stateOrProvinceName': 'State/Province',
'L': 'Locality/City/Town/Region',
'localityName': 'Locality/City/Town/Region',
'O': 'Organization',
'organizationName': 'Organization',
'OU': 'Department/Team/Organization Unit',
'organizationalUnitName': ('Department/Team/' +
'Organization Unit'),
'CN': 'Common name',
'commonName': 'Common name',
'emailAddress': 'eMail Address'}
info = {}
for c in item:
item = c[0].decode('utf-8')
value = c[1].decode('utf-8')
if item in component_map.keys():
info[component_map[item]] = value
else:
info[item] = value
return(info)
def parse_ext_san_only(self):
SANs = []
for idx in range(0, self.cert.get_extension_count()):
ext = self.cert.get_extension(idx)
name = ext.get_short_name().decode('utf-8').lower()
x = str(ext).strip()
if name == 'subjectaltname':
val_lst = [i.strip() for i in x.split(',')]
for v in val_lst:
parsed_val = re.sub('^\s*DNS:\s*(.*)', '\g<1>', v)
if parsed_val not in ('\n', ''):
SANs.append(parsed_val.lower())
return(SANs)
def parse_ext(self):
exts = {}
for idx in range(0, self.cert.get_extension_count()):
ext = self.cert.get_extension(idx)
keyname = ext.get_short_name().decode('utf-8')
value_str = str(ext).strip()
# These should be split into lists by commas.
if keyname in ('subjectAltName', 'keyUsage', 'extendedKeyUsage',
'basicConstraints'):
val_lst = [i.strip() for i in value_str.split(',')]
value_str = []
for v in val_lst:
parsed_val = re.sub('^\s*DNS:\s*(.*)', '\g<1>', v)
if parsed_val not in ('\n', ''):
value_str.append(parsed_val)
# These should be split into lists by lines.
elif keyname in ('certificatePolicies', 'ct_precert_scts',
'authorityInfoAccess'):
val_lst = [i.strip() for i in value_str.splitlines()]
value_str = []
for v in val_lst:
value_str.append(v)
exts[keyname] = value_str
# These are split FURTHER into dicts but require unique... massaging.
# authorityInfoAccess
if 'authorityInfoAccess' in exts.keys():
_tmp = copy.deepcopy(exts['authorityInfoAccess'])
exts['authorityInfoAccess'] = {}
for i in _tmp:
x = [n.strip() for n in i.split('-', 1)]
y = [n.strip() for n in x[1].split(':', 1)]
exts['authorityInfoAccess'][x[0]] = {y[0]: y[1]}
# authorityKeyIdentifier
if 'authorityKeyIdentifier' in exts.keys():
_tmp = copy.deepcopy(exts['authorityKeyIdentifier'])
exts['authorityKeyIdentifier'] = {_tmp.split(':', 1)[0]:
_tmp.split(':', 1)[1]}
# basicConstraints
if 'basicConstraints' in exts.keys():
_tmp = copy.deepcopy(exts['basicConstraints'])
exts['basicConstraints'] = {}
for i in _tmp:
x = [n.strip() for n in i.split(':', 1)]
if len(x) >= 1:
if x[1].lower() in ('true', 'false'):
x[1] = (x[1].lower() == 'true')
exts['basicConstraints'][x[0]] = x[1]
else:
exts['basicConstraints'][x[0]] = True
# certificatePolicies
# What a mess.
if 'certificatePolicies' in exts.keys():
_tmp = copy.deepcopy(exts['certificatePolicies'])
exts['certificatePolicies'] = {}
last_key = None
for i in [n.strip() for n in _tmp]:
l = [y for y in i.split(':', 1) if y not in ('', None)]
if len(l) > 1:
# It MAY be a key:value.
if re.search('^\s+', l[1]):
val = l[1].strip()
if last_key == 'Policy':
if not isinstance(exts['certificatePolicies']\
[last_key],
list):
exts['certificatePolicies'][last_key] = [
exts['certificatePolicies'][last_key]]
exts['certificatePolicies'][last_key].append(val)
# I can't seem to get CPS as a separate dict.
# Patches welcome.
# Also, are CPS and User Notice *subitems* of Policy
# items?
elif last_key not in ('User Notice', 'CPS'):
# It's a value.
last_key = l[0].strip()
exts['certificatePolicies'][last_key] = val
else:
k = l[0].strip()
exts['certificatePolicies'][last_key][k] = val
else:
# Standalone key line
last_key = l[0].strip()
exts['certificatePolicies'][last_key] = {}
# ct_precert_scts
# another mess. a much. much, bigger mess.
if 'ct_precert_scts' in exts.keys():
_tmp = copy.deepcopy(exts['ct_precert_scts'])
exts['ct_precert_scts'] = {}
last_key = None
last_sub_key = None
cnt = 0
for i in [n.strip() for n in _tmp]:
l = [y for y in i.split(':', 1) if y not in ('', None)]
if len(l) > 1:
# Is it a line continuation (of a hex value)?
if ((re.search('^[0-9A-Z]{2}$', l[0])) and
(re.search('^[0-9A-Z:]*:?$', ':'.join(l)))):
exts['ct_precert_scts'][last_key][cnt]\
[last_sub_key] += ':'.join(l)
continue
# It MAY be a key:value.
if re.search('^\s+', l[1]) and (
last_key !=
'Signed Certificate Timestamp'):
# It's a value.
last_key = l[0].strip()
val = l[1].strip()
if val.lower() == 'none':
val = None
exts['ct_precert_scts'][last_key] = val
elif re.search('^\s+', l[1]):
last_sub_key = l[0].strip()
val = l[1].strip()
if val.lower() == 'none':
val = None
if last_sub_key == 'Signature':
val += ' '
exts['ct_precert_scts'][last_key][cnt]\
[last_sub_key] = val
else:
# Standalone key line
last_key = l[0].strip()
if last_key == 'Signed Certificate Timestamp':
if last_key not in exts['ct_precert_scts'].keys():
exts['ct_precert_scts'][last_key] = [{}]
else:
exts['ct_precert_scts'][last_key].append({})
cnt += 1
# some laaaast bit of cleanup...
if 'Signed Certificate Timestamp' in exts['ct_precert_scts']:
for i in exts['ct_precert_scts']\
['Signed Certificate Timestamp']:
if 'Signature' in i.keys():
d = i['Signature'].split()
i['Signature'] = {d[0]: d[1]}
return(exts)
def get_domain_from_url(self, url):
orig_url = url
# Needed in case a URL is passed with no http:// or https://, etc.
url = re.sub('^((ht|f)tps?://)*',
'https://',
url,
re.IGNORECASE).lower()
if not self.validURL(url):
raise ValueError(('{0} is not a valid URL').format(orig_url))
domain = parse.urlparse(url).netloc
return(domain)
def get_type(self):
if self.force_type:
# Just run the validator and some cleanup.
if self.force_type == 'url':
self.target = self.get_domain_from_url(self.target)
chk = self.validURL(self.target)
if chk:
self.force_type = 'domain'
elif self.force_type == 'ip':
chk = self.validIP(self.target)
elif self.force_type == 'domain':
chk = self.validDomain(self.target)
elif self.force_type == 'file':
self.target = os.path.abspath(os.path.expanduser(self.target))
chk = self.validPath(self.target)
if not chk:
raise TypeError(('{0} does not appear to be a valid ' +
'instance of type {1}'.format(self.target,
self.force_type)
))
if self.force_type in ('url', 'domain', 'ip'):
self.remote = True
else:
self.remote = False
return()
# Is it an IP address?
if self.validIP(self.target):
self.force_type = 'ip'
return()
# Is it a filepath?
fpath = os.path.abspath(os.path.expanduser(self.target))
if self.validPath(fpath):
self.target = fpath
self.force_type = 'file'
return()
# Is it a domain?
if self.validDomain(self.target):
self.force_type = 'domain'
return()
# Lastly, is it a URL?
if self.validURL(self.target):
domain = self.get_domain_from_url(self.target)
if self.validDomain(domain):
self.target = domain
self.force_type = 'domain'
if not self.force_type: # We couldn't detect it
raise RuntimeError(('Automatic type detection of {0} requested ' +
'but we could not determine what type of ' +
'resource it is'))
return()
2018-12-18 07:24:29 -05:00
def main(args):
2018-12-18 04:14:17 -05:00
p = CertParse(**args)
p.getCert()
p.parseCert()
p.print()