bdisk/bdisk/confparse.py

227 lines
9.0 KiB
Python

import os
import pprint
import re
import utils
import lxml.etree
from urllib.parse import urlparse
etree = lxml.etree
detect = utils.detect()
generate = utils.generate()
transform = utils.transform()
valid = utils.valid()
class Conf(object):
def __init__(self, cfg, profile = None, validate = False):
"""
A configuration object.
Read a configuration file, parse it, and make it available to the rest
of BDisk.
Args:
cfg The configuration. Can be a filesystem path, a string,
bytes, or a stream. If bytes or a bytestream, it must be
in UTF-8 format.
profile (optional) A sub-profile in the configuration. If None
is provided, we'll first look for the first profile
named 'default' (case-insensitive). If one isn't found,
then the first profile found will be used. Can be a
string (in which we'll automatically search for the
given value in the "name" attribute) or a dict for more
fine-grained profile identification, such as:
{'name': 'PROFILE_NAME',
'id': 1,
'uuid': '00000000-0000-0000-0000-000000000000'}
You can provide any combination of these
(e.g. "profile={'id': 2, 'name' = 'some_profile'}").
"""
self.xml_suppl = utils.xml_supplicant(cfg, profile = profile)
self.xml = self.xml_suppl.xml
for e in self.xml_suppl.xml.iter():
self.xml_suppl.substitute(e)
self.xml_suppl.get_profile(profile = self.xml_suppl.orig_profile)
with open('/tmp/parsed.xml', 'wb') as f:
f.write(lxml.etree.tostring(self.xml_suppl.xml))
self.profile = self.xml_suppl.profile
self.xsd = None
self.cfg = {}
#if validate:
#if not self.validate(): # Need to write the XSD
# raise ValueError('The configuration did not pass XSD/schema '
# 'validation')
def get_source(self, source, item, _source):
_source_item = {'flags': [],
'fname': None}
elem = source.xpath('./{0}'.format(item))[0]
if item == 'checksum':
if elem.get('explicit', False):
_explicit = transform.xml2py(
elem.attrib['explicit'])
_source_item['explicit'] = _explicit
if _explicit:
del(_source_item['fname'])
_source_item['value'] = elem.text
return(_source_item)
else:
_source_item['explicit'] = False
if elem.get('hash_algo', False):
_source_item['hash_algo'] = elem.attrib['hash_algo']
else:
_source_item['hash_algo'] = None
if item == 'sig':
if elem.get('keys', False):
_keys = [i.strip() for i in elem.attrib['keys'].split(',')]
_source_item['keys'] = _keys
else:
_source_item['keys'] = []
if elem.get('keyserver', False):
_source_item['keyserver'] = elem.attrib['keyserver']
else:
_source_item['keyserver'] = None
_item = elem.text
_flags = elem.get('flags', [])
if _flags:
for f in _flags.split(','):
if f.strip().lower() == 'none':
continue
_source_item['flags'].append(f.strip().lower())
if _source_item['flags']:
if 'regex' in _source_item['flags']:
ptrn = _item.format(**self.xml_suppl.btags['regex'])
else:
ptrn = None
_source_item['fname'] = detect.remote_files(
'/'.join((_source['mirror'],
_source['rootpath'])),
ptrn = ptrn,
flags = _source_item['flags'])
else:
_source_item['fname'] = _item
return(_source_item)
def get_xsd(self):
path = os.path.join(os.path.dirname(__file__),
'bdisk.xsd')
with open(path, 'r') as f:
xsd = f.read()
return(xsd)
def parse_accounts(self):
## PROFILE/ACCOUNTS
self.cfg['users'] = []
# First we handle the root user, since it's a "special" case.
_root = self.profile.xpath('./accounts/rootpass')
self.cfg['root'] = transform.user(_root)
for user in self.profile.xpath('./accounts/user'):
_user = {'username': user.xpath('./username/text()')[0],
'sudo': transform.xml2py(user.attrib['sudo']),
'comment': None}
_comment = user.xpath('./comment/text()')
if len(_comment):
_user['comment'] = _comment[0]
_password = user.xpath('./password')
_user.update(transform.user(_password))
self.cfg['users'].append(_user)
return()
def parse_all(self):
self.parse_profile()
self.parse_meta()
self.parse_accounts()
self.parse_sources()
self.parse_buildpaths()
self.parse_pki()
return()
def parse_buildpaths(self):
## PROFILE/BUILD(/PATHS)
self.cfg['build'] = {'paths': {}}
build = self.profile.xpath('./build')[0]
_optimize = build.get('its_full_of_stars', 'no')
self.cfg['build']['optimize'] = transform.xml2py(_optimize)
for path in build.xpath('./paths/*'):
self.cfg['build']['paths'][path.tag] = path.text
self.cfg['build']['basedistro'] = build.get('basedistro', 'archlinux')
# iso and ipxe are their own basic profile elements, but we group them
# in here because 1.) they're related, and 2.) they're simple to
# import. This may change in the future if they become more complex.
## PROFILE/ISO
self.cfg['iso'] = {'sign': None,
'multi_arch': None}
self.cfg['ipxe'] = {'sign': None,
'iso': None}
for x in ('iso', 'ipxe'):
# We enable all features by default.
elem = self.profile.xpath('./{0}'.format(x))[0]
for a in self.cfg[x]:
self.cfg[x][a] = transform.xml2py(elem.get(a, 'yes'))
if x == 'ipxe':
self.cfg[x]['uri'] = elem.xpath('./uri/text()')[0]
return()
def parse_meta(self):
## PROFILE/META
# Get the various meta strings. We skip regexes (we handle those
# separately since they're unique'd per id attrib) and variables (they
# are already substituted by self.xml_suppl.substitute(x)).
_meta_iters = ('dev', 'names')
for t in _meta_iters:
self.cfg[t] = {}
_xpath = './meta/{0}'.format(t)
for e in self.profile.xpath(_xpath):
for se in e:
if not isinstance(se, lxml.etree._Comment):
self.cfg[t][se.tag] = se.text
for e in ('desc', 'uri', 'ver', 'max_recurse'):
_xpath = './meta/{0}/text()'.format(e)
self.cfg[e] = self.profile.xpath(_xpath)[0]
# HERE is where we would handle regex patterns.
# But we don't, because they're in self.xml_suppl.btags['regex'].
#self.cfg['regexes'] = {}
#_regexes = self.profile.xpath('./meta/regexes/pattern')
#if len(_regexes):
# for ptrn in _regexes:
# self.cfg['regexes'][ptrn.attrib['id']] = re.compile(ptrn.text)
return()
def parse_pki(self):
self.cfg['pki'] = {'ca': {},
'client': []}
elem = self.profile.xpath('./pki')[0]
self.cfg['pki']['overwrite'] =
def parse_profile(self):
## PROFILE
# The following are attributes of profiles that serve as identifiers.
self.cfg['profile'] = {'id': None,
'name': None,
'uuid': None}
for a in self.cfg['profile']:
if a in self.profile.attrib:
self.cfg['profile'][a] = self.profile.attrib[a]
return()
def parse_sources(self):
## PROFILE/SOURCES
self.cfg['sources'] = []
for source in self.profile.xpath('./sources/source'):
_source = {}
_source['arch'] = source.attrib['arch']
_source['mirror'] = source.xpath('./mirror/text()')[0]
_source['rootpath'] = source.xpath('./rootpath/text()')[0]
# The tarball, checksum, and sig components requires some...
# special care.
for e in ('tarball', 'checksum', 'sig'):
_source[e] = self.get_source(source, e, _source)
self.cfg['sources'].append(_source)
return()
def validate(self):
self.xsd = etree.XMLSchema(self.get_xsd())
return(self.xsd.validate(self.xml))