luks logging done

This commit is contained in:
brent s. 2019-12-22 11:59:49 -05:00
parent a65ef8232a
commit 48ab7f953f
2 changed files with 127 additions and 54 deletions

View File

@ -217,7 +217,6 @@ class LUKS(object):
_logger.error('Secrets must be added before the configuration can be written.')
raise RuntimeError('Missing secrets')
conf = os.path.join(chroot_base, 'etc', 'crypttab')
initconf = '{0}.initramfs'.format(conf)
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
@ -239,5 +238,8 @@ class LUKS(object):
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
if init_hook:
_logger.debug('Symlinked initramfs crypttab.')
os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs'))
_logger.debug('Generated crypttab line: {0}'.format(luksinfo))
return(None)

View File

@ -22,6 +22,7 @@ class LuksSecret(object):
self.passphrase = None
self.size = 4096
self.path = None
_logger.info('Instantiated {0}.'.format(type(self).__name__))


class LuksSecretPassphrase(LuksSecret):
@ -35,6 +36,7 @@ class LuksSecretFile(LuksSecret):
def __init__(self, path, passphrase = None, bytesize = 4096):
super().__init__()
self.path = os.path.realpath(path)
_logger.debug('Path canonized: {0} => {1}'.format(path, self.path))
self.passphrase = passphrase
self.size = bytesize # only used if passphrase == None
self._genSecret()
@ -45,12 +47,14 @@ class LuksSecretFile(LuksSecret):
self.passphrase = secrets.token_bytes(self.size)
if not isinstance(self.passphrase, bytes):
self.passphrase = self.passphrase.encode('utf-8')
_logger.debug('Secret generated.')
return(None)


class LUKS(object):
def __init__(self, luks_xml, partobj):
self.xml = luks_xml
_logger.debug('luks_xml: {0}'.format(etree.tostring(self.xml, with_tail = False).decode('utf-8')))
self.id = self.xml.attrib['id']
self.name = self.xml.attrib['name']
self.device = partobj
@ -62,48 +66,57 @@ class LUKS(object):
block.Partition,
lvm.LV,
mdadm.Array)):
raise ValueError(('partobj must be of type '
'aif.disk.block.Disk, '
'aif.disk.block.Partition, '
'aif.disk.lvm.LV, or'
'aif.disk.mdadm.Array'))
_logger.error(('partobj must be of type '
'aif.disk.block.Disk, '
'aif.disk.block.Partition, '
'aif.disk.lvm.LV, or'
'aif.disk.mdadm.Array.'))
raise ValueError('Invalid partobj type')
self.devpath = '/dev/mapper/{0}'.format(self.name)
self.info = None

def addSecret(self, secretobj):
if not isinstance(secretobj, LuksSecret):
raise ValueError('secretobj must be of type aif.disk.luks.LuksSecret '
'(aif.disk.luks.LuksSecretPassphrase or '
'aif.disk.luks.LuksSecretFile)')
_logger.error('secretobj must be of type '
'aif.disk.luks.LuksSecret '
'(aif.disk.luks.LuksSecretPassphrase or '
'aif.disk.luks.LuksSecretFile).')
raise ValueError('Invalid secretobj type')
self.secrets.append(secretobj)
return(None)

def createSecret(self, secrets_xml = None):
_logger.info('Compiling secrets.')
if not secrets_xml: # Find all of them from self
for secret in self.xml.findall('secrets'):
_logger.debug('No secrets_xml specified; fetching from configuration block.')
for secret_xml in self.xml.findall('secrets'):
_logger.debug('secret_xml: {0}'.format(etree.tostring(secret_xml, with_tail = False).decode('utf-8')))
secretobj = None
secrettypes = set()
for s in secret.iterchildren():
for s in secret_xml.iterchildren():
_logger.debug('secret_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
# This is safe, because a valid config only has at most one of both types.
kf = secret.find('keyFile')
kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text, # path
passphrase = secret.find('passphrase').text,
passphrase = secret_xml.find('passphrase').text,
bytesize = kf.attrib.get('size', 4096)) # TECHNICALLY should be a no-op.
elif 'passphrase' in secrettypes:
secretobj = LuksSecretPassphrase(secret.find('passphrase').text)
secretobj = LuksSecretPassphrase(secret_xml.find('passphrase').text)
elif 'keyFile' in secrettypes:
kf = secret.find('keyFile')
kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text,
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
else:
_logger.debug('A secrets_xml was specified.')
secretobj = None
secrettypes = set()
for s in secrets_xml.iterchildren():
_logger.debug('secrets_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
@ -120,37 +133,63 @@ class LUKS(object):
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
_logger.debug('Secrets compiled.')
return(None)

def create(self):
if self.created:
return(None)
_logger.info('Creating LUKS volume on {0}'.format(self.source))
if not self.secrets:
raise RuntimeError('Cannot create a LUKS volume with no secrets added')
_logger.error('Cannot create a LUKS volume with no secrets added.')
raise RuntimeError('Cannot create a LUKS volume with no secrets')
for idx, secret in enumerate(self.secrets):
if idx == 0:
# TODO: add support for custom parameters for below?
# TODO: logging
cmd = ['cryptsetup',
'--batch-mode',
'luksFormat',
'--type', 'luks2',
'--key-file', '-',
self.source]
subprocess.run(cmd, input = secret.passphrase)
cmd_str = ['cryptsetup',
'--batch-mode',
'luksFormat',
'--type', 'luks2',
'--key-file', '-',
self.source]
cmd = subprocess.run(cmd_str,
input = secret.passphrase,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to encrypt successfully')
else:
# TODO: does the key-file need to be the same path in the installed system?
tmpfile = tempfile.mkstemp()
with open(tmpfile[1], 'wb') as fh:
fh.write(secret.passphrase)
cmd = ['cryptsetup',
'--batch-mode',
'luksAdd',
'--type', 'luks2',
'--key-file', '-',
self.source,
tmpfile[1]]
subprocess.run(cmd, input = self.secrets[0].passphrase)
cmd_str = ['cryptsetup',
'--batch-mode',
'luksAdd',
'--type', 'luks2',
'--key-file', '-',
self.source,
tmpfile[1]]
cmd = subprocess.run(cmd_str,
input = self.secrets[0].passphrase,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)

os.remove(tmpfile[1])
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to encrypt successfully')
self.created = True
return(None)

@ -159,12 +198,19 @@ class LUKS(object):
raise RuntimeError('Cannot lock a LUKS volume before it is created')
if self.locked:
return(None)
# TODO: logging
cmd = ['cryptsetup',
'--batch-mode',
'luksClose',
self.name]
subprocess.run(cmd)
cmd_str = ['cryptsetup',
'--batch-mode',
'luksClose',
self.name]
cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to lock successfully')
self.locked = True
return(None)

@ -173,13 +219,21 @@ class LUKS(object):
raise RuntimeError('Cannot unlock a LUKS volume before it is created')
if not self.locked:
return(None)
cmd = ['cryptsetup',
'--batch-mode',
'luksOpen',
'--key-file', '-',
self.source,
self.name]
subprocess.run(cmd, input = self.secrets[0].passphrase)
cmd_str = ['cryptsetup',
'--batch-mode',
'luksOpen',
'--key-file', '-',
self.source,
self.name]
cmd = subprocess.run(cmd_str, input = self.secrets[0].passphrase)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to unlock successfully')
self.locked = False
return(None)

@ -187,13 +241,23 @@ class LUKS(object):
if self.locked:
raise RuntimeError('Must be unlocked to gather info')
info = {}
cmd = ['cryptsetup',
'--batch-mode',
'luksDump',
self.source]
_info = subprocess.run(cmd, stdout = subprocess.PIPE).stdout.decode('utf-8')
cmd_str = ['cryptsetup',
'--batch-mode',
'luksDump',
self.source]
cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to fetch info successfully')
_info = cmd.stdout.decode('utf-8')
k = None
# I wish there was a better way to do this but I sure as heck am not writing a regex to do it.
# https://gitlab.com/cryptsetup/cryptsetup/issues/511
# https://pypi.org/project/parse/
_tpl = ('LUKS header information\nVersion: {header_ver}\nEpoch: {epoch_ver}\n'
'Metadata area: {metadata_pos} [bytes]\nKeyslots area: {keyslots_pos} [bytes]\n'
@ -225,17 +289,20 @@ class LUKS(object):
if v.lower() == '(no flags)':
v = []
else:
# Space-separated or comma-separated? TODO.
# Is this pace-separated or comma-separated? TODO.
v = [i.strip() for i in v.split() if i.strip() != '']
elif k == 'uuid':
v = uuid.UUID(hex = v)
self.info = info
_logger.debug('Rendered updated info: {0}'.format(self.inf))
return(None)

def writeConf(self, conf = '/etc/crypttab'):
def writeConf(self, chroot_base, init_hook = True):
_logger.info('Generating crypttab.')
if not self.secrets:
raise RuntimeError('secrets must be added before the configuration can be written')
conf = os.path.realpath(conf)
_logger.error('Secrets must be added before the configuration can be written.')
raise RuntimeError('Missing secrets')
conf = os.path.join(chroot_base, 'etc', 'crypttab')
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
@ -257,4 +324,8 @@ class LUKS(object):
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
if init_hook:
_logger.debug('Symlinked initramfs crypttab.')
os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs'))
_logger.debug('Generated crypttab line: {0}'.format(luksinfo))
return(None)