# repomirror/repomirror/sync.py

import datetime
import logging
import pwd
import grp
import os
import socket
import warnings
##
import psutil
##
from . import config
from . import constants
from . import fetcher
from . import logger
_logger = logging.getLogger()
def get_owner(owner_xml):
owner = {}
user = owner_xml.find('user')
2020-06-16 15:44:50 -04:00
if user is not None:
user = user.text
group = owner_xml.find('group')
2020-06-16 15:44:50 -04:00
if group is not None:
group = group.text
if user:
user_obj = pwd.getpwnam(user)
else:
user_obj = pwd.getpwuid(os.geteuid())
owner['uid'] = user_obj.pw_uid
if group:
group_obj = grp.getgrnam(group)
else:
group_obj = grp.getgrgid(pwd.getpwuid(os.geteuid()).pw_gid)
owner['gid'] = group_obj.gr_gid
_logger.debug('Resolved owner xml to {0}'.format(owner))
return(owner)
2020-06-14 03:46:29 -04:00
class Args(object):
def __init__(self, args_xml):
self.xml = args_xml
self.args = []
self._parse_xml()
def _parse_xml(self):
self.args = []
2020-06-14 03:46:29 -04:00
for arg_xml in self.xml.xpath('(short|long)'):
val = arg_xml.attrib.get('value')
if arg_xml.tag == 'short':
prefix = '-'
# elif arg_xml.tag == 'long':
else:
prefix = '--'
arg = '{0}{1}'.format(prefix, arg_xml.text)
if val:
arg += '={0}'.format(val)
self.args.append(arg)
_logger.debug('Generated args list: {0}'.format(self.args))
return(None)
2020-06-14 03:46:29 -04:00
class Mount(object):
def __init__(self, mpchk_xml):
2020-06-16 15:44:50 -04:00
self.path = os.path.abspath(os.path.expanduser(mpchk_xml.text))
2020-06-14 03:46:29 -04:00
self.is_mounted = None
self._check_mount()
def _check_mount(self):
_logger.debug('Getting mount status for {0}'.format(self.path))
2020-06-14 03:46:29 -04:00
with open('/proc/mounts', 'r') as fh:
raw = fh.read()
for line in raw.splitlines():
l = line.split()
mp = l[1]
if mp == self.path:
_logger.debug('{0} is mounted.'.format(self.path))
2020-06-14 03:46:29 -04:00
self.is_mounted = True
return(None)
self.is_mounted = False
_logger.debug('{0} is not mounted.'.format(self.path))
2020-06-14 03:46:29 -04:00
return(None)
class TimestampFile(object):
def __init__(self, ts_xml, owner_xml = None):
2020-06-14 03:46:29 -04:00
self.fmt = ts_xml.attrib.get('timeFormat', 'UNIX_EPOCH')
if self.fmt == 'UNIX_EPOCH':
self.fmt = '%s'
elif self.fmt == 'MICROSECOND_EPOCH':
self.fmt = '%s.%f'
_logger.debug('Set timestamp format string to {0}'.format(self.fmt))
self.owner_xml = owner_xml
self.owner = {}
2020-06-16 15:44:50 -04:00
if self.owner_xml is not None:
self.owner = get_owner(self.owner_xml)
_logger.debug('Owner set is {0}'.format(self.owner))
2020-06-14 03:46:29 -04:00
self.path = os.path.abspath(os.path.expanduser(ts_xml.text))
_logger.debug('Path resolved to {0}'.format(self.path))
def read(self, parentdir = None):
2020-06-16 15:44:50 -04:00
timestamp = None
if parentdir:
path = os.path.join(os.path.abspath(os.path.expanduser(parentdir)),
self.path.lstrip('/'))
else:
path = self.path
2020-06-16 15:44:50 -04:00
if os.path.isfile(path):
with open(path, 'r') as fh:
ts_raw = fh.read().strip()
if '%s' in self.fmt:
2020-06-17 01:10:09 -04:00
timestamp = datetime.datetime.fromtimestamp(float(ts_raw))
2020-06-16 15:44:50 -04:00
else:
timestamp = datetime.datetime.strptime(ts_raw, self.fmt)
_logger.debug('Read timestamp {0} from {1}'.format(str(timestamp), self.path))
return(timestamp)
def write(self):
dname = os.path.dirname(self.path)
if not os.path.isdir(dname):
os.makedirs(dname, mode = 0o0755)
if self.owner:
os.chown(dname, **self.owner)
_logger.debug('Created {0}'.format(dname))
with open(self.path, 'w') as fh:
fh.write(datetime.datetime.utcnow().strftime(self.fmt))
fh.write('\n')
os.chmod(self.path, mode = 0o0644)
if self.owner:
os.chown(self.path, **self.owner)
_logger.debug('Wrote timestamp to {0}'.format(self.path))
return(None)
2020-06-14 03:46:29 -04:00
class Upstream(object):
def __init__(self, upstream_xml, dest, rsync_args = None, owner = None, filechecks = None):
self.xml = upstream_xml
# These are required for all upstreams.
self.sync_type = self.xml.find('syncType').text.lower()
self.domain = self.xml.find('domain').text
self.path = self.xml.find('path').text
self.dest = os.path.abspath(os.path.expanduser(dest))
self.owner = owner
self.filechecks = filechecks
self.has_new = False
# These are optional.
2020-06-16 15:44:50 -04:00
port = self.xml.find('port')
if port is not None:
self.port = int(port.text)
else:
self.port = constants.PROTO_DEF_PORTS[self.sync_type]
self.available = None
if self.sync_type == 'rsync':
self.fetcher = fetcher.RSync(self.domain,
self.port,
self.path,
self.dest,
rsync_args = rsync_args,
filechecks = self.filechecks,
owner = self.owner)
else:
self.fetcher = fetcher.FTP(self.domain, self.port, self.path, self.dest, owner = self.owner)
self._check_conn()
def _check_conn(self):
sock = socket.socket()
sock.settimeout(7)
try:
sock.connect((self.domain, self.port))
sock.close()
self.available = True
except (socket.timeout, socket.error):
self.available = False
return(None)
def sync(self):
self.fetcher.fetch()
return(None)
class Distro(object):
def __init__(self, distro_xml):
self.xml = distro_xml
2020-06-16 15:44:50 -04:00
self.name = self.xml.attrib['name']
self.dest = os.path.abspath(os.path.expanduser(self.xml.find('dest').text))
self.mount = Mount(self.xml.find('mountCheck'))
self.filechecks = {'local': {'check': None,
'sync': None},
'remote': {'update': None,
'sync': None}}
self.timestamps = {}
self.rsync_args = None
self.owner = None
self.upstreams = []
2020-06-16 15:44:50 -04:00
self.lockfile = '/var/run/repomirror/{0}.lck'.format(self.name)
# These are optional.
self.owner_xml = self.xml.find('owner')
2020-06-16 15:44:50 -04:00
if self.owner_xml is not None:
self.owner = get_owner(self.owner_xml)
self.rsync_xml = self.xml.find('rsyncArgs')
2020-06-16 15:44:50 -04:00
if self.rsync_xml is not None:
self.rsync_args = Args(self.rsync_xml)
for i in ('Check', 'Sync'):
e = self.xml.find('lastLocal{0}'.format(i))
2020-06-16 15:44:50 -04:00
if e is not None:
self.filechecks['local'][i.lower()] = TimestampFile(e)
for i in ('Sync', 'Update'):
e = self.xml.find('lastRemote{0}'.format(i))
2020-06-16 15:44:50 -04:00
if e is not None:
self.filechecks['remote'][i.lower()] = TimestampFile(e)
for u in self.xml.findall('upstream'):
self.upstreams.append(Upstream(u,
self.dest,
rsync_args = self.rsync_args,
owner = self.owner,
filechecks = self.filechecks))
def check(self):
2020-06-16 15:44:50 -04:00
for k, v in self.filechecks['local'].items():
if v:
tstmp = v.read()
self.timestamps[k] = tstmp
_logger.debug('Updated timestamps: {0}'.format(self.timestamps))
2020-06-16 15:44:50 -04:00
local_checks = sorted([i for i in self.timestamps.values() if i])
for u in self.upstreams:
if not u.available:
continue
u.fetcher.check()
remote_checks = sorted([i for i in u.fetcher.timestamps.values() if i])
if not any((local_checks, remote_checks)) or not remote_checks:
u.has_new = True
else:
update = u.fetcher.timestamps.get('update')
sync = u.fetcher.timestamps.get('sync')
if update:
if local_checks and local_checks[-1] < update:
u.has_new = True
elif not local_checks:
u.has_new = True
if sync:
td = datetime.datetime.utcnow() - sync
if td.days > constants.DAYS_WARN:
_logger.warning(('Upstream {0} has not synced for {1}} or more days; this '
'repository may be out of date.').format(u.fetcher.url, constants.DAYS_WARN))
warnings.warn('Upstream may be out of date')
return(None)
def sync(self):
self.check()
2020-06-16 15:44:50 -04:00
my_pid = os.getpid()
if os.path.isfile(self.lockfile):
with open(self.lockfile, 'r') as fh:
pid = int(fh.read().strip())
if my_pid == pid: # This logically should not happen, but something might have gone stupid.
_logger.warning('Someone call the Ghostbusters because this machine is haunted.')
return(False)
else:
warnmsg = 'The sync process for {0} is locked with file {1} and PID {2}'.format(self.name,
self.lockfile,
pid)
try:
proc = psutil.Process(pid)
warnmsg += '.'
except (psutil.NoSuchProcess, FileNotFoundError, AttributeError):
proc = None
warnmsg += ' but that PID no longer exists.'
_logger.warning(warnmsg)
if proc:
_logger.warning('PID information: {0}'.format(vars(proc)))
warnings.warn(warnmsg)
return(False)
if not self.mount.is_mounted:
_logger.error(('The mountpoint {0} for distro {1} is not mounted; '
'refusing to sync').format(self.mount.path, self.name))
return(False)
os.makedirs(os.path.dirname(self.lockfile), mode = 0o0755, exist_ok = True)
with open(self.lockfile, 'w') as fh:
fh.write('{0}\n'.format(str(my_pid)))
for u in self.upstreams:
if not u.available:
continue
if u.has_new:
u.sync()
if self.filechecks['local']['sync']:
self.filechecks['local']['sync'].write()
break
if self.filechecks['local']['check']:
self.filechecks['local']['check'].write()
2020-06-16 15:44:50 -04:00
os.remove(self.lockfile)
return(True)
2020-06-14 03:46:29 -04:00
class Sync(object):
def __init__(self, cfg = None, dummy = False, distro = None, logdir = None, *args, **kwargs):
2020-06-16 15:44:50 -04:00
if logdir:
self.logdir = logdir
else:
self.logdir = os.path.dirname(logger.filehandler.baseFilename)
self._orig_log_old = logger.filehandler.baseFilename
self._orig_log = logger.preplog(os.path.join(self.logdir, '_main.log'))
logger.filehandler.close()
logger.filehandler.baseFilename = self._orig_log
try:
_args = dict(locals())
del(_args['self'])
_logger.debug('Sync class instantiated with args: {0}'.format(_args))
self.dummy = dummy
if distro:
self.distro = distro
else:
self.distro = []
2020-06-16 15:44:50 -04:00
self.cfg = config.Config(cfg)
except Exception as e:
_logger.error('FATAL ERROR. Stacktrace follows.', exc_info = True)
2020-06-16 15:44:50 -04:00
raise e
def sync(self):
2020-06-16 15:44:50 -04:00
if self.distro:
for d in self.distro:
e = self.cfg.xml.xpath('//distro[@name="{0}"]'.format(d))
if e is None:
_logger.error('Could not find specified distro {0}; skipping'.format(d))
continue
logger.filehandler.close()
logger.filehandler.baseFilename = os.path.join(self.logdir, '{0}.log'.format(e.attrib['name']))
distro = Distro(e[0])
distro.sync()
else:
for e in self.cfg.xml.findall('distro'):
logger.filehandler.close()
logger.filehandler.baseFilename = os.path.join(self.logdir, '{0}.log'.format(e.attrib['name']))
distro = Distro(e)
distro.sync()
logger.filehandler.close()
logger.filehandler.baseFilename = self._orig_log
return(None)