From 48ab7f953ffa0368d688ad28aca8cf635fe7e3f3 Mon Sep 17 00:00:00 2001 From: brent s Date: Sun, 22 Dec 2019 11:59:49 -0500 Subject: [PATCH] luks logging done --- aif/disk/luks.py | 4 +- aif/disk/luks_fallback.py | 177 ++++++++++++++++++++++++++------------ 2 files changed, 127 insertions(+), 54 deletions(-) diff --git a/aif/disk/luks.py b/aif/disk/luks.py index c7c5136..8d974f7 100644 --- a/aif/disk/luks.py +++ b/aif/disk/luks.py @@ -217,7 +217,6 @@ class LUKS(object): _logger.error('Secrets must be added before the configuration can be written.') raise RuntimeError('Missing secrets') conf = os.path.join(chroot_base, 'etc', 'crypttab') - initconf = '{0}.initramfs'.format(conf) with open(conf, 'r') as fh: conflines = fh.read().splitlines() # Get UUID @@ -239,5 +238,8 @@ class LUKS(object): if luksinfo not in conflines: with open(conf, 'a') as fh: fh.write('{0}\n'.format(luksinfo)) + if init_hook: + _logger.debug('Symlinked initramfs crypttab.') + os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs')) _logger.debug('Generated crypttab line: {0}'.format(luksinfo)) return(None) diff --git a/aif/disk/luks_fallback.py b/aif/disk/luks_fallback.py index 49e2a42..7fe893a 100644 --- a/aif/disk/luks_fallback.py +++ b/aif/disk/luks_fallback.py @@ -22,6 +22,7 @@ class LuksSecret(object): self.passphrase = None self.size = 4096 self.path = None + _logger.info('Instantiated {0}.'.format(type(self).__name__)) class LuksSecretPassphrase(LuksSecret): @@ -35,6 +36,7 @@ class LuksSecretFile(LuksSecret): def __init__(self, path, passphrase = None, bytesize = 4096): super().__init__() self.path = os.path.realpath(path) + _logger.debug('Path canonized: {0} => {1}'.format(path, self.path)) self.passphrase = passphrase self.size = bytesize # only used if passphrase == None self._genSecret() @@ -45,12 +47,14 @@ class LuksSecretFile(LuksSecret): self.passphrase = secrets.token_bytes(self.size) if not isinstance(self.passphrase, bytes): self.passphrase = self.passphrase.encode('utf-8') + _logger.debug('Secret generated.') return(None) class LUKS(object): def __init__(self, luks_xml, partobj): self.xml = luks_xml + _logger.debug('luks_xml: {0}'.format(etree.tostring(self.xml, with_tail = False).decode('utf-8'))) self.id = self.xml.attrib['id'] self.name = self.xml.attrib['name'] self.device = partobj @@ -62,48 +66,57 @@ class LUKS(object): block.Partition, lvm.LV, mdadm.Array)): - raise ValueError(('partobj must be of type ' - 'aif.disk.block.Disk, ' - 'aif.disk.block.Partition, ' - 'aif.disk.lvm.LV, or' - 'aif.disk.mdadm.Array')) + _logger.error(('partobj must be of type ' + 'aif.disk.block.Disk, ' + 'aif.disk.block.Partition, ' + 'aif.disk.lvm.LV, or' + 'aif.disk.mdadm.Array.')) + raise ValueError('Invalid partobj type') self.devpath = '/dev/mapper/{0}'.format(self.name) self.info = None def addSecret(self, secretobj): if not isinstance(secretobj, LuksSecret): - raise ValueError('secretobj must be of type aif.disk.luks.LuksSecret ' - '(aif.disk.luks.LuksSecretPassphrase or ' - 'aif.disk.luks.LuksSecretFile)') + _logger.error('secretobj must be of type ' + 'aif.disk.luks.LuksSecret ' + '(aif.disk.luks.LuksSecretPassphrase or ' + 'aif.disk.luks.LuksSecretFile).') + raise ValueError('Invalid secretobj type') self.secrets.append(secretobj) return(None) def createSecret(self, secrets_xml = None): + _logger.info('Compiling secrets.') if not secrets_xml: # Find all of them from self - for secret in self.xml.findall('secrets'): + _logger.debug('No secrets_xml specified; fetching from configuration block.') + for secret_xml in self.xml.findall('secrets'): + _logger.debug('secret_xml: {0}'.format(etree.tostring(secret_xml, with_tail = False).decode('utf-8'))) secretobj = None secrettypes = set() - for s in secret.iterchildren(): + for s in secret_xml.iterchildren(): + _logger.debug('secret_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8'))) secrettypes.add(s.tag) if all((('passphrase' in secrettypes), ('keyFile' in secrettypes))): # This is safe, because a valid config only has at most one of both types. - kf = secret.find('keyFile') + kf = secret_xml.find('keyFile') secretobj = LuksSecretFile(kf.text, # path - passphrase = secret.find('passphrase').text, + passphrase = secret_xml.find('passphrase').text, bytesize = kf.attrib.get('size', 4096)) # TECHNICALLY should be a no-op. elif 'passphrase' in secrettypes: - secretobj = LuksSecretPassphrase(secret.find('passphrase').text) + secretobj = LuksSecretPassphrase(secret_xml.find('passphrase').text) elif 'keyFile' in secrettypes: - kf = secret.find('keyFile') + kf = secret_xml.find('keyFile') secretobj = LuksSecretFile(kf.text, passphrase = None, bytesize = kf.attrib.get('size', 4096)) self.secrets.append(secretobj) else: + _logger.debug('A secrets_xml was specified.') secretobj = None secrettypes = set() for s in secrets_xml.iterchildren(): + _logger.debug('secrets_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8'))) secrettypes.add(s.tag) if all((('passphrase' in secrettypes), ('keyFile' in secrettypes))): @@ -120,37 +133,63 @@ class LUKS(object): passphrase = None, bytesize = kf.attrib.get('size', 4096)) self.secrets.append(secretobj) + _logger.debug('Secrets compiled.') return(None) def create(self): if self.created: return(None) + _logger.info('Creating LUKS volume on {0}'.format(self.source)) if not self.secrets: - raise RuntimeError('Cannot create a LUKS volume with no secrets added') + _logger.error('Cannot create a LUKS volume with no secrets added.') + raise RuntimeError('Cannot create a LUKS volume with no secrets') for idx, secret in enumerate(self.secrets): if idx == 0: # TODO: add support for custom parameters for below? - # TODO: logging - cmd = ['cryptsetup', - '--batch-mode', - 'luksFormat', - '--type', 'luks2', - '--key-file', '-', - self.source] - subprocess.run(cmd, input = secret.passphrase) + cmd_str = ['cryptsetup', + '--batch-mode', + 'luksFormat', + '--type', 'luks2', + '--key-file', '-', + self.source] + cmd = subprocess.run(cmd_str, + input = secret.passphrase, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + if cmd.returncode != 0: + _logger.warning('Command returned non-zero status') + _logger.debug('Exit status: {0}'.format(str(cmd.returncode))) + for a in ('stdout', 'stderr'): + x = getattr(cmd, a) + if x: + _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip())) + raise RuntimeError('Failed to encrypt successfully') else: + # TODO: does the key-file need to be the same path in the installed system? tmpfile = tempfile.mkstemp() with open(tmpfile[1], 'wb') as fh: fh.write(secret.passphrase) - cmd = ['cryptsetup', - '--batch-mode', - 'luksAdd', - '--type', 'luks2', - '--key-file', '-', - self.source, - tmpfile[1]] - subprocess.run(cmd, input = self.secrets[0].passphrase) + cmd_str = ['cryptsetup', + '--batch-mode', + 'luksAdd', + '--type', 'luks2', + '--key-file', '-', + self.source, + tmpfile[1]] + cmd = subprocess.run(cmd_str, + input = self.secrets[0].passphrase, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + os.remove(tmpfile[1]) + if cmd.returncode != 0: + _logger.warning('Command returned non-zero status') + _logger.debug('Exit status: {0}'.format(str(cmd.returncode))) + for a in ('stdout', 'stderr'): + x = getattr(cmd, a) + if x: + _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip())) + raise RuntimeError('Failed to encrypt successfully') self.created = True return(None) @@ -159,12 +198,19 @@ class LUKS(object): raise RuntimeError('Cannot lock a LUKS volume before it is created') if self.locked: return(None) - # TODO: logging - cmd = ['cryptsetup', - '--batch-mode', - 'luksClose', - self.name] - subprocess.run(cmd) + cmd_str = ['cryptsetup', + '--batch-mode', + 'luksClose', + self.name] + cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE) + if cmd.returncode != 0: + _logger.warning('Command returned non-zero status') + _logger.debug('Exit status: {0}'.format(str(cmd.returncode))) + for a in ('stdout', 'stderr'): + x = getattr(cmd, a) + if x: + _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip())) + raise RuntimeError('Failed to lock successfully') self.locked = True return(None) @@ -173,13 +219,21 @@ class LUKS(object): raise RuntimeError('Cannot unlock a LUKS volume before it is created') if not self.locked: return(None) - cmd = ['cryptsetup', - '--batch-mode', - 'luksOpen', - '--key-file', '-', - self.source, - self.name] - subprocess.run(cmd, input = self.secrets[0].passphrase) + cmd_str = ['cryptsetup', + '--batch-mode', + 'luksOpen', + '--key-file', '-', + self.source, + self.name] + cmd = subprocess.run(cmd_str, input = self.secrets[0].passphrase) + if cmd.returncode != 0: + _logger.warning('Command returned non-zero status') + _logger.debug('Exit status: {0}'.format(str(cmd.returncode))) + for a in ('stdout', 'stderr'): + x = getattr(cmd, a) + if x: + _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip())) + raise RuntimeError('Failed to unlock successfully') self.locked = False return(None) @@ -187,13 +241,23 @@ class LUKS(object): if self.locked: raise RuntimeError('Must be unlocked to gather info') info = {} - cmd = ['cryptsetup', - '--batch-mode', - 'luksDump', - self.source] - _info = subprocess.run(cmd, stdout = subprocess.PIPE).stdout.decode('utf-8') + cmd_str = ['cryptsetup', + '--batch-mode', + 'luksDump', + self.source] + cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE) + if cmd.returncode != 0: + _logger.warning('Command returned non-zero status') + _logger.debug('Exit status: {0}'.format(str(cmd.returncode))) + for a in ('stdout', 'stderr'): + x = getattr(cmd, a) + if x: + _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip())) + raise RuntimeError('Failed to fetch info successfully') + _info = cmd.stdout.decode('utf-8') k = None # I wish there was a better way to do this but I sure as heck am not writing a regex to do it. + # https://gitlab.com/cryptsetup/cryptsetup/issues/511 # https://pypi.org/project/parse/ _tpl = ('LUKS header information\nVersion: {header_ver}\nEpoch: {epoch_ver}\n' 'Metadata area: {metadata_pos} [bytes]\nKeyslots area: {keyslots_pos} [bytes]\n' @@ -225,17 +289,20 @@ class LUKS(object): if v.lower() == '(no flags)': v = [] else: - # Space-separated or comma-separated? TODO. + # Is this pace-separated or comma-separated? TODO. v = [i.strip() for i in v.split() if i.strip() != ''] elif k == 'uuid': v = uuid.UUID(hex = v) self.info = info + _logger.debug('Rendered updated info: {0}'.format(self.inf)) return(None) - def writeConf(self, conf = '/etc/crypttab'): + def writeConf(self, chroot_base, init_hook = True): + _logger.info('Generating crypttab.') if not self.secrets: - raise RuntimeError('secrets must be added before the configuration can be written') - conf = os.path.realpath(conf) + _logger.error('Secrets must be added before the configuration can be written.') + raise RuntimeError('Missing secrets') + conf = os.path.join(chroot_base, 'etc', 'crypttab') with open(conf, 'r') as fh: conflines = fh.read().splitlines() # Get UUID @@ -257,4 +324,8 @@ class LUKS(object): if luksinfo not in conflines: with open(conf, 'a') as fh: fh.write('{0}\n'.format(luksinfo)) + if init_hook: + _logger.debug('Symlinked initramfs crypttab.') + os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs')) + _logger.debug('Generated crypttab line: {0}'.format(luksinfo)) return(None)