# repomirror/repomirror/sync.py
import datetime
import grp
import logging
import os
import pprint
import pwd
import re
import socket
import sys
import warnings
##
import psutil
##
from . import config
from . import constants
from . import fetcher
from . import logger
_logger = logging.getLogger()
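# If stdin is not a TTY, assume a non-interactive run (e.g. from cron) so
# interactive warnings can be suppressed later on.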
_is_cron = not os.isatty(sys.stdin.fileno())
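# Matches ISO 8601-style duration strings (e.g. 'P1DT6H') with an optional leading
# +/- modifier; years/months/days precede the 'T', hours/minutes/seconds follow it.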
_duration_re = re.compile((r'^(?P<mod>[-+])?P'
                           r'((?P<years>[0-9]+(\.[0-9]+)?)Y)?'
                           r'((?P<months>[0-9]+(\.[0-9]+)?)M)?'
                           r'((?P<days>[0-9]+(\.[0-9]+)?)D)?'
                           r'T?'
                           r'((?P<hours>[0-9]+(\.[0-9]+)?)H)?'
                           r'((?P<minutes>[0-9]+(\.[0-9]+)?)M)?'
                           r'((?P<seconds>[0-9]+(\.[0-9]+)?)S)?'
                           r'$'))
def get_owner(owner_xml):
owner = {}
user = owner_xml.find('user')
if user is not None:
user = user.text
group = owner_xml.find('group')
if group is not None:
group = group.text
if user:
user_obj = pwd.getpwnam(user)
else:
user_obj = pwd.getpwuid(os.geteuid())
owner['uid'] = user_obj.pw_uid
if group:
group_obj = grp.getgrnam(group)
else:
group_obj = grp.getgrgid(pwd.getpwuid(os.geteuid()).pw_gid)
owner['gid'] = group_obj.gr_gid
_logger.debug('Resolved owner xml to {0}'.format(owner))
return(owner)
def get_duration(duration_str):
    r = _duration_re.search(duration_str)
    if not r:
        raise ValueError('Invalid duration string: {0}'.format(duration_str))
    groups = r.groupdict()
    # 'mod' is a +/- sign, not a number; pull it out before the float conversion
    # (float('-') would raise a ValueError).
    mod = groups.pop('mod')
    if not mod:
        mod = '+'
    times = {k: (float(v) if v else 0.0) for k, v in groups.items()}
    years = times.pop('years')
    months = times.pop('months')
    times['days'] = (times['days'] + (years * constants.YEAR) + (months * constants.MONTH))
    delay = datetime.timedelta(**times)
    return((mod, delay))
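# Illustrative example (assuming constants.YEAR and constants.MONTH are day counts):
#   get_duration('-P1DT6H') == ('-', datetime.timedelta(days = 1, hours = 6))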
class Args(object):
def __init__(self, args_xml):
self.xml = args_xml
self.args = []
self._parse_xml()
def _parse_xml(self):
self.args = []
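        # e.g. <long value="1M">bwlimit</long> yields '--bwlimit=1M' and <short>q</short>
        # yields '-q' (illustrative option names; any short/long element works the same way).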
for arg_xml in self.xml.xpath('(short|long)'):
val = arg_xml.attrib.get('value')
if arg_xml.tag == 'short':
prefix = '-'
# elif arg_xml.tag == 'long':
else:
prefix = '--'
arg = '{0}{1}'.format(prefix, arg_xml.text)
if val:
arg += '={0}'.format(val)
self.args.append(arg)
_logger.debug('Generated args list: {0}'.format(self.args))
return(None)
class Mount(object):
def __init__(self, mpchk_xml):
self.path = os.path.abspath(os.path.expanduser(mpchk_xml.text))
self.is_mounted = None
self._check_mount()
def _check_mount(self):
_logger.debug('Getting mount status for {0}'.format(self.path))
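        # Note: /proc/mounts octal-escapes special characters (a space becomes '\040'),
        # so mountpoints containing them will not match this simple string comparison.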
with open('/proc/mounts', 'r') as fh:
raw = fh.read()
        for line in raw.splitlines():
            fields = line.split()
            mp = fields[1]
            if mp == self.path:
                _logger.debug('{0} is mounted.'.format(self.path))
                self.is_mounted = True
                return(None)
        self.is_mounted = False
        _logger.debug('{0} is not mounted.'.format(self.path))
        return(None)
class TimeOffset(object):
def __init__(self, duration_str):
self.mod, self.offset = get_duration(duration_str)
class TimestampFile(object):
def __init__(self, ts_xml, owner_xml = None):
self.xml = ts_xml
self.fmt = ts_xml.attrib.get('timeFormat', 'UNIX_EPOCH')
if self.fmt == 'UNIX_EPOCH':
self.fmt = '%s'
elif self.fmt == 'MICROSECOND_EPOCH':
self.fmt = '%s.%f'
_logger.debug('Set timestamp format string to {0}'.format(self.fmt))
        self.mtime = self.xml.attrib.get('mtime', 'false').lower().startswith(('t', '1'))
_logger.debug('Using mtime: {0}'.format(self.mtime))
self.owner_xml = owner_xml
self.owner = {}
if self.owner_xml is not None:
self.owner = get_owner(self.owner_xml)
_logger.debug('Owner set is {0}'.format(self.owner))
self.path = os.path.abspath(os.path.expanduser(ts_xml.text))
_logger.debug('Path resolved to {0}'.format(self.path))
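    # read() returns the recorded timestamp (or None if the file is absent); with
    # mtime enabled, the file's mtime is used instead of its contents.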
def read(self, parentdir = None):
timestamp = None
if parentdir:
path = os.path.join(os.path.abspath(os.path.expanduser(parentdir)),
self.path.lstrip('/'))
else:
path = self.path
if os.path.isfile(path):
if self.mtime:
timestamp = datetime.datetime.fromtimestamp(float(os.stat(path).st_mtime))
else:
with open(path, 'r') as fh:
ts_raw = fh.read().strip()
if '%s' in self.fmt:
timestamp = datetime.datetime.fromtimestamp(float(ts_raw))
else:
timestamp = datetime.datetime.strptime(ts_raw, self.fmt)
_logger.debug('Read timestamp {0} from {1}'.format(str(timestamp), self.path))
return(timestamp)
    def write(self):
        dname = os.path.dirname(self.path)
        if not os.path.isdir(dname):
            os.makedirs(dname, mode = 0o0755)
            if self.owner:
                os.chown(dname, **self.owner)
            _logger.debug('Created {0}'.format(dname))
        now = datetime.datetime.now(datetime.timezone.utc)
        epoch = now.timestamp()
        with open(self.path, 'w') as fh:
            if self.fmt == '%s.%f':
                # strftime('%s') is glibc-specific and misreads naive UTC datetimes as
                # local time, so render the epoch directly instead.
                fh.write('{0:.6f}'.format(epoch))
            elif '%s' in self.fmt:
                fh.write(str(int(epoch)))
            else:
                fh.write(now.strftime(self.fmt))
            fh.write('\n')
        os.chmod(self.path, mode = 0o0644)
        if self.owner:
            os.chown(self.path, **self.owner)
        if self.mtime:
            os.utime(self.path, (epoch, epoch))
        _logger.debug('Wrote timestamp to {0}'.format(self.path))
        return(None)
class Upstream(object):
def __init__(self, upstream_xml, dest, rsync_args = None, owner = None, filechecks = None, rsync_ignores = None):
self.xml = upstream_xml
# These are required for all upstreams.
self.sync_type = self.xml.find('syncType').text.lower()
self.domain = self.xml.find('domain').text
self.path = self.xml.find('path').text
self.dest = os.path.abspath(os.path.expanduser(dest))
self.delay = None
self.offset = None
self.owner = owner
self.filechecks = filechecks
self._get_delaychk()
self._get_offset()
self.has_new = False
# These are optional.
port = self.xml.find('port')
if port is not None:
self.port = int(port.text)
else:
self.port = constants.PROTO_DEF_PORTS[self.sync_type]
self.available = None
if self.sync_type == 'rsync':
_fetcher = fetcher.RSync
else:
_fetcher = fetcher.FTP
self.fetcher = _fetcher(self.domain,
self.port,
self.path,
self.dest,
rsync_args = rsync_args,
rsync_ignores = rsync_ignores,
filechecks = self.filechecks,
offset = self.offset,
owner = self.owner)
self._check_conn()
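    # A bare TCP connect() to the upstream's port; a cheap reachability probe run
    # before any sync is attempted.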
    def _check_conn(self):
        sock = socket.socket()
        sock.settimeout(7)
        try:
            sock.connect((self.domain, self.port))
            self.available = True
        except (socket.timeout, socket.error):
            self.available = False
        finally:
            # Close the socket even if connect() raised, so we don't leak it.
            sock.close()
        return(None)
def _get_delaychk(self):
delay = self.xml.attrib.get('delayCheck')
if not delay:
return(None)
delay = TimeOffset(delay)
self.delay = delay.offset
return(None)
def _get_offset(self):
offset = self.xml.attrib.get('offset')
if not offset:
return(None)
self.offset = TimeOffset(offset)
return(None)
def sync(self):
self.fetcher.fetch()
return(None)
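# Illustrative upstream XML consumed by the class above (element names as used in
# this module; the values are hypothetical):
#   <upstream delayCheck="P2D">
#       <syncType>rsync</syncType>
#       <domain>mirror.example.com</domain>
#       <path>archlinux</path>
#       <port>873</port>
#   </upstream>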
class Distro(object):
def __init__(self, distro_xml):
self.xml = distro_xml
self.name = self.xml.attrib['name']
self.dest = os.path.abspath(os.path.expanduser(self.xml.find('dest').text))
self.mount = Mount(self.xml.find('mountCheck'))
self.filechecks = {'local': {'check': None,
'sync': None},
'remote': {'update': None,
'sync': None}}
self.timestamps = {}
self.rsync_args = None
self.rsync_ignores = None
self.owner = None
self.upstreams = []
self.lockfile = '/var/run/repomirror/{0}.lck'.format(self.name)
# These are optional.
self.owner_xml = self.xml.find('owner')
if self.owner_xml is not None:
self.owner = get_owner(self.owner_xml)
self.rsync_xml = self.xml.find('rsyncArgs')
if self.rsync_xml is not None:
self.rsync_args = Args(self.rsync_xml)
for i in ('Check', 'Sync'):
e = self.xml.find('lastLocal{0}'.format(i))
if e is not None:
self.filechecks['local'][i.lower()] = TimestampFile(e)
for i in ('Sync', 'Update'):
e = self.xml.find('lastRemote{0}'.format(i))
if e is not None:
self.filechecks['remote'][i.lower()] = TimestampFile(e)
self.rsync_ignores = []
rsyncig_xml = self.xml.find('rsyncIgnore')
if rsyncig_xml is not None:
self.rsync_ignores = [int(i.strip()) for i in rsyncig_xml.attrib['returns'].split()]
for u in self.xml.findall('upstream'):
self.upstreams.append(Upstream(u,
self.dest,
rsync_args = self.rsync_args,
owner = self.owner,
filechecks = self.filechecks,
rsync_ignores = self.rsync_ignores))
def check(self):
for k, v in self.filechecks['local'].items():
if v:
tstmp = v.read()
self.timestamps[k] = tstmp
_logger.debug('Updated local timestamps: {0}'.format(self.timestamps))
local_checks = sorted([i for i in self.timestamps.values() if i])
if local_checks:
_logger.info('Local timestamps: {0}'.format(', '.join([str(t) for t in local_checks])))
for u in self.upstreams:
if not u.available:
continue
u.fetcher.check()
remote_checks = sorted([i for i in u.fetcher.timestamps.values() if i])
if remote_checks:
_logger.info('Remote timestamps for {0}: {1}'.format(u.domain, ', '.join([str(t)
for t in remote_checks])))
            # No remote timestamps means no reliable comparison; sync unconditionally.
            if not remote_checks:
                _logger.info('There are no reliable timestamp comparisons; syncing.')
                u.has_new = True
else:
update = u.fetcher.timestamps.get('update')
sync = u.fetcher.timestamps.get('sync')
if update:
if self.timestamps.get('update'):
if self.timestamps['update'] < update:
_logger.info('Local update timestamp is older than the remote update; syncing.')
_logger.debug('Local update: {0}, remote update: {1}'.format(self.timestamps['update'],
update))
u.has_new = True
elif local_checks and (local_checks[-1] < update):
_logger.info('Newest local timestamp is older than the remote update; syncing.')
_logger.debug('Newest local: {0}, remote update: {1}'.format(local_checks[-1], update))
u.has_new = True
elif not local_checks:
_logger.info('No local timestamps; syncing.')
u.has_new = True
else:
_logger.info('Local checks are newer than upstream; not syncing.')
_logger.debug('Newest local: {0}, remote update: {1}'.format(local_checks[-1], update))
else:
_logger.info('No remote update timestamp; syncing.')
u.has_new = True
if sync and u.delay:
td = datetime.datetime.utcnow() - sync
if td > u.delay:
_logger.warning(('Upstream {0} has not synced for {1} or longer; this '
'repository may be out of date.').format(u.fetcher.url, u.delay))
warnings.warn('Upstream may be out of date')
return(None)
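    # sync() serializes per-distro runs via a PID lockfile; if the lockfile names a
    # live (or stale) PID other than our own, the run is logged and aborted.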
def sync(self):
self.check()
my_pid = os.getpid()
if os.path.isfile(self.lockfile):
with open(self.lockfile, 'r') as fh:
pid = int(fh.read().strip())
if my_pid == pid: # This logically should not happen, but something might have gone stupid.
_logger.warning('Someone call the Ghostbusters because this machine is haunted.')
return(False)
else:
warnmsg = 'The sync process for {0} is locked with file {1} and PID {2}'.format(self.name,
self.lockfile,
pid)
try:
proc = psutil.Process(pid)
warnmsg += '.'
except (psutil.NoSuchProcess, FileNotFoundError, AttributeError):
proc = None
warnmsg += ' but that PID no longer exists.'
_logger.warning(warnmsg)
if proc:
_logger.warning('PID information: {0}'.format(vars(proc)))
# This is *really* annoying if you're running from cron and get emailed output.
# So we suppress it if in cron.
if not _is_cron:
warnings.warn(warnmsg)
if proc:
proc_info = {k.lstrip('_'): v for k, v in vars(proc).items() if k not in ('_lock', '_proc')}
                        print('Process information:')
                        pprint.pprint(proc_info)
return(False)
if not self.mount.is_mounted:
_logger.error(('The mountpoint {0} for distro {1} is not mounted; '
'refusing to sync').format(self.mount.path, self.name))
return(False)
os.makedirs(os.path.dirname(self.lockfile), mode = 0o0755, exist_ok = True)
with open(self.lockfile, 'w') as fh:
fh.write('{0}\n'.format(str(my_pid)))
        try:
            for u in self.upstreams:
                if not u.available:
                    _logger.debug('Upstream {0} is not available; skipping.'.format(u.domain))
                    continue
                if u.has_new:
                    _logger.info('Initiating sync for upstream {0}.'.format(u.domain))
                    u.sync()
                    _logger.debug('Sync for upstream {0} complete.'.format(u.domain))
                    if self.filechecks['local']['sync']:
                        self.filechecks['local']['sync'].write()
                    break
                else:
                    _logger.debug('Upstream {0} has nothing new; not syncing.'.format(u.domain))
            if self.filechecks['local']['check']:
                self.filechecks['local']['check'].write()
        finally:
            # Release the lockfile even if an upstream sync raises.
            os.remove(self.lockfile)
        return(True)
class Sync(object):
def __init__(self, cfg = None, dummy = False, distro = None, logdir = None, *args, **kwargs):
if logdir:
self.logdir = logdir
else:
self.logdir = os.path.dirname(logger.filehandler.baseFilename)
self._orig_log_old = logger.filehandler.baseFilename
self._orig_log = logger.preplog(os.path.join(self.logdir, '_main.log'))
logger.filehandler.close()
logger.filehandler.baseFilename = self._orig_log
try:
_args = dict(locals())
del(_args['self'])
_logger.debug('Sync class instantiated with args: {0}'.format(_args))
self.dummy = dummy
if distro:
self.distro = distro
else:
self.distro = []
self.cfg = config.Config(cfg)
        except Exception:
            _logger.error('FATAL ERROR. Stacktrace follows.', exc_info = True)
            raise
def sync(self):
if self.distro:
for d in self.distro:
                e = self.cfg.xml.xpath('//distro[@name="{0}"]'.format(d))
                # xpath() returns a list; an unmatched distro name yields [], never None.
                if not e:
                    _logger.error('Could not find specified distro {0}; skipping'.format(d))
                    continue
                e = e[0]
logger.filehandler.close()
logger.filehandler.baseFilename = os.path.join(self.logdir, '{0}.log'.format(e.attrib['name']))
distro = Distro(e)
distro.sync()
else:
for e in self.cfg.xml.findall('distro'):
logger.filehandler.close()
logger.filehandler.baseFilename = os.path.join(self.logdir, '{0}.log'.format(e.attrib['name']))
distro = Distro(e)
distro.sync()
logger.filehandler.close()
logger.filehandler.baseFilename = self._orig_log
return(None)
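# Illustrative usage (the config path and distro name are hypothetical):
#   s = Sync(cfg = '/etc/repomirror/repomirror.xml', distro = ['archlinux'])
#   s.sync()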