bdisk/bdisk/confparse.py

394 lines
17 KiB
Python

import copy
import os
import pprint
import re
import lxml.etree
from urllib.parse import urlparse
import utils # LOCAL
# Short alias for lxml's ElementTree-compatible API.
etree = lxml.etree
# Module-level helper objects obtained from the local ``utils`` module. They
# are created once at import time and shared by every Conf instance below.
# NOTE(review): presumably these are stateless helper classes - confirm in
# utils before relying on that.
detect = utils.detect()
generate = utils.generate()
transform = utils.transform()
valid = utils.valid()
class Conf(object):
    """
    A BDisk configuration profile, loaded from XML.

    Constructing the object loads the XML (via utils.xml_supplicant), runs
    variable substitution and selects the profile. The parse_*() methods
    (or parse_all()) then fill the ``cfg`` dict with plain Python values, and
    validate() checks the document against the XSD and, optionally, the
    parsed values.
    """

    def __init__(self, cfg, profile=None, validate_cfg=False,
                 xsd_file=None):
        """
        A configuration object.
        Read a configuration file, parse it, and make it available to the rest
        of BDisk.
        Args:
        cfg             The configuration. Can be a filesystem path, a string,
                        bytes, or a stream. If bytes or a bytestream, it must
                        be in UTF-8 format.
        profile         (optional) A sub-profile in the configuration. If None
                        is provided, we'll first look for the first profile
                        named 'default' (case-insensitive). If one isn't
                        found, then the first profile found will be used. Can
                        be a string (in which we'll automatically search for
                        the given value in the "name" attribute) or a dict for
                        more fine-grained profile identification, such as:
                            {'name': 'PROFILE_NAME',
                             'id': 1,
                             'uuid': '00000000-0000-0000-0000-000000000000'}
                        You can provide any combination of these
                        (e.g. "profile={'id': 2, 'name': 'some_profile'}").
                        Non-greedy matching (meaning ALL attributes specified
                        must match).
        validate_cfg    (optional) Controls XSD validation. A false value
                        skips validation. The string 'pre' validates the
                        document before variable substitution and again
                        afterwards. Any other true value validates only after
                        substitution.
        xsd_file        (optional) Path to the XSD to validate against, or an
                        lxml.etree.XMLSchema object. If not given, the
                        'bdisk.xsd' next to this module is used.
        """
        # These must exist before validate() can run: get_xsd() reads
        # self.xsd and validate() reads self.xml_suppl.
        self.xsd = xsd_file
        self.cfg = {}
        self.xml_suppl = utils.xml_supplicant(cfg, profile=profile)
        self.xml = self.xml_suppl.xml
        if validate_cfg == 'pre':
            # Validate before substitution or any other processing.
            self.validate()
        for element in self.xml_suppl.xml.iter():
            self.xml_suppl.substitute(element)
        self.xml_suppl.get_profile(profile=self.xml_suppl.orig_profile)
        self.profile = self.xml_suppl.profile
        if validate_cfg:
            # Validation post-substitution
            self.validate(parsed=False)
        # TODO: populate checksum{} with hash_algo if explicit

    def get_pki_obj(self, pki, pki_type):
        """
        Convert a <ca> or <client> PKI element into a dict.

        Args:
        pki         The PKI element (must support lxml's .xpath()).
        pki_type    Either 'ca' or 'client'.

        Returns:
        A dict keyed by child tag. 'subject' maps to a dict of its child
        tags; 'cert' and 'key' map to a dict of their attributes plus a
        'path' entry holding the element text; any other child maps to its
        text. For 'ca', 'index' and 'serial' default to None.

        Raises:
        ValueError if pki_type is neither 'ca' nor 'client'.
        """
        if pki_type not in ('ca', 'client'):
            raise ValueError('pki_type must be "ca" or "client"')
        obj = {}
        if pki_type == 'ca':
            obj['index'] = None
            obj['serial'] = None
        for child in pki.xpath('./*'):
            if child.tag == 'subject':
                # The subject's fields are child elements.
                obj[child.tag] = {}
                for field in child.xpath('./*'):
                    obj[child.tag][field.tag] = transform.xml2py(
                        field.text, attrib=False)
            elif child.tag in ('cert', 'key'):
                # These carry their options as attributes and the file path
                # as text.
                obj[child.tag] = {}
                for attr in child.xpath('./@*'):
                    obj[child.tag][attr.attrname] = transform.xml2py(attr)
                obj[child.tag]['path'] = child.text
            else:
                obj[child.tag] = child.text
        return obj

    def get_source(self, source, item, _source):
        """
        Build the dict describing one component of a <source> element.

        Args:
        source      The <source> element.
        item        Name of the child element to read; parse_sources() passes
                    'tarball', 'checksum' and 'sig'.
        _source     The partially built source dict; its 'mirror' and
                    'rootpath' entries are used when remote detection is
                    needed.

        Returns:
        A dict with at least 'flags'. For an explicit checksum it holds
        'explicit' and 'value' (and no 'fname'); otherwise it holds 'fname'
        plus 'explicit'/'hash_algo' for checksums or 'keys'/'keyserver' for
        signatures.

        Raises:
        IndexError if the source has no such child element.
        """
        result = {'flags': [], 'fname': None}
        elem = source.xpath('./{0}'.format(item))[0]
        if item == 'checksum':
            explicit_attr = elem.get('explicit')
            explicit = (transform.xml2py(explicit_attr)
                        if explicit_attr else False)
            result['explicit'] = explicit
            if explicit:
                # The element text is the checksum itself, not a filename.
                del result['fname']
                result['value'] = elem.text
                return result
            result['hash_algo'] = elem.get('hash_algo') or None
        if item == 'sig':
            keys_attr = elem.get('keys')
            result['keys'] = keys_attr.split() if keys_attr else []
            result['keyserver'] = elem.get('keyserver') or None
        text = elem.text
        # Flags are whitespace-separated and case-insensitive; 'none' is a
        # placeholder meaning "no flag".
        result['flags'] = [flag.lower()
                           for flag in elem.get('flags', '').split()
                           if flag.lower() != 'none']
        if result['flags']:
            if 'regex' in result['flags']:
                pattern = text.format(**self.xml_suppl.btags['regex'])
            else:
                pattern = None
            # TODO: remove this once we switch to just a mirror url.
            result['fname'] = detect.remote_files(
                '/'.join((_source['mirror'], _source['rootpath'])),
                ptrn=pattern,
                flags=result['flags'])
        else:
            result['fname'] = text
        return result

    def get_xsd(self):
        """
        Return the XSD to validate against.

        Returns:
        self.xsd unchanged if it is already an lxml.etree.XMLSchema.
        Otherwise the parsed (not yet compiled) XSD document, read from the
        path in self.xsd or, if that is unset, from 'bdisk.xsd' next to this
        module.
        """
        if isinstance(self.xsd, lxml.etree.XMLSchema):
            return self.xsd
        if self.xsd:
            path = os.path.abspath(os.path.expanduser(self.xsd))
        else:
            path = os.path.join(os.path.dirname(__file__), 'bdisk.xsd')
        with open(path, 'rb') as fh:
            return lxml.etree.parse(fh)

    def parse_accounts(self):
        """
        Populate cfg['root'] and cfg['users'] from PROFILE/ACCOUNTS.

        Password elements are handed to transform.user(); its result is
        stored as cfg['root'] and merged into each user's dict.

        Returns:
        An empty tuple.
        """
        self.cfg['users'] = []
        # First we handle the root user, since it's a "special" case.
        self.cfg['root'] = transform.user(
            self.profile.xpath('./accounts/rootpass'))
        for user in self.profile.xpath('./accounts/user'):
            comments = user.xpath('./comment/text()')
            entry = {'username': user.xpath('./username/text()')[0],
                     'sudo': transform.xml2py(user.attrib['sudo']),
                     'comment': comments[0] if comments else None}
            entry.update(transform.user(user.xpath('./password')))
            self.cfg['users'].append(entry)
        return ()

    def parse_all(self):
        """
        Run every parse_*() method, filling self.cfg completely.

        Returns:
        An empty tuple.
        """
        self.parse_profile()
        self.parse_meta()
        self.parse_accounts()
        self.parse_sources()
        self.parse_buildpaths()
        self.parse_pki()
        self.parse_gpg()
        self.parse_sync()
        return ()

    def parse_buildpaths(self):
        """
        Populate cfg['build'], cfg['iso'] and cfg['ipxe'].

        Returns:
        An empty tuple.

        Raises:
        IndexError if the profile lacks a <build>, <iso> or <ipxe> element,
        or if <ipxe> has no <uri>.
        """
        ## PROFILE/BUILD(/PATHS)
        build = self.profile.xpath('./build')[0]
        self.cfg['build'] = {
            'paths': {p.tag: p.text for p in build.xpath('./paths/*')},
            'optimize': transform.xml2py(
                build.get('its_full_of_stars', 'false')),
            'guests': build.get('guests', 'archlinux')}
        # iso and ipxe are their own basic profile elements, but we group them
        # in here because 1.) they're related, and 2.) they're simple to
        # import. This may change in the future if they become more complex.
        ## PROFILE/ISO and PROFILE/IPXE
        features = {'iso': ('sign', 'multi_arch'),
                    'ipxe': ('sign', 'iso')}
        for name in ('iso', 'ipxe'):
            elem = self.profile.xpath('./{0}'.format(name))[0]
            # We enable all features by default.
            self.cfg[name] = {attr: transform.xml2py(elem.get(attr, 'true'))
                              for attr in features[name]}
            if name == 'ipxe':
                self.cfg[name]['uri'] = elem.xpath('./uri/text()')[0]
        return ()

    def parse_gpg(self):
        """
        Populate cfg['gpg'] from PROFILE/GPG.

        Attributes of <gpg> override the None defaults. Each <key> becomes a
        dict in cfg['gpg']['keys'], starting from algo 'rsa' / keysize
        '4096' and overridden by the key's attributes and child elements.

        Returns:
        An empty tuple.

        Raises:
        IndexError if the profile has no <gpg> element.
        """
        self.cfg['gpg'] = {'keyid': None,
                           'gnupghome': None,
                           'publish': None,
                           'prompt_passphrase': None,
                           'keys': []}
        elem = self.profile.xpath('./gpg')[0]
        for attr in elem.xpath('./@*'):
            self.cfg['gpg'][attr.attrname] = transform.xml2py(attr)
        key_defaults = {'algo': 'rsa',
                        'keysize': '4096'}
        for key in elem.xpath('./key'):
            entry = copy.deepcopy(key_defaults)
            entry.update({'name': None, 'email': None, 'comment': None})
            for attr in key.xpath('./@*'):
                entry[attr.attrname] = transform.xml2py(attr)
            for param in key.xpath('./*'):
                if param.tag != 'subkey':
                    entry[param.tag] = transform.xml2py(param.text,
                                                        attrib=False)
                    continue
                # We only support one subkey (for key generation); further
                # <subkey> elements update the same dict.
                if 'subkey' not in entry:
                    entry['subkey'] = copy.deepcopy(key_defaults)
                for attr in param.xpath('./@*'):
                    entry['subkey'][attr.attrname] = transform.xml2py(attr)
            self.cfg['gpg']['keys'].append(entry)
        return ()

    def parse_meta(self):
        """
        Populate cfg['dev'], cfg['names'] and the scalar meta values
        ('desc', 'uri', 'ver', 'max_recurse') from PROFILE/META.

        Returns:
        An empty tuple.

        Raises:
        IndexError if one of the scalar meta elements has no text.
        """
        # Get the various meta strings. We skip regexes (they live in
        # self.xml_suppl.btags['regex'] since they're unique'd per id attrib)
        # and variables (they are already substituted by
        # self.xml_suppl.substitute(x)).
        for group in ('dev', 'names'):
            self.cfg[group] = {}
            for parent in self.profile.xpath('./meta/{0}'.format(group)):
                for child in parent:
                    # Direct iteration also yields comment nodes; skip them.
                    if isinstance(child, lxml.etree._Comment):
                        continue
                    self.cfg[group][child.tag] = transform.xml2py(
                        child.text, attrib=False)
        for name in ('desc', 'uri', 'ver', 'max_recurse'):
            text = self.profile.xpath('./meta/{0}/text()'.format(name))[0]
            self.cfg[name] = transform.xml2py(text, attrib=False)
        return ()

    def parse_pki(self):
        """
        Populate cfg['pki'] ('overwrite', 'ca', 'clients') from PROFILE/PKI.

        Returns:
        An empty tuple.

        Raises:
        IndexError if the profile has no <pki> element or it has no <ca>.
        """
        elem = self.profile.xpath('./pki')[0]
        self.cfg['pki'] = {
            'clients': [],
            'overwrite': transform.xml2py(elem.get('overwrite', 'false'))}
        self.cfg['pki']['ca'] = self.get_pki_obj(elem.xpath('./ca')[0], 'ca')
        for client in elem.xpath('./client'):
            self.cfg['pki']['clients'].append(
                self.get_pki_obj(client, 'client'))
        return ()

    def parse_profile(self):
        """
        Populate cfg['profile'] with the profile's identifying attributes
        ('id', 'name', 'uuid'); attributes that are absent stay None.

        Returns:
        An empty tuple.
        """
        identifiers = {'id': None,
                       'name': None,
                       'uuid': None}
        for attr in identifiers:
            if attr in self.profile.attrib:
                identifiers[attr] = transform.xml2py(
                    self.profile.attrib[attr], attrib=True)
        # transform.xml2py can hand back a bool for the id; the id is meant
        # to be an integer, so convert it back here.
        if isinstance(identifiers['id'], bool):
            identifiers['id'] = int(identifiers['id'])
        self.cfg['profile'] = identifiers
        return ()

    def parse_sources(self):
        """
        Populate cfg['sources'] with one dict per PROFILE/SOURCES/SOURCE.

        Each dict holds 'arch', 'mirror', 'rootpath' and the results of
        get_source() for 'tarball', 'checksum' and 'sig'.

        Returns:
        An empty tuple.
        """
        self.cfg['sources'] = []
        for source in self.profile.xpath('./sources/source'):
            entry = {'arch': source.attrib['arch'],
                     'mirror': source.xpath('./mirror/text()')[0],
                     'rootpath': source.xpath('./rootpath/text()')[0]}
            # The tarball, checksum, and sig components require some...
            # special care. get_source() reads 'mirror'/'rootpath' from the
            # entry, so those must already be set.
            for component in ('tarball', 'checksum', 'sig'):
                entry[component] = self.get_source(source, component, entry)
            self.cfg['sources'].append(entry)
        return ()

    def parse_sync(self):
        """
        Populate cfg['sync'] from PROFILE/SYNC.

        'gpg', 'ipxe', 'iso' and 'tftp' each get 'enabled' (default False),
        'path' (the element text) and any further attributes. 'rsync' gets
        'enabled' (default False), its attributes and its child elements.

        Returns:
        An empty tuple.

        Raises:
        IndexError if <sync> or one of the expected children is missing.
        """
        self.cfg['sync'] = {}
        elem = self.profile.xpath('./sync')[0]
        for name in ('gpg', 'ipxe', 'iso', 'tftp'):
            # We populate defaults in case they weren't specified.
            target = {'enabled': False,
                      'path': None}
            sub = elem.xpath('./{0}'.format(name))[0]
            for attr in sub.xpath('./@*'):
                target[attr.attrname] = transform.xml2py(attr)
            target['path'] = sub.text
            self.cfg['sync'][name] = target
        rsync = elem.xpath('./rsync')[0]
        rsync_cfg = {'enabled': False}
        for attr in rsync.xpath('./@*'):
            rsync_cfg[attr.attrname] = transform.xml2py(attr)
        for sub in rsync.xpath('./*'):
            rsync_cfg[sub.tag] = transform.xml2py(sub.text, attrib=False)
        self.cfg['sync']['rsync'] = rsync_cfg
        return ()

    def validate(self, parsed=False):
        """
        Validate the configuration.

        The full document (self.xml_suppl.return_full()) is always checked
        against the XSD. The compiled schema is cached in self.xsd.

        Args:
        parsed      If true, additionally check values in self.cfg that the
                    XSD is not good at (URLs, email addresses, salts/hashes,
                    GPG key IDs). The parse_*() methods must have been run
                    first.

        Returns:
        An empty tuple.

        Raises:
        lxml.etree.DocumentInvalid if the document does not match the XSD.
        ValueError if parsed is true and a parsed value is invalid.
        """
        xsd = self.get_xsd()
        if not isinstance(xsd, lxml.etree.XMLSchema):
            self.xsd = lxml.etree.XMLSchema(xsd)
        # XMLSchema.validate() would only return a bool; assertValid() gives
        # us a detailed exception instead.
        xml = lxml.etree.fromstring(self.xml_suppl.return_full())
        self.xsd.assertValid(xml)
        if parsed:
            # We wait until after it's parsed to evaluate because otherwise
            # we can't use utils.valid().
            self._validate_parsed_values()
        return ()

    @staticmethod
    def _pki_subject_emails(pki):
        """
        Yield the 'emailAddress' of every non-empty PKI subject, CA first
        and then the clients in order. Certificates without a subject are
        skipped.
        """
        certs = [pki.get('ca') or {}]
        certs.extend(pki.get('clients') or [])
        for cert in certs:
            subject = cert.get('subject')
            if subject:
                yield subject['emailAddress']

    def _validate_parsed_values(self):
        """
        Check parsed values in self.cfg, raising ValueError on the first
        invalid one.
        """
        # We only bother with stuff that would hinder building - e.g. we
        # don't check that the profile's UUID is a valid UUID4.
        email_error = '{0} is not a valid email address'
        # URLs
        for url in (self.cfg['uri'], self.cfg['dev']['website']):
            if not valid.url(url):
                raise ValueError('{0} is not a valid URL.'.format(url))
        # Emails
        for key in self.cfg['gpg']['keys']:
            if not valid.email(key['email']):
                raise ValueError(
                    'GPG key {0}: {1} is not a valid email address'.format(
                        key['name'], key['email']))
        if not valid.email(self.cfg['dev']['email']):
            raise ValueError(email_error.format(self.cfg['dev']['email']))
        if self.cfg['pki']:
            for email in self._pki_subject_emails(self.cfg['pki']):
                if not valid.email(email):
                    raise ValueError(email_error.format(email))
        # Salts/hashes
        for account in [self.cfg['root']] + list(self.cfg['users']):
            if account['salt'] and not valid.salt_hash(account['salt']):
                raise ValueError(
                    '{0} is not a valid salt'.format(account['salt']))
            if account['hashed'] and not valid.salt_hash_full(
                    account['salt_hash'], account['hash_algo']):
                raise ValueError(
                    '{0} is not a valid hash of type {1}'.format(
                        account['salt_hash'], account['hash_algo']))
        # GPG Key IDs
        keyid_error = '{0} is not a valid GPG Key ID/fingerprint'
        if self.cfg['gpg']['keyid']:
            if not valid.gpgkeyID(self.cfg['gpg']['keyid']):
                raise ValueError(keyid_error.format(self.cfg['gpg']['keyid']))
        for source in self.cfg['sources']:
            if 'sig' not in source:
                continue
            for keyid in source['sig']['keys']:
                if not valid.gpgkeyID(keyid):
                    raise ValueError(keyid_error.format(keyid))