bdisk/bdisk/utils.py

1128 lines
43 KiB
Python
Raw Normal View History

2018-09-23 07:55:58 -04:00
# Yes, this is messy. They doesn't belong anywhere else, leave me alone.
import _io
2018-05-21 05:58:25 -04:00
import copy
import crypt
import GPG
2018-05-21 05:58:25 -04:00
import getpass
import hashid
import hashlib
2018-05-18 08:13:46 -04:00
import iso3166
import os
import pprint
2018-05-21 05:58:25 -04:00
import prompt_strings
import re
import string
import uuid
import validators
import zlib
2018-09-23 07:55:58 -04:00
import requests
import lxml.etree
2018-05-24 08:24:46 -04:00
import lxml.objectify
2018-05-22 06:01:18 -04:00
from bs4 import BeautifulSoup
from collections import OrderedDict
from dns import resolver
from email.utils import parseaddr as emailparse
from passlib.context import CryptContext as cryptctx
from urllib.parse import urlparse
2018-05-18 08:13:46 -04:00
from urllib.request import urlopen
# Supported by all versions of GNU/Linux shadow
passlib_schemes = ['des_crypt', 'md5_crypt', 'sha256_crypt', 'sha512_crypt']
# Build various hash digest name lists
digest_schemes = list(hashlib.algorithms_available)
# Provided by zlib
2018-09-23 07:55:58 -04:00
# TODO?
digest_schemes.append('adler32')
digest_schemes.append('crc32')
crypt_map = {'sha512': crypt.METHOD_SHA512,
'sha256': crypt.METHOD_SHA256,
'md5': crypt.METHOD_MD5,
'des': crypt.METHOD_CRYPT}
2018-09-23 07:55:58 -04:00
class Download(object):
def __init__(self, url, progress = True, offset = None, chunksize = 1024):
self.cnt_len = None
self.head = requests.head(url, allow_redirects = True).headers
self.req_headers = {}
self.range = False
self.url = url
self.offset = offset
self.chunksize = chunksize
self.progress = progress
if 'accept-ranges' in self.head:
if self.head['accept-ranmges'].lower() != 'none':
self.range = True
if 'content-length' in self.head:
try:
self.cnt_len = int(self.head['content-length'])
except TypeError:
pass
if self.cnt_len and self.offset and self.range:
if not self.offset <= self.cnt_len:
raise ValueError(('The offset requested ({0}) is greater than '
'the content-length value').format(self.offset, self.cnt_len))
self.req_headers['range'] = 'bytes={0}-'.format(self.offset)
def fetch(self):
if not self.progress:
self.req = requests.get(self.url, allow_redirects = True, headers = self.req_headers)
self.bytes_obj = self.req.content
else:
self.req = requests.get(self.url, allow_redirects = True, stream = True, headers = self.req_headers)
self.bytes_obj = bytes()
_bytelen = 0
# TODO: better handling for logging instead of print()s?
for chunk in self.req.iter_content(chunk_size = self.chunksize):
self.bytes_obj += chunk
if self.cnt_len:
print('\033[F')
print('{0:.2f}'.format((_bytelen / float(self.head['content-length'])) * 100),
end = '%',
flush = True)
_bytelen += self.chunksize
else:
print('.', end = '')
print()
return(self.bytes_obj)
2018-05-18 08:13:46 -04:00
class XPathFmt(string.Formatter):
def get_field(self, field_name, args, kwargs):
vals = self.get_value(field_name, args, kwargs), field_name
if not vals[0]:
vals = ('{{{0}}}'.format(vals[1]), vals[1])
return(vals)
class detect(object):
def __init__(self):
pass
2018-09-23 07:55:58 -04:00
def any_hash(self, hash_str, normalize = False):
h = hashid.HashID()
hashes = []
2018-05-18 08:13:46 -04:00
for i in h.identifyHash(hash_str):
if i.extended:
continue
x = i.name
2018-09-23 07:55:58 -04:00
if x.lower() in ('crc-32', 'ripemd-160', 'sha-1', 'sha-224', 'sha-256', 'sha-384', 'sha-512'):
# Gorram you, c0re.
x = re.sub('-', '', x.lower())
2018-09-23 07:55:58 -04:00
_hashes = [h.lower() for h in digest_schemes] # TODO: move this outside so we don't define it every invoke
if normalize:
x = re.sub('(-|crypt|\s+)', '', x.lower())
if x.lower() in sorted(list(set(_hashes))):
hashes.append(x)
return(hashes)
def password_hash(self, passwd_hash):
_ctx = cryptctx(schemes = passlib_schemes)
algo = _ctx.identify(passwd_hash)
if algo:
return(re.sub('_crypt$', '', algo))
else:
return(None)
return()
2018-05-22 06:01:18 -04:00
def password_hash_salt(self, salted_hash):
_hash_list = salted_hash.split('$')
if len(_hash_list) < 3:
return(None)
2018-05-22 06:01:18 -04:00
salt = _hash_list[2]
return(salt)
def remote_files(self, url_base, ptrn = None, flags = []):
soup = BeautifulSoup(Download(url_base, progress = False).fetch().decode('utf-8'),
'lxml')
2018-05-22 06:01:18 -04:00
urls = []
if 'regex' in flags:
if not isinstance(ptrn, str):
raise ValueError('"ptrn" must be a regex pattern to match '
'against')
else:
ptrn = re.compile(ptrn)
for u in soup.find_all('a'):
if not u.has_attr('href'):
continue
if 'regex' in flags:
if not ptrn.search(u.attrs['href']):
continue
if u.has_attr('href'):
urls.append(u.attrs['href'])
if not urls:
return(None)
# We certainly can't intelligently parse the printed timestamp since it
# varies so much and that'd be a nightmare to get consistent...
# But we CAN sort by filename.
if 'latest' in flags:
urls = sorted(list(set(urls)))
urls = urls[-1]
else:
urls = urls[0]
return(urls)
def gpgkeyID_from_url(self, url):
2018-09-23 07:55:58 -04:00
data = Download(url, progress = False).bytes_obj
g = GPG.GPGHandler()
key_ids = g.get_sigs(data)
del(g)
return(key_ids)
def gpgkey_info(self, keyID, secret = False):
def _get_key():
key = None
try:
2018-05-18 08:13:46 -04:00
key = g.ctx.get_key(keyID, secret = secret)
except GPG.gpg.errors.KeyNotFound:
return(None)
except Exception:
return(False)
return(key)
uids = {}
g = GPG.GPGHandler()
2018-05-18 08:13:46 -04:00
_orig_kl_mode = g.ctx.get_keylist_mode()
if _orig_kl_mode != GPG.gpg.constants.KEYLIST_MODE_EXTERN:
_key = _get_key()
if not _key:
2018-05-18 08:13:46 -04:00
g.ctx.set_keylist_mode(GPG.gpg.constants.KEYLIST_MODE_EXTERN)
_key = _get_key()
else:
_key = _get_key()
if not _key:
2018-05-18 08:13:46 -04:00
g.ctx.set_keylist_mode(_orig_kl_mode)
del(g)
return(None)
else:
uids['Full key'] = _key.fpr
uids['User IDs'] = []
for _uid in _key.uids:
_u = OrderedDict()
# Strings
for attr in ['Name', 'Email', 'Comment']:
s = getattr(_uid, attr.lower())
if s and s != '':
_u[attr] = s
# Key attributes
_u['Invalid'] = (True if _uid.invalid else False)
_u['Revoked'] = (True if _uid.revoked else False)
uids['User IDs'].append(_u)
2018-05-18 08:13:46 -04:00
g.ctx.set_keylist_mode(_orig_kl_mode)
del(g)
return(uids)
def supported_hashlib_name(self, name):
# Get any easy ones out of the way first.
if name in digest_schemes:
return(name)
2018-09-23 07:55:58 -04:00
# Otherwise grab the first one that matches
_digest_re = re.compile('^{0}$'.format(name.strip()), re.IGNORECASE)
for h in digest_schemes:
if _digest_re.search(h):
return(h)
return(None)
class generate(object):
def __init__(self):
pass
def hash_password(self, password, salt = None, algo = crypt.METHOD_SHA512):
if not salt or salt == 'auto':
_salt = crypt.mksalt(algo)
else:
_salt = salt
2018-05-22 06:01:18 -04:00
if not password:
# Intentionally empty password.
return('')
return(crypt.crypt(password, _salt))
def hashlib_names(self):
hashes = []
for h in sorted(digest_schemes):
r = re.compile('^{0}$'.format(h), re.IGNORECASE)
if len([i for i in filter(r.search, hashes)]) == 0:
hashes.append(h)
return(hashes)
2018-05-22 06:01:18 -04:00
def salt(self, algo = None):
if not algo:
algo = 'sha512'
algo = crypt_map[algo]
return(crypt.mksalt(algo))
class prompts(object):
def __init__(self):
2018-05-21 05:58:25 -04:00
self.promptstr = prompt_strings.PromptStrings()
# TODO: these strings should be indexed in a separate module and
# sourced/imported. we should generally just find a cleaner way to do this.
def confirm_or_no(self, prompt = '', invert = False,
usage = '{0} to confirm, otherwise {1}...\n'):
2018-05-18 08:13:46 -04:00
# A simplified version of multiline_input(), really.
# By default, Enter confirms (and returns True) and CTRL-d returns
# False unless - you guessed it - invert is True.
# usage is a string appended to prompt that explains which keys to use.
# It accepts two strformats: 0 is the EOF keystroke, and 1 is the Enter
# key (those are flipped if invert = True).
_enter_ks = 'Enter/Return'
if os.name == 'posix':
_ks = 'CTRL-d'
else: # What does os.name == "java" use?
_ks = 'CTRL-z'
if invert:
_usage = usage.format(_ks, _enter_ks)
else:
_usage = usage.format(_enter_ks, _ks)
try:
if usage:
input(prompt + _usage)
else:
input(prompt)
except EOFError:
if invert:
return(True)
else:
return(False)
return(True)
2018-05-21 05:58:25 -04:00
def gpg_keygen_attribs(self):
_strs = self.promptstr.gpg
gpg_vals = {'attribs': {},
'params': {}}
_checks = {
'params': {
'name': {'error': 'name cannot be empty',
'check': valid().nonempty_str},
'email': {'error': 'not a valid email address',
'check': valid().email}
}
}
for a in _strs['attribs']:
_a = None
while not _a:
if 'algo' in gpg_vals['attribs'] and a == 'keysize':
_algo = gpg_vals['attribs']['algo']
_choices = _strs['attribs']['keysize']['choices'][_algo]
_dflt = _strs['attribs']['keysize']['default'][_algo]
else:
_choices = _strs['attribs'][a]['choices']
_dflt = _strs['attribs'][a]['default']
_a = (input(
('\nWhat should be {0}? (Default is {1}.)\nChoices:\n'
'\n\t{2}\n\n{3}: ').format(
_strs['attribs'][a]['text'],
_dflt,
'\n\t'.join(_choices),
a.title()
)
)).strip().lower()
if _a == '':
_a = _dflt
elif _a not in _choices:
print(
('Invalid selection; choosing default '
'({0})').format(_dflt)
)
_a = _dflt
gpg_vals['attribs'][a] = _a
for p in _strs['params']:
_p = input(
('\nWhat is the {0} for the subkey?\n'
'{1}: ').format(p, p.title())
)
if p in _checks['params']:
while not _checks['params'][p]['check'](_p):
print(
('Invalid entry ({0}). Please retry.').format(
_checks['params'][p]['error']
)
)
_p = input('{0}: '.format(_p.title()))
gpg_vals['params'][p] = _p
return(gpg_vals)
def hash_select(self, prompt = '',
hash_types = generate().hashlib_names()):
_hash_types = hash_types
_hash_str = '\n\t'.join(
['{0}: {1}'.format(idx, val) for idx, val in enumerate(_hash_types,
1)
])
prompt = prompt.format(_hash_str)
_hash_select = (input(prompt)).strip()
if not valid().integer(_hash_select):
return(False)
try:
_hash_select = _hash_types[int(_hash_select) - 1]
except IndexError:
return(None)
return(_hash_select)
def multiline_input(self, prompt = None, continue_str = '> ',
end_str = '\n(End signal received)'):
_lines = []
if prompt:
# This grabs the first CR/LF.
_lines.append(input(prompt))
try:
while True:
if continue_str:
_lines.append(input(continue_str))
else:
_lines.append(input())
except EOFError:
if end_str:
print(end_str)
return('\n'.join(_lines))
2018-05-18 08:13:46 -04:00
def path(self, path_desc, empty_passthru = False):
path = input(('\nWhere would you like to put {0}?\n'
'Path: ').format(path_desc))
2018-05-18 08:13:46 -04:00
if empty_passthru:
if path.strip() == '':
return('')
path = transform().full_path(path)
return(path)
2018-05-18 08:13:46 -04:00
def ssl_object(self, pki_role, cn_url):
ssl_vals = {'paths': {},
'attribs': {},
'subject': {}}
2018-05-21 05:58:25 -04:00
_checks = {
'subject': {
'countryName': valid().country_abbrev,
'emailAddress': valid().email
}
}
_strs = copy.deepcopy(self.promptstr.ssl)
2018-05-18 08:13:46 -04:00
# pki_role should be 'ca' or 'client'
if pki_role not in ('ca', 'client'):
raise ValueError('pki_role must be either "ca" or "client"')
2018-05-21 05:58:25 -04:00
# NOTE: need to validate US and email
2018-05-18 08:13:46 -04:00
if pki_role == 'ca':
2018-05-21 05:58:25 -04:00
# this is getting triggered for clients?
_strs['paths'].update(_strs['paths_ca'])
for a in _strs['attribs']:
2018-05-18 08:13:46 -04:00
ssl_vals['attribs'][a] = {}
2018-05-21 05:58:25 -04:00
for x in _strs['attribs'][a]:
2018-05-18 08:13:46 -04:00
ssl_vals['attribs'][a][x] = None
2018-05-21 05:58:25 -04:00
for p in _strs['paths']:
2018-05-18 08:13:46 -04:00
if p == 'csr':
_allow_empty = True
else:
_allow_empty = False
2018-05-21 05:58:25 -04:00
ssl_vals['paths'][p] = self.path(_strs['paths'][p],
2018-05-18 08:13:46 -04:00
empty_passthru = _allow_empty)
print()
if ssl_vals['paths'][p] == '':
ssl_vals['paths'][p] = None
2018-05-21 05:58:25 -04:00
if p in _strs['attribs']:
for x in _strs['attribs'][p]:
2018-05-18 08:13:46 -04:00
while not ssl_vals['attribs'][p][x]:
2018-05-21 05:58:25 -04:00
# cipher attrib is prompted for before this.
if p == 'key' and x == 'passphrase':
if ssl_vals['attribs']['key']['cipher'] == 'none':
ssl_vals['attribs'][p][x] = 'none'
continue
ssl_vals['attribs'][p][x] = getpass.getpass(
('{0}\n{1}').format(
_strs['attribs'][p][x]['text'],
_strs['attribs'][p][x]['prompt'])
)
if ssl_vals['attribs'][p][x] == '':
ssl_vals['attribs'][p][x] = 'none'
else:
ssl_vals['attribs'][p][x] = (input(
('\n{0}\n\n\t{1}\n\n{2}').format(
_strs['attribs'][p][x]['text'],
'\n\t'.join(
_strs['attribs'][p][x]['options']),
_strs['attribs'][p][x]['prompt']))
).strip().lower()
if ssl_vals['attribs'][p][x] not in \
_strs['attribs'][p][x]['options']:
print(
('\nInvalid selection; setting default '
'({0}).').format(
_strs['attribs'][p][x]['default']
)
)
ssl_vals['attribs'][p][x] = \
_strs['attribs'][p][x]['default']
for s in _strs['subject']:
2018-05-18 08:13:46 -04:00
ssl_vals['subject'][s] = None
2018-05-21 05:58:25 -04:00
for s in _strs['subject']:
2018-05-18 08:13:46 -04:00
while not ssl_vals['subject'][s]:
_input = (input(
2018-05-21 05:58:25 -04:00
('\nWhat is {0}').format(
_strs['subject'][s]['text'])
2018-05-18 08:13:46 -04:00
)).strip()
print()
2018-05-21 05:58:25 -04:00
if s in _checks['subject']:
if not _checks['subject'][s](_input):
print('Invalid entry; try again.')
ssl_vals['subject'][s] = None
continue
2018-05-18 08:13:46 -04:00
ssl_vals['subject'][s] = _input
_url = transform().url_to_dict(cn_url, no_None = True)
ssl_vals['subject']['commonName'] = _url['host']
if pki_role == 'client':
ssl_vals['subject']['commonName'] += ' (Client)'
return(ssl_vals)
class transform(object):
def __init__(self):
pass
def flatten_recurse(self, obj, values = []):
_values = values
if isinstance(obj, list):
_values += obj
elif isinstance(obj, str):
_values.append(obj)
elif isinstance(obj, dict):
for k in obj:
self.flatten_recurse(obj[k], values = _values)
return(_values)
def no_newlines(self, text_in):
text = re.sub('\n+', ' ', text_in)
return(text)
def full_path(self, path):
path = os.path.expanduser(path)
path = os.path.abspath(path)
return(path)
def py2xml(self, value, attrib = True):
if value in (False, ''):
if attrib:
2018-05-24 08:24:46 -04:00
return("false")
else:
return(None)
elif isinstance(value, bool):
# We handle the False case above.
2018-05-24 08:24:46 -04:00
return("true")
elif isinstance(value, str):
return(value)
else:
# We can't do it simply.
return(value)
def sanitize_input(self, text_in, no_underscores = False):
if no_underscores:
_ws_repl = ''
else:
_ws_repl = '_'
# First we convert spaces to underscores (or remove them entirely).
text_out = re.sub('\s+', _ws_repl, text_in.strip())
# Then just strip out all symbols.
text_out = re.sub('[^\w]', '', text_out)
return(text_out)
def url_to_dict(self, orig_url, no_None = False):
def _getuserinfo(uinfo_str):
if len(uinfo_str) == 0:
if no_None:
return('')
else:
return(None)
else:
uinfo_str = uinfo_str[0]
_l = [i.strip() for i in uinfo_str.split(':') if i.strip() != '']
if len(_l) == 1:
_l.append('')
elif len(_l) == 0:
if no_None:
return('')
else:
return(None)
uinfo = {}
if not no_None:
uinfo['user'] = (None if _l[0] == '' else _l[0])
uinfo['password'] = (None if _l[1] == '' else _l[1])
else:
uinfo['user'] = _l[0]
uinfo['password'] = _l[1]
return(uinfo)
def _getdfltport():
with open('/etc/services', 'r') as f:
_svcs = f.read()
_svcs = [i.strip() for i in _svcs.splitlines() if i.strip() != '']
svcs = {}
for x in _svcs:
if re.search('^\s*#', x):
continue
s = re.sub('^\s*(\w\s+\w)(\s|\s*#)*.*$', '\g<1>', x)
l = [i.strip() for i in s.split()]
p = (int(l[1].split('/')[0]), l[1].split('/')[1])
if l[0] not in svcs:
svcs[l[0]] = []
if len(svcs[l[0]]) > 0:
# If it has a TCP port, put that first.
for idx, val in enumerate(svcs[l[0]]):
if val['proto'].lower() == 'tcp':
svcs[l[0]].insert(0, svcs[l[0]].pop(idx))
svcs[l[0]].append({'port': p[0],
'proto': p[1]})
return(svcs)
def _subsplitter(in_str, split_char):
if in_str == '':
if not no_None:
return(None)
else:
return('')
2018-05-18 08:13:46 -04:00
_params = {}
for i in in_str.split(split_char):
p = [x.strip() for x in i.split('=')]
2018-05-18 08:13:46 -04:00
_params[p[0]] = p[1]
if not _params:
if not no_None:
return(None)
else:
return('')
2018-05-18 08:13:46 -04:00
if not _params and not no_None:
return(None)
2018-05-18 08:13:46 -04:00
return(_params)
_dflt_ports = _getdfltport()
scheme = None
2018-05-18 08:13:46 -04:00
_scheme_re = re.compile('^([\w+.-]+)(://.*)', re.IGNORECASE)
if not _scheme_re.search(orig_url):
# They probably didn't prefix a URI signifier (RFC3986 § 3.1).
# We'll add one for them.
2018-05-18 08:13:46 -04:00
url = 'http://' + orig_url
scheme = 'http'
else:
# urlparse's .scheme? Total trash.
url = orig_url
scheme = _scheme_re.sub('\g<1>', orig_url)
url_split = urlparse(url)
# Get any userinfo present.
_auth = url_split.netloc.split('@')[:-1]
userinfo = _getuserinfo(_auth)
# Get any port specified (and parse the host at the same time).
if userinfo:
_h_split = url_split.netloc('@')[-1]
else:
_h_split = url_split.netloc
_nl_split = _h_split.split(':')
if len(_nl_split) > 1:
if userinfo in (None, ''):
port = int(_nl_split[1])
host = _nl_split[0]
else:
port = int(_nl_split[-1])
host = _nl_split[-2]
else:
if scheme in _dflt_ports:
port = _dflt_ports[scheme][0]['port']
else:
if not no_None:
port = None
else:
''
host = _nl_split[0]
# Split out the params, queries, fragments.
params = _subsplitter(url_split.params, ';')
queries = _subsplitter(url_split.query, '?')
fragments = _subsplitter(url_split.fragment, '#')
if url_split.path == '':
path = '/'
else:
path = os.path.dirname(url_split.path)
_dest = os.path.basename(url_split.path)
if not no_None:
dest = (None if _dest == '' else _dest)
else:
dest = _dest
url = {'scheme': scheme,
'auth': userinfo,
'host': host,
'port': port,
'path': path,
'dest': dest,
'params': params,
'queries': queries,
'fragments': fragments,
'url': orig_url}
url['full_url'] = '{0}://'.format(scheme)
if userinfo not in (None, ''):
2018-05-18 08:13:46 -04:00
url['full_url'] += '{user}:{password}@'.format(**userinfo)
url['full_url'] += host
if port not in (None, ''):
url['full_url'] += ':{0}'.format(port)
url['full_url'] += '/'.join((path, dest))
# Do these need to be in a specific order?
if params not in (None, ''):
_p = ['{0}={1}'.format(k, v) for k, v in params.items()]
url['full_url'] += ';{0}'.format(';'.join(_p))
if queries not in (None, ''):
_q = ['{0}={1}'.format(k, v) for k, v in queries.items()]
url['full_url'] += '?{0}'.format('?'.join(_q))
if fragments not in (None, ''):
_f = ['{0}={1}'.format(k, v) for k, v in fragments.items()]
url['full_url'] += '#{0}'.format('#'.join(_f))
return(url)
2018-05-22 06:01:18 -04:00
def user(self, user_elem):
_attribs = ('hashed', 'hash_algo', 'salt')
acct = {}
for a in _attribs:
acct[a] = None
if len(user_elem):
elem = user_elem[0]
for a in _attribs:
if a in elem.attrib:
acct[a] = self.xml2py(elem.attrib[a], attrib = True)
if acct['hashed']:
if not acct['hash_algo']:
_hash = detect().password_hash(elem.text)
if _hash:
acct['hash_algo'] = _hash
else:
acct['hash_algo'] = None
# We no longer raise ValueError. Per shadow(5):
#######################################################
# If the password field contains some string that is
# not a valid result of crypt(3), for instance ! or *,
# the user will not be able to use a unix password to
# log in (but the user may log in the system by other
# means).
# This field may be empty, in which case no passwords
# are required to authenticate as the specified login
# name. However, some applications which read the
# /etc/shadow file may decide not to permit any access
# at all if the password field is empty.
# A password field which starts with an exclamation
# mark means that the password is locked. The remaining
# characters on the line represent the password field
# before the password was locked.
#######################################################
# raise ValueError(
# 'Invalid salted password hash: {0}'.format(
# elem.text)
# )
2018-05-22 06:01:18 -04:00
acct['salt_hash'] = elem.text
acct['passphrase'] = None
else:
if not acct['hash_algo']:
acct['hash_algo'] = 'sha512'
acct['passphrase'] = elem.text
_saltre = re.compile('^\s*(auto|none|)\s*$', re.IGNORECASE)
if acct['salt']:
if _saltre.search(acct['salt']):
_salt = generate.salt(acct['hash_algo'])
acct['salt'] = _salt
else:
if not acct['hashed']:
acct['salt_hash'] = generate().hash_password(
acct['passphrase'],
algo = crypt_map[acct['hash_algo']])
acct['salt'] = detect().password_hash_salt(acct['salt_hash'])
if 'salt_hash' not in acct:
acct['salt_hash'] = generate().hash_password(
acct['passphrase'],
salt = acct['salt'],
algo = crypt_map[acct['hash_algo']])
return(acct)
def xml2py(self, value, attrib = True):
2018-05-24 08:24:46 -04:00
yes = re.compile('^\s*(true|1)\s*$', re.IGNORECASE)
no = re.compile('^\s*(false|0)\s*$', re.IGNORECASE)
2018-05-22 07:19:49 -04:00
none = re.compile('^\s*(none|)\s*$', re.IGNORECASE)
2018-05-22 06:01:18 -04:00
if no.search(value):
if attrib:
return(False)
else:
return(None)
elif yes.search(value):
# We handle the False case above.
return(True)
2018-05-22 07:19:49 -04:00
elif value.strip() == '' or none.search(value):
2018-05-22 06:01:18 -04:00
return(None)
2018-05-22 07:19:49 -04:00
elif valid().integer(value):
return(int(value))
2018-05-22 06:01:18 -04:00
else:
return(value)
2018-05-22 07:19:49 -04:00
return()
2018-05-22 06:01:18 -04:00
class valid(object):
def __init__(self):
pass
2018-05-18 08:13:46 -04:00
def country_abbrev(self, country_code):
if country_code not in iso3166.countries_by_alpha2:
return(False)
return(True)
def dns(self, addr):
pass
def connection(self, conninfo):
# conninfo should ideally be (host, port)
pass
def email(self, addr):
return(
2018-05-18 08:13:46 -04:00
not isinstance(validators.email(emailparse(addr)[1]),
validators.utils.ValidationFailure))
def gpgkeyID(self, key_id):
# Condense fingerprints into normalized 40-char "full" key IDs.
key_id = re.sub('\s+', '', key_id)
_re_str = ('^(0x)?('
'[{HEX}]{{40}}|'
'[{HEX}]{{16}}|'
'[{HEX}]{{8}}'
')$').format(HEX = string.hexdigits)
_key_re = re.compile(_re_str)
if not _key_re.search(key_id):
return(False)
return(True)
def integer(self, num):
try:
int(num)
return(True)
except ValueError:
return(False)
return()
2018-05-21 05:58:25 -04:00
def nonempty_str(self, str_in):
if str_in.strip() == '':
return(False)
return(True)
def password(self, passwd):
# https://en.wikipedia.org/wiki/ASCII#Printable_characters
# https://serverfault.com/a/513243/103116
_chars = ('!"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'[\]^_`abcdefghijklmnopqrstuvwxyz{|}~ ')
for char in passwd:
if char not in _chars:
return(False)
return(True)
def password_hash(self, passwd_hash, algo = None):
# We need to identify the algorithm if it wasn't provided.
if not algo:
# The following are supported on GNU/Linux.
# "des_crypt" is glibc's crypt() (man 3 crypt).
# https://passlib.readthedocs.io/en/stable/lib/passlib.context.html
# Specifically, ...#passlib.context.CryptContext.identify
_ctx = cryptctx(schemes = passlib_schemes)
_algo = _ctx.identify(passwd_hash)
if not _algo:
return(False)
else:
algo = re.sub('_crypt$', '', _algo)
_ctx = cryptctx(schemes = ['{0}_crypt'.format(algo)])
if not _ctx.identify(passwd_hash):
return(False)
return(True)
def salt_hash(self, salthash):
2018-09-23 07:55:58 -04:00
_idents = ''.join([i.ident for i in crypt_map.values() if i.ident])
2018-05-18 08:13:46 -04:00
# noinspection PyStringFormat
2018-09-23 07:55:58 -04:00
_regex = re.compile('^(\$[{0}]\$)?[./0-9A-Za-z]{{0,16}}\$?'.format(_idents))
2018-05-18 08:13:46 -04:00
if not _regex.search(salthash):
return(False)
return(True)
2018-09-23 07:55:58 -04:00
def salt_hash_full(self, salthash, hash_type):
h = [re.sub('-', '', i.lower()).split()[0] for i in detect.any_hash(self, salthash, normalize = True)]
if hash_type.lower() not in h:
return(False)
return(True)
def plugin_name(self, name):
if len(name) == 0:
return(False)
_name_re = re.compile('^[a-z][0-9a-z_]+$', re.IGNORECASE)
if not _name_re.search(name):
return(False)
return(True)
def posix_filename(self, fname):
# Note: 2009 spec of POSIX, "3.282 Portable Filename Character Set"
if len(fname) == 0:
return(False)
_char_re = re.compile('^[a-z0-9._-]+$', re.IGNORECASE)
if not _char_re.search(fname):
return(False)
return(True)
def url(self, url):
2018-05-18 08:13:46 -04:00
if not re.search('^[\w+.-]+://', url):
# They probably didn't prefix a URI signifier (RFC3986 § 3.1).
# We'll add one for them.
url = 'http://' + url
if isinstance(validators.url(url), validators.utils.ValidationFailure):
return(False)
else:
return(True)
return()
def username(self, uname):
# https://unix.stackexchange.com/a/435120/284004
_regex = re.compile('^[a-z_]([a-z0-9_-]{0,31}|[a-z0-9_-]{0,30}\$)$')
if not _regex.search(uname):
return(False)
return(True)
def uuid(self, uuid_str):
is_uuid = True
try:
2018-05-18 08:13:46 -04:00
u = uuid.UUID(uuid_str)
except ValueError:
return(False)
2018-05-18 08:13:46 -04:00
if not uuid_str == str(u):
return(False)
return(is_uuid)
2018-05-18 08:13:46 -04:00
class xml_supplicant(object):
def __init__(self, cfg, profile = None, max_recurse = 5):
2018-05-22 06:01:18 -04:00
self.selector_ids = ('id', 'name', 'uuid')
2018-05-18 08:13:46 -04:00
self.btags = {'xpath': {},
'regex': {},
'variable': {}}
2018-05-22 06:01:18 -04:00
raw = self._detect_cfg(cfg)
# This is changed in just a moment.
self.profile = profile
# This is retained so we can "refresh" the profile if needed.
self.orig_profile = profile
try:
2018-05-24 08:24:46 -04:00
self.orig_xml = lxml.etree.fromstring(raw)
# We need to strip the naked namespace for XPath to work.
self.xml = copy.deepcopy(self.orig_xml)
self.roottree = self.xml.getroottree()
self.tree = self.roottree.getroot()
self.strip_naked_ns()
2018-05-22 06:01:18 -04:00
except lxml.etree.XMLSyntaxError:
raise ValueError('The configuration provided does not seem to be '
'valid')
self.get_profile(profile = profile)
2018-05-24 08:24:46 -04:00
# This is disabled; we set it above.
#self.xml = lxml.etree.fromstring(raw)
2018-05-18 08:13:46 -04:00
self.fmt = XPathFmt()
2018-05-22 06:01:18 -04:00
self.max_recurse = int(self.profile.xpath(
'//meta/max_recurse/text()')[0])
2018-05-18 08:13:46 -04:00
# I don't have permission to credit them, but to the person who helped
# me with this regex - thank you. You know who you are.
2018-05-22 06:01:18 -04:00
# Originally this pattern was the one from:
# https://stackoverflow.com/a/12728199/733214
2018-05-18 08:13:46 -04:00
self.ptrn = re.compile(('(?<=(?<!\{)\{)(?:[^{}]+'
'|{{[^{}]*}})*(?=\}(?!\}))'))
2018-05-22 06:01:18 -04:00
self.root = lxml.etree.ElementTree(self.xml)
2018-05-18 08:13:46 -04:00
self._parse_regexes()
self._parse_variables()
def _detect_cfg(self, cfg):
if isinstance(cfg, str):
try:
lxml.etree.fromstring(cfg.encode('utf-8'))
return(cfg.encode('utf-8'))
except lxml.etree.XMLSyntaxError:
path = os.path.abspath(os.path.expanduser(cfg))
try:
with open(path, 'rb') as f:
cfg = f.read()
except FileNotFoundError:
raise ValueError('Could not open {0}'.format(path))
elif isinstance(cfg, _io.TextIOWrapper):
_cfg = cfg.read().encode('utf-8')
cfg.close()
cfg = _cfg
elif isinstance(cfg, _io.BufferedReader):
_cfg = cfg.read()
cfg.close()
cfg = _cfg
elif isinstance(cfg, lxml.etree._Element):
return(lxml.etree.tostring(cfg))
elif isinstance(cfg, bytes):
return(cfg)
else:
raise TypeError('Could not determine the object type.')
return(cfg)
2018-05-22 06:01:18 -04:00
2018-05-18 08:13:46 -04:00
def _parse_regexes(self):
for regex in self.profile.xpath('./meta/regexes/pattern'):
2018-05-22 06:01:18 -04:00
_key = 'regex%{0}'.format(regex.attrib['id'])
self.btags['regex'][_key] = regex.text
2018-05-18 08:13:46 -04:00
return()
def _parse_variables(self):
for variable in self.profile.xpath('./meta/variables/variable'):
2018-05-18 08:13:46 -04:00
self.btags['variable'][
'variable%{0}'.format(variable.attrib['id'])
] = variable.text
return()
2018-05-22 06:01:18 -04:00
def btags_to_dict(self, text_in):
d = {}
ptrn_id = self.ptrn.findall(text_in)
if len(ptrn_id) >= 1:
for item in ptrn_id:
try:
btag, expr = item.split('%', 1)
if btag not in self.btags:
continue
if item not in self.btags[btag]:
self.btags[btag][item] = None
#self.btags[btag][item] = expr # remove me?
if btag == 'xpath':
d[item] = (btag, expr)
elif btag == 'variable':
d[item] = (btag, self.btags['variable'][item])
except ValueError:
return(d)
return(d)
def get_profile(self, profile = None):
"""Get a configuration profile.
Get a configuration profile from the XML object and set that as a
profile object. If a profile is specified, attempt to find it. If not,
follow the default rules as specified in __init__.
"""
if profile:
# A profile identifier was provided
if isinstance(profile, str):
_profile_name = profile
profile = {}
for i in self.selector_ids:
profile[i] = None
profile['name'] = _profile_name
elif isinstance(profile, dict):
for k in self.selector_ids:
if k not in profile.keys():
profile[k] = None
else:
raise TypeError('profile must be a string (name of profile), '
'a dictionary, or None')
xpath = '/bdisk/profile{0}'.format(self.xpath_selector(profile))
self.profile = self.xml.xpath(xpath)
if len(self.profile) != 1:
raise ValueError('Could not determine a valid, unique '
'profile; please check your profile '
'specifier(s)')
else:
# We need the actual *profile*, not a list of matching
# profile(s).
self.profile = self.profile[0]
if not len(self.profile):
raise RuntimeError('Could not find the profile specified in '
'the given configuration')
else:
# We need to find the default.
profiles = []
for p in self.xml.xpath('/bdisk/profile'):
profiles.append(p)
# Look for one named "default" or "DEFAULT" etc.
for idx, value in enumerate([e.attrib['name'].lower() \
for e in profiles]):
if value == 'default':
#self.profile = copy.deepcopy(profiles[idx])
self.profile = profiles[idx]
break
# We couldn't find a profile with a default name. Try to grab the
# first profile.
if self.profile is None:
# Grab the first profile.
if profiles:
self.profile = profiles[0]
else:
# No profiles found.
raise RuntimeError('Could not find any usable '
'configuration profiles')
return()
2018-05-18 08:13:46 -04:00
def get_path(self, element):
path = element
try:
path = self.root.getpath(element)
except ValueError:
raise ValueError(
(
'Could not find a path for the expression {0}'
).format(element.text))
return(path)
def return_full(self):
2018-05-26 09:11:07 -04:00
# https://stackoverflow.com/a/22553145/733214
local_xml = lxml.etree.Element('bdisk',
nsmap = self.orig_xml.nsmap,
attrib = self.orig_xml.attrib)
local_xml.text = '\n '
for elem in self.xml.xpath('/bdisk/profile'):
local_xml.append(copy.deepcopy(elem))
2018-05-26 09:11:07 -04:00
return(lxml.etree.tostring(local_xml))
2018-05-24 08:24:46 -04:00
def return_naked_ns(self):
# It's so stupid I have to do this.
return(self.orig_xml.nsmap)
def strip_naked_ns(self):
# I cannot *believe* that LXML doesn't have this built-in, considering
# how common naked namespaces are.
# https://stackoverflow.com/a/18160164/733214
for elem in self.roottree.getiterator():
if not hasattr(elem.tag, 'find'):
continue
i = elem.tag.find('}')
if i >= 0:
elem.tag = elem.tag[i + 1:]
lxml.objectify.deannotate(self.roottree, cleanup_namespaces = True)
return()
2018-05-18 08:13:46 -04:00
def substitute(self, element, recurse_count = 0):
if recurse_count >= self.max_recurse:
return(element)
if isinstance(element, lxml.etree._Element):
if element.tag == 'regex':
return(element)
if isinstance(element, lxml.etree._Comment):
return(element)
if element.text:
_dictmap = self.btags_to_dict(element.text)
for elem in _dictmap:
# This is needed because _dictmap gets replaced below
if not _dictmap:
return(element)
_btag, _value = _dictmap[elem]
if isinstance(_value, str):
if _btag == 'xpath':
try:
newpath = element.xpath(_dictmap[elem][1])
except (AttributeError, IndexError, TypeError):
newpath = element
except lxml.etree.XPathEvalError:
return(element)
try:
self.btags['xpath'][elem] = self.substitute(
newpath, (recurse_count + 1))[0]
except (IndexError, TypeError):
raise ValueError(
('Encountered an error while trying to '
'substitute {0} at {1}').format(
elem, self.get_path(element)
))
element.text = self.fmt.vformat(
element.text,
[],
{**self.btags['xpath'],
**self.btags['variable']})
_dictmap = self.btags_to_dict(element.text)
return(element)
2018-05-22 06:01:18 -04:00
def xpath_selector(self, selectors):
2018-05-18 08:13:46 -04:00
# selectors is a dict of {attrib:value}
xpath = ''
for i in selectors.items():
2018-05-22 06:01:18 -04:00
if i[1] and i[0] in self.selector_ids:
2018-05-18 08:13:46 -04:00
xpath += '[@{0}="{1}"]'.format(*i)
2018-09-23 07:55:58 -04:00
return(xpath)