import configparser import io import logging import os ## import aif.utils from . import _common _logger = logging.getLogger(__name__) class Connection(_common.BaseConnection): def __init__(self, iface_xml): super().__init__(iface_xml) # TODO: disabling default route is not supported in-band. # https://bugs.archlinux.org/task/64651 # TODO: disabling autoroutes is not supported in-band. # https://bugs.archlinux.org/task/64651 # TODO: netctl profiles only support a single gateway. # is there a way to manually add alternative gateways? if not self.dhcp_client: self.dhcp_client = 'dhcpcd' self.provider_type = 'netctl' self.packages = {'netctl', 'openresolv'} self.services = {('/usr/lib/systemd/system/netctl@.service'): ('etc/systemd/system' '/multi-user.target.wants' '/netctl@{0}.service').format(self.id)} # Only used if we need to override default dhcp/dhcp6 behaviour. I don't *think* we can customize SLAAC? self.chroot_dir = os.path.join('etc', 'netctl', 'custom', self.dhcp_client) self.chroot_cfg = os.path.join(self.chroot_dir, self.id) self.desc = None def _initCfg(self): _logger.info('Building config.') if self.device == 'auto': self.device = _common.getDefIface(self.connection_type) self.desc = ('A {0} profile for {1} (generated by AIF-NG)').format(self.connection_type, self.device) self._cfg = configparser.ConfigParser(allow_no_value = True, interpolation = None) self._cfg.optionxform = str # configparser *requires* sections. netctl doesn't use them. We strip it when we write. self._cfg['BASE'] = {'Description': self.desc, 'Interface': self.device, 'Connection': self.connection_type} # Addresses if self.auto['addresses']['ipv4']: self.packages.add(self.dhcp_client) self._cfg['BASE']['IP'] = 'dhcp' self._cfg['BASE']['DHCPClient'] = self.dhcp_client else: if self.addrs['ipv4']: self._cfg['BASE']['IP'] = 'static' else: self._cfg['BASE']['IP'] = 'no' if self.domain: self._cfg['BASE']['DNSSearch'] = self.domain if self.auto['addresses']['ipv6']: if self.auto['addresses']['ipv6'] == 'slaac': self._cfg['BASE']['IP6'] = 'stateless' elif self.auto['addresses']['ipv6'] == 'dhcp6': self._cfg['BASE']['IP6'] = 'dhcp' self._cfg['BASE']['DHCP6Client'] = self.dhcp_client self.packages.add(self.dhcp_client) else: if not self.addrs['ipv6']: self._cfg['BASE']['IP6'] = 'no' else: self._cfg['BASE']['IP6'] = 'static' for addrtype in ('ipv4', 'ipv6'): keysuffix = ('6' if addrtype == 'ipv6' else '') addrkey = 'Address{0}'.format(keysuffix) gwkey = 'Gateway{0}'.format(keysuffix) str_addrs = [] if self.addrs[addrtype] and not self.auto['addresses'][addrtype]: for ip, cidr, gw in self.addrs[addrtype]: if not self.is_defroute: self._cfg['BASE'][gwkey] = str(gw) str_addrs.append("'{0}/{1}'".format(str(ip), str(cidr.prefixlen))) self._cfg['BASE'][addrkey] = '({0})'.format(' '.join(str_addrs)) elif self.addrs[addrtype]: if 'IPCustom' not in self._cfg['BASE']: # TODO: do this more cleanly somehow? Might conflict with other changes earlier/later. # Could I shlex it? # Weird hack because netctl doesn't natively support assigning add'l addrs to # a dhcp/dhcp6/slaac iface. self._cfg['BASE']['IPCustom'] = [] for ip, cidr, gw in self.addrs[addrtype]: self._cfg['BASE']['IPCustom'].append("'ip address add {0}/{1} dev {2}'".format(str(ip), str(cidr.prefixlen), self.device)) # Resolvers may also require a change to /etc/resolvconf.conf? for addrtype in ('ipv4', 'ipv6'): if self.resolvers: resolverkey = 'DNS' str_resolvers = [] for r in self.resolvers: str_resolvers.append("'{0}'".format(str(r))) self._cfg['BASE'][resolverkey] = '({0})'.format(' '.join(str_resolvers)) # Routes for addrtype in ('ipv4', 'ipv6'): if self.routes[addrtype]: keysuffix = ('6' if addrtype == 'ipv6' else '') routekey = 'Routes{0}'.format(keysuffix) str_routes = [] for dest, net, gw in self.routes[addrtype]: str_routes.append("'{0}/{1} via {2}'".format(str(dest), str(net.prefixlen), str(gw))) self._cfg['BASE'][routekey] = '({0})'.format(' '.join(str_routes)) # Weird hack because netctl doesn't natively support assigning add'l addrs to a dhcp/dhcp6/slaac iface. if 'IPCustom' in self._cfg['BASE'].keys() and isinstance(self._cfg['BASE']['IPCustom'], list): self._cfg['BASE']['IPCustom'] = '({0})'.format(' '.join(self._cfg['BASE']['IPCustom'])) _logger.info('Config built successfully.') # TODO: does this render correctly? _logger.debug('Config: {0}'.format(dict(self._cfg['BASE']))) return(None) def writeConf(self, chroot_base): systemd_base = os.path.join(chroot_base, 'etc', 'systemd', 'system') systemd_file = os.path.join(systemd_base, 'netctl@{0}.service.d'.format(self.id), 'profile.conf') netctl_file = os.path.join(chroot_base, 'etc', 'netctl', self.id) for f in (systemd_file, netctl_file): dpath = os.path.dirname(f) os.makedirs(dpath, exist_ok = True) os.chmod(dpath, 0o0755) os.chown(dpath, 0, 0) for root, dirs, files in os.walk(dpath): for d in dirs: fulld = os.path.join(root, d) os.chmod(fulld, 0o0755) os.chown(fulld, 0, 0) systemd_cfg = configparser.ConfigParser(allow_no_value = True, interpolation = None) systemd_cfg.optionxform = str systemd_cfg['Unit'] = {'Description': self.desc, 'BindsTo': 'sys-subsystem-net-devices-{0}.device'.format(self.device), 'After': 'sys-subsystem-net-devices-{0}.device'.format(self.device)} with open(systemd_file, 'w') as fh: systemd_cfg.write(fh, space_around_delimiters = False) _logger.info('Wrote systemd unit: {0}'.format(systemd_file)) # This is where it gets... weird. # Gross hacky workarounds because netctl, while great for simple setups, sucks for complex/advanced ones. no_auto = not all((self.auto['resolvers']['ipv4'], self.auto['resolvers']['ipv6'], self.auto['routes']['ipv4'], self.auto['routes']['ipv6'])) no_dhcp = not any((self.auto['addresses']['ipv4'], self.auto['addresses']['ipv6'])) if (no_auto and not no_dhcp) or (not self.is_defroute and not no_dhcp): if self.dhcp_client == 'dhcpcd': if not all((self.auto['resolvers']['ipv4'], self.auto['routes']['ipv4'], self.auto['addresses']['ipv4'])): self._cfg['BASE']['DhcpcdOptions'] = "'--config {0}'".format(os.path.join('/', self.chroot_cfg)) if not all((self.auto['resolvers']['ipv6'], self.auto['routes']['ipv6'], self.auto['addresses']['ipv6'])): self._cfg['BASE']['DhcpcdOptions6'] = "'--config {0}'".format(os.path.join('/', self.chroot_cfg)) elif self.dhcp_client == 'dhclient': if not all((self.auto['resolvers']['ipv4'], self.auto['routes']['ipv4'], self.auto['addresses']['ipv4'])): self._cfg['BASE']['DhcpcdOptions'] = "'-cf {0}'".format(os.path.join('/', self.chroot_cfg)) if not all((self.auto['resolvers']['ipv6'], self.auto['routes']['ipv6'], self.auto['addresses']['ipv6'])): self._cfg['BASE']['DhcpcdOptions6'] = "'-cf {0}'".format(os.path.join('/', self.chroot_cfg)) custom_dir = os.path.join(chroot_base, self.chroot_dir) custom_cfg = os.path.join(chroot_base, self.chroot_cfg) os.makedirs(custom_dir, exist_ok = True) for root, dirs, files in os.walk(custom_dir): os.chown(root, 0, 0) os.chmod(root, 0o0755) for d in dirs: dpath = os.path.join(root, d) os.chown(dpath, 0, 0) os.chmod(dpath, 0o0755) for f in files: fpath = os.path.join(root, f) os.chown(fpath, 0, 0) os.chmod(fpath, 0o0644) # Modify DHCP options. WHAT a mess. # The default requires are VERY sparse, and fine to remain unmangled for what we do. opts = {} for x in ('requests', 'requires'): opts[x] = {} for t in ('ipv4', 'ipv6'): opts[x][t] = list(self.dhcp_defaults[self.dhcp_client][x][t]) opt_map = { 'dhclient': { 'resolvers': { 'ipv4': ('domain-name-servers', ), 'ipv6': ('dhcp6.domain-name-servers', )}, 'routes': { 'ipv4': ('rfc3442-classless-static-routes', 'static-routes'), 'ipv6': tuple()}, # ??? # There is no way, as far as I can tell, to tell dhclient to NOT request an address. 'addresses': { 'ipv4': tuple(), 'ipv6': tuple()}}, 'dhcpcd': { 'resolvers': { 'ipv4': ('domain_name_servers', ), 'ipv6': ('dhcp6_domain_name_servers', )}, 'routes': { 'ipv4': ('classless_static_routes', 'static_routes'), 'ipv6': tuple()}, # ??? # I don't think dhcpcd lets us refuse an address. 'addresses': { 'ipv4': tuple(), 'ipv6': tuple()}}} # This ONLY works for DHCPv6 on the IPv6 side. Not SLAAC. Netctl doesn't use a dhcp client for # SLAAC, just iproute2. :| # x = routers, addresses, resolvers # t = ipv4/ipv6 dicts # i = ipv4/ipv6 key # v = boolean of auto # o = each option for given auto type and IP type for x, t in self.auto.items(): for i, v in t.items(): if not v: for o in opt_map[self.dhcp_client][x][i]: for n in ('requests', 'requires'): if o in opts[n][i]: opts[n][i].remove(o) # We don't want the default route if we're not the default route iface. if not self.is_defroute: # IPv6 uses RA for the default route... We'll probably need to do that via an ExecUpPost? # TODO. for i in ('requests', 'requires'): if 'routers' in opts[i]['ipv4']: opts[i]['ipv4'].remove('routers') if self.dhcp_client == 'dhclient': conf = ['lease {', ' interface "{0}";'.format(self.device), '}'] for i in ('request', 'require'): k = '{0}s'.format(i) optlist = [] for t in ('ipv4', 'ipv6'): optlist.extend(opts[k][t]) if optlist: conf.insert(-1, ' {0} {1};'.format(k, ', '.join(optlist))) elif self.dhcp_client == 'dhcpcd': conf = [] conf.extend(list(self.dhcp_defaults['dhcpcd']['default_opts'])) for i in ('requests', 'requires'): if i == 'requests': k = 'option' else: k = 'require' optlist = [] optlist.extend(opts[i]['ipv4']) optlist.extend(opts[i]['ipv6']) # TODO: does require support comma-separated list like option does? conf.append('{0} {1};'.format(k, ','.join(optlist))) with open(custom_cfg, 'w') as fh: fh.write('\n'.join(conf)) fh.write('\n') os.chmod(custom_cfg, 0o0644) os.chown(custom_cfg, 0, 0) _logger.info('Wrote: {0}'.format(custom_cfg)) # And we have to strip out the section from the ini. cfgbuf = io.StringIO() self._cfg.write(cfgbuf, space_around_delimiters = False) cfgbuf.seek(0, 0) with open(netctl_file, 'w') as fh: for line in cfgbuf.readlines(): if line.startswith('[BASE]') or line.strip() == '': continue fh.write(line) os.chmod(netctl_file, 0o0600) os.chown(netctl_file, 0, 0) _logger.info('Wrote: {0}'.format(netctl_file)) return(None) class Ethernet(Connection): def __init__(self, iface_xml): super().__init__(iface_xml) self.connection_type = 'ethernet' self._initCfg() class Wireless(Connection): def __init__(self, iface_xml): super().__init__(iface_xml) self.connection_type = 'wireless' self.packages.add('wpa_supplicant') self._initCfg() self._initConnCfg() def _initConnCfg(self): self._cfg['BASE']['ESSID'] = "'{0}'".format(self.xml.attrib['essid']) hidden = aif.utils.xmlBool(self.xml.attrib.get('hidden', 'false')) if hidden: self._cfg['BASE']['Hidden'] = 'yes' try: bssid = self.xml.attrib.get('bssid').strip() except AttributeError: bssid = None if bssid: bssid = _common.canonizeEUI(bssid) self._cfg['BASE']['AP'] = bssid crypto = self.xml.find('encryption') if crypto: crypto = _common.convertWifiCrypto(crypto, self.xml.attrib['essid']) # if crypto['type'] in ('wpa', 'wpa2', 'wpa3'): if crypto['type'] in ('wpa', 'wpa2'): # TODO: WPA2 enterprise self._cfg['BASE']['Security'] = 'wpa' # if crypto['type'] in ('wep', 'wpa', 'wpa2', 'wpa3'): if crypto['type'] in ('wpa', 'wpa2'): self._cfg['BASE']['Key'] = crypto['auth']['psk'] return(None)