EVOLUTION-MANAGER
Edit File: daemon.py
# Code shared by various Koji daemons # Copyright (c) 2010-2014 Red Hat, Inc. # # Koji is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; # version 2.1 of the License. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this software; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # Authors: # Mike McLean <mikem@redhat.com> # Mike Bonnet <mikeb@redhat.com> from __future__ import absolute_import, division import errno import hashlib import logging import os import signal import subprocess import sys import time import traceback from fnmatch import fnmatch import six from six.moves import range, urllib import koji import koji.tasks import koji.xmlrpcplus from koji.tasks import safe_rmtree from koji.util import ( adler32_constructor, base64encode, dslice, parseStatus, to_list ) def incremental_upload(session, fname, fd, path, retries=5, logger=None): if not fd: return if logger is None: logger = logging.getLogger('koji.daemon') if session.opts.get('use_fast_upload'): fast_incremental_upload(session, fname, fd, path, retries, logger) return while True: offset = fd.tell() contents = fd.read(65536) size = len(contents) if size == 0: break data = base64encode(contents) digest = hashlib.md5(contents).hexdigest() del contents tries = 0 while True: if session.uploadFile(path, fname, size, digest, offset, data): break if tries <= retries: tries += 1 time.sleep(10) continue else: logger.error("Error uploading file %s to %s at offset %d" % (fname, path, offset)) break def fast_incremental_upload(session, fname, fd, path, retries, logger): """Like incremental_upload, but use the fast upload mechanism""" while True: offset = fd.tell() contents = fd.read(65536) if not contents: break hexdigest = adler32_constructor(contents).hexdigest() tries = 0 while True: result = session.rawUpload(contents, offset, path, fname, overwrite=True) if result['hexdigest'] == hexdigest: break if tries <= retries: tries += 1 time.sleep(10) continue else: logger.error("Error uploading file %s to %s at offset %d" % (fname, path, offset)) break def log_output(session, path, args, outfile, uploadpath, cwd=None, logerror=0, append=0, chroot=None, env=None): """Run command with output redirected. If chroot is not None, chroot to the directory specified before running the command.""" pid = os.fork() fd = None if not pid: session._forget() try: if chroot: os.chroot(chroot) if cwd: os.chdir(cwd) flags = os.O_CREAT | os.O_WRONLY if append: flags |= os.O_APPEND fd = os.open(outfile, flags, 0o666) os.dup2(fd, 1) if logerror: os.dup2(fd, 2) # echo the command we're running into the logfile msg = '$ %s\n' % ' '.join(args) if six.PY3: msg = msg.encode('utf-8') os.write(fd, msg) environ = os.environ.copy() if env: environ.update(env) os.execvpe(path, args, environ) except BaseException: msg = ''.join(traceback.format_exception(*sys.exc_info())) if fd: try: if six.PY3: os.write(fd, msg.encode('utf-8')) else: os.write(fd, msg) os.close(fd) except Exception: pass print(msg) os._exit(1) else: if chroot: outfile = os.path.normpath(chroot + outfile) outfd = None remotename = os.path.basename(outfile) while True: status = os.waitpid(pid, os.WNOHANG) time.sleep(1) if not outfd: try: outfd = open(outfile, 'rb') except IOError: # will happen if the forked process has not created the logfile yet continue except Exception: print('Error reading log file: %s' % outfile) print(''.join(traceback.format_exception(*sys.exc_info()))) incremental_upload(session, remotename, outfd, uploadpath) if status[0] != 0: if outfd: outfd.close() return status[1] # BEGIN kojikamid dup # class SCM(object): "SCM abstraction class" types = {'CVS': ('cvs://',), 'CVS+SSH': ('cvs+ssh://',), 'GIT': ('git://', 'git+http://', 'git+https://', 'git+rsync://'), 'GIT+SSH': ('git+ssh://',), 'SVN': ('svn://', 'svn+http://', 'svn+https://'), 'SVN+SSH': ('svn+ssh://',)} @classmethod def is_scm_url(cls, url, strict=False): """ Return True if the url appears to be a valid, accessible source location, False otherwise """ schemes = [s for t in cls.types for s in cls.types[t]] for scheme in schemes: if url.startswith(scheme): return True # otherwise not valid if strict: raise koji.GenericError('Invalid scheme in scm url. Valid schemes ' 'are: %s' % ' '.join(sorted(schemes))) else: return False def __init__(self, url): """ Initialize the SCM object using the specified url. The expected url format is: scheme://[user@]host/path/to/repo?path/to/module#revision_or_tag_identifier The initialized SCM object will have the following attributes: - url (the unmodified url) - scheme - user (may be null) - host - repository - module - revision - use_common (defaults to True, may be set by assert_allowed()) - source_cmd (defaults to ['make', 'sources'], may be set by assert_allowed()) - scmtype The exact format of each attribute is SCM-specific, but the structure of the url must conform to the template above, or an error will be raised. """ self.logger = logging.getLogger('koji.build.SCM') if not SCM.is_scm_url(url, strict=True): raise koji.GenericError('Invalid SCM URL: %s' % url) self.url = url scheme, user, host, path, query, fragment = self._parse_url() self.scheme = scheme self.user = user self.host = host self.repository = path self.module = query self.revision = fragment self.use_common = True self.source_cmd = ['make', 'sources'] for scmtype, schemes in SCM.types.items(): if self.scheme in schemes: self.scmtype = scmtype break else: # pragma: no cover # should never happen raise koji.GenericError('Invalid SCM URL: %s' % url) def get_info(self, keys=None): if keys is None: keys = ["url", "scheme", "user", "host", "repository", "module", "revision", "scmtype"] return dslice(vars(self), keys) def _parse_url(self): """ Parse the SCM url into usable components. Return the following tuple: (scheme, user, host, path, query, fragment) user may be None, everything else will have a value """ # get the url's scheme scheme = self.url.split('://')[0] + '://' # replace the scheme with http:// so that the urlparse works in all cases dummyurl = self.url.replace(scheme, 'http://', 1) dummyscheme, netloc, path, params, query, fragment = urllib.parse.urlparse(dummyurl) user = None userhost = netloc.split('@') if len(userhost) == 2: user = userhost[0] if not user: # Don't return an empty string user = None elif ':' in user: raise koji.GenericError('username:password format not supported: %s' % user) netloc = userhost[1] elif len(userhost) > 2: raise koji.GenericError('Invalid username@hostname specified: %s' % netloc) if not netloc: raise koji.GenericError( 'Unable to parse SCM URL: %s . Could not find the netloc element.' % self.url) # check for empty path before we apply normpath if not path: raise koji.GenericError( 'Unable to parse SCM URL: %s . Could not find the path element.' % self.url) path = os.path.normpath(path) # path and query should not end with / path = path.rstrip('/') query = query.rstrip('/') # normpath might not strip // at start of path if path.startswith('//'): path = '/' + path.strip('/') # path should start with / if not path.startswith('/'): # pragma: no cover # any such url should have already been caught by is_scm_url raise koji.GenericError('Invalid SCM URL. Path should begin with /: %s) ') # check for validity: params should be empty, query may be empty, everything else should be # populated if params: raise koji.GenericError( 'Unable to parse SCM URL: %s . Params element %s should be empty.' % (self.url, params)) if not scheme: # pragma: no cover # should not happen because of is_scm_url check earlier raise koji.GenericError( 'Unable to parse SCM URL: %s . Could not find the scheme element.' % self.url) if not fragment: raise koji.GenericError( 'Unable to parse SCM URL: %s . Could not find the fragment element.' % self.url) # return parsed values return (scheme, user, netloc, path, query, fragment) def assert_allowed(self, allowed): """ Check this scm against allowed list and apply options allowed is a space-separated list of entries in one of the following forms: host:repository[:use_common[:source_cmd]] !host:repository Incorrectly-formatted entries will be skipped with a warning. The first form allows a host:repository pattern and optionally sets a few options for it. The second form explicitly blocks a host:repository pattern Both host and repository are treated as glob patterns If there is a matching entry, then the optional fields, if given, will be applied to the instance. If there is no matching entry, or if the host:repository is blocked then BuildError is raised. The use_common option defaults to on. If it is set to no, off, false or 0, it will be disabled. If the option is on, then kojid will attempt to checkout a common/ directory from the repository. The source_command is a shell command to be run before building the srpm. It defaults to "make sources". This can be overridden by the matching allowed entry. The command must be encoded with commas instead of spaces (e.g. "make,sources"). """ is_allowed = False for allowed_scm in allowed.split(): scm_tuple = allowed_scm.split(':') if len(scm_tuple) < 2: self.logger.warning('Ignoring incorrectly formatted SCM host:repository: %s' % allowed_scm) continue host_pat = scm_tuple[0] repo_pat = scm_tuple[1] invert = False if host_pat.startswith('!'): invert = True host_pat = host_pat[1:] if fnmatch(self.host, host_pat) and fnmatch(self.repository, repo_pat): # match if invert: break is_allowed = True # check if we specify a value for use_common if len(scm_tuple) >= 3: if scm_tuple[2].lower() in ('no', 'off', 'false', '0'): self.use_common = False # check if we specify a custom source_cmd if len(scm_tuple) >= 4: if scm_tuple[3]: self.source_cmd = scm_tuple[3].split(',') else: # there was nothing after the trailing :, # so they don't want to run a source_cmd at all self.source_cmd = None break if not is_allowed: raise koji.BuildError( '%s:%s is not in the list of allowed SCMs' % (self.host, self.repository)) def checkout(self, scmdir, session=None, uploadpath=None, logfile=None): """ Checkout the module from SCM. Accepts the following parameters: - scmdir: the working directory - session: a ClientSession object - uploadpath: the path on the server the logfile should be uploaded to - logfile: the file used for logging command output session, uploadpath, and logfile are not used when run within kojikamid, but are otherwise required. Returns the directory that the module was checked-out into (a subdirectory of scmdir) """ # TODO: sanity check arguments sourcedir = '%s/%s' % (scmdir, self.module) update_checkout_cmd = None update_checkout_dir = None env = None def _run(cmd, chdir=None, fatal=False, log=True, _count=[0]): if globals().get('KOJIKAMID'): # we've been inserted into kojikamid, use its run() return run(cmd, chdir=chdir, fatal=fatal, log=log) # noqa: F821 else: append = (_count[0] > 0) _count[0] += 1 if log_output(session, cmd[0], cmd, logfile, uploadpath, cwd=chdir, logerror=1, append=append, env=env): raise koji.BuildError('Error running %s command "%s", see %s for details' % (self.scmtype, ' '.join(cmd), os.path.basename(logfile))) if self.scmtype == 'CVS': pserver = ':pserver:%s@%s:%s' % ((self.user or 'anonymous'), self.host, self.repository) module_checkout_cmd = ['cvs', '-d', pserver, 'checkout', '-r', self.revision, self.module] common_checkout_cmd = ['cvs', '-d', pserver, 'checkout', 'common'] elif self.scmtype == 'CVS+SSH': if not self.user: raise koji.BuildError( 'No user specified for repository access scheme: %s' % self.scheme) cvsserver = ':ext:%s@%s:%s' % (self.user, self.host, self.repository) module_checkout_cmd = ['cvs', '-d', cvsserver, 'checkout', '-r', self.revision, self.module] common_checkout_cmd = ['cvs', '-d', cvsserver, 'checkout', 'common'] env = {'CVS_RSH': 'ssh'} elif self.scmtype == 'GIT': scheme = self.scheme if '+' in scheme: scheme = scheme.split('+')[1] gitrepo = '%s%s%s' % (scheme, self.host, self.repository) commonrepo = os.path.dirname(gitrepo) + '/common' checkout_path = os.path.basename(self.repository) if self.repository.endswith('/.git'): # If we're referring to the .git subdirectory of the main module, # assume we need to do the same for the common module checkout_path = os.path.basename(self.repository[:-5]) commonrepo = os.path.dirname(gitrepo[:-5]) + '/common/.git' elif self.repository.endswith('.git'): # If we're referring to a bare repository for the main module, # assume we need to do the same for the common module checkout_path = os.path.basename(self.repository[:-4]) commonrepo = os.path.dirname(gitrepo[:-4]) + '/common.git' sourcedir = '%s/%s' % (scmdir, checkout_path) module_checkout_cmd = ['git', 'clone', '-n', gitrepo, sourcedir] common_checkout_cmd = ['git', 'clone', commonrepo, 'common'] update_checkout_cmd = ['git', 'reset', '--hard', self.revision] update_checkout_dir = sourcedir # self.module may be empty, in which case the specfile should be in the top-level # directory if self.module: # Treat the module as a directory inside the git repository sourcedir = '%s/%s' % (sourcedir, self.module) elif self.scmtype == 'GIT+SSH': if not self.user: raise koji.BuildError( 'No user specified for repository access scheme: %s' % self.scheme) gitrepo = 'git+ssh://%s@%s%s' % (self.user, self.host, self.repository) commonrepo = os.path.dirname(gitrepo) + '/common' checkout_path = os.path.basename(self.repository) if self.repository.endswith('/.git'): # If we're referring to the .git subdirectory of the main module, # assume we need to do the same for the common module checkout_path = os.path.basename(self.repository[:-5]) commonrepo = os.path.dirname(gitrepo[:-5]) + '/common/.git' elif self.repository.endswith('.git'): # If we're referring to a bare repository for the main module, # assume we need to do the same for the common module checkout_path = os.path.basename(self.repository[:-4]) commonrepo = os.path.dirname(gitrepo[:-4]) + '/common.git' sourcedir = '%s/%s' % (scmdir, checkout_path) module_checkout_cmd = ['git', 'clone', '-n', gitrepo, sourcedir] common_checkout_cmd = ['git', 'clone', commonrepo, 'common'] update_checkout_cmd = ['git', 'reset', '--hard', self.revision] update_checkout_dir = sourcedir # self.module may be empty, in which case the specfile should be in the top-level # directory if self.module: # Treat the module as a directory inside the git repository sourcedir = '%s/%s' % (sourcedir, self.module) elif self.scmtype == 'SVN': scheme = self.scheme if '+' in scheme: scheme = scheme.split('+')[1] svnserver = '%s%s%s' % (scheme, self.host, self.repository) module_checkout_cmd = ['svn', 'checkout', '-r', self.revision, '%s/%s' % (svnserver, self.module), self.module] common_checkout_cmd = ['svn', 'checkout', '%s/common' % svnserver] elif self.scmtype == 'SVN+SSH': if not self.user: raise koji.BuildError( 'No user specified for repository access scheme: %s' % self.scheme) svnserver = 'svn+ssh://%s@%s%s' % (self.user, self.host, self.repository) module_checkout_cmd = ['svn', 'checkout', '-r', self.revision, '%s/%s' % (svnserver, self.module), self.module] common_checkout_cmd = ['svn', 'checkout', '%s/common' % svnserver] else: raise koji.BuildError('Unknown SCM type: %s' % self.scmtype) # perform checkouts _run(module_checkout_cmd, chdir=scmdir, fatal=True) if update_checkout_cmd: # Currently only required for GIT checkouts # Run the command in the directory the source was checked out into if self.scmtype.startswith('GIT') and globals().get('KOJIKAMID'): _run(['git', 'config', 'core.autocrlf', 'true'], chdir=update_checkout_dir, fatal=True) _run(['git', 'config', 'core.safecrlf', 'true'], chdir=update_checkout_dir, fatal=True) _run(update_checkout_cmd, chdir=update_checkout_dir, fatal=True) if self.use_common and not globals().get('KOJIKAMID'): _run(common_checkout_cmd, chdir=scmdir, fatal=True) if not os.path.exists('%s/../common' % sourcedir): # find the relative distance from sourcedir/../common to scmdir/common destdir = os.path.split(sourcedir)[0] path_comps = destdir[len(scmdir) + 1:] rel_path = '../' * len(path_comps.split('/')) os.symlink(rel_path + 'common', '%s/../common' % sourcedir) self.sourcedir = sourcedir return sourcedir def get_source(self): r = { 'url': self.url, 'source': '', } if self.scmtype.startswith('GIT'): cmd = ['git', 'rev-parse', 'HEAD'] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.sourcedir,) out, _ = proc.communicate() status = proc.wait() if status != 0: raise koji.GenericError('Error getting commit hash for git') fragment = out.strip() if six.PY3: fragment = fragment.decode() scheme = self.scheme[:-3] netloc = self.host path = self.repository query = self.module r['source'] = urllib.parse.urlunsplit([scheme, netloc, path, query, fragment]) else: # just use the same url r['source'] = self.url return r # END kojikamid dup # class TaskManager(object): def __init__(self, options, session): self.options = options self.session = session self.tasks = {} self.skipped_tasks = {} self.pids = {} self.subsessions = {} self.handlers = {} self.status = '' self.restart_pending = False self.ready = False self.hostdata = {} self.task_load = 0.0 self.host_id = self.session.host.getID() self.start_ts = self.session.getSessionInfo()['start_ts'] self.logger = logging.getLogger("koji.TaskManager") def findHandlers(self, vars): """Find and index task handlers""" for v in vars.values(): self.registerHandler(v) def registerHandler(self, entry): """register and index task handler""" if isinstance(entry, type(koji.tasks.BaseTaskHandler)) and \ issubclass(entry, koji.tasks.BaseTaskHandler): for method in entry.Methods: self.handlers[method] = entry def registerCallback(self, entry): """register and index callback plugins""" if callable(entry) and getattr(entry, 'callbacks', None): for cbtype in entry.callbacks: koji.plugin.register_callback(cbtype, entry) def registerEntries(self, vars): """Register task handlers and other plugins""" for v in vars.values(): self.registerHandler(v) self.registerCallback(v) def scanPlugin(self, plugin): """Find task handlers in a plugin""" self.registerEntries(vars(plugin)) def shutdown(self): """Attempt to shut down cleanly""" for task_id in self.pids: self.cleanupTask(task_id) self.session.host.freeTasks(to_list(self.tasks.keys())) self.session.host.updateHost(task_load=0.0, ready=False) def updateBuildroots(self, nolocal=False): """Handle buildroot cleanup/maintenance - examine current buildroots on system - compare with db - clean up as needed - /var/lib/mock - /etc/mock/koji If nolocal is True, do not try to scan local buildroots. """ # query buildroots in db that are not expired states = [koji.BR_STATES[x] for x in ('INIT', 'WAITING', 'BUILDING')] db_br = self.session.listBuildroots(hostID=self.host_id, state=tuple(states)) # index by id db_br = dict([(row['id'], row) for row in db_br]) st_expired = koji.BR_STATES['EXPIRED'] for id, br in db_br.items(): task_id = br['task_id'] if task_id is None: # not associated with a task # this makes no sense now, but may in the future self.logger.warning("Expiring taskless buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) self.session.host.setBuildRootState(id, st_expired) elif task_id not in self.tasks: # task not running - expire the buildroot # TODO - consider recycling hooks here (with strong sanity checks) self.logger.info("Expiring buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) self.logger.debug( "Buildroot task: %r, Current tasks: %r" % (task_id, to_list(self.tasks.keys()))) self.session.host.setBuildRootState(id, st_expired) continue if nolocal: return local_br = self._scanLocalBuildroots() # get info on local_only buildroots (most likely expired) local_only = [id for id in local_br if id not in db_br] if local_only: missed_br = self.session.listBuildroots(buildrootID=tuple(local_only)) # get all the task info in one call tasks = [] for br in missed_br: task_id = br['task_id'] if task_id: tasks.append(task_id) # index missed_br = dict([(row['id'], row) for row in missed_br]) tasks = dict([(row['id'], row) for row in self.session.getTaskInfo(tasks)]) # go from +- oldest for id in sorted(local_only): # Cleaning options # - wait til later # - "soft" clean (leaving empty root/ dir) # - full removal data = local_br[id] br = missed_br.get(id) if not br: self.logger.warning("%(name)s: not in db" % data) continue desc = "%(id)i/%(tag_name)s/%(arch)s" % br if not br['retire_ts']: self.logger.warning("%s: no retire timestamp" % desc) continue age = time.time() - br['retire_ts'] self.logger.debug("Expired/stray buildroot: %s" % desc) if br and br['task_id']: task = tasks.get(br['task_id']) if not task: self.logger.warning("%s: invalid task %s" % (desc, br['task_id'])) continue if task['state'] == koji.TASK_STATES['FAILED'] and \ age < self.options.failed_buildroot_lifetime: # XXX - this could be smarter # keep buildroots for failed tasks around for a little while if self.checkSpace(): # we can leave it in place, otherwise delete it self.logger.debug("Keeping failed buildroot: %s" % desc) continue topdir = data['dir'] rootdir = None if topdir: rootdir = "%s/root" % topdir try: st = os.lstat(rootdir) except OSError as e: if e.errno == errno.ENOENT: rootdir = None else: self.logger.warning("%s: %s" % (desc, e)) continue else: age = min(age, time.time() - st.st_mtime) # note: https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=192153) # If rpmlib is installing in this chroot, removing it entirely # can lead to a world of hurt. # We remove the rootdir contents but leave the rootdir unless it # is really old if age > 3600 * 24: # dir untouched for a day self.logger.info("Removing buildroot: %s" % desc) if topdir and safe_rmtree(topdir, unmount=True, strict=False) != 0: continue # also remove the config try: os.unlink(data['cfg']) except OSError as e: self.logger.warning("%s: can't remove config: %s" % (desc, e)) elif age > 120: if rootdir: try: flist = os.listdir(rootdir) except OSError as e: self.logger.warning("%s: can't list rootdir: %s" % (desc, e)) continue if flist: self.logger.info("%s: clearing rootdir" % desc) for fn in flist: safe_rmtree("%s/%s" % (rootdir, fn), unmount=True, strict=False) resultdir = "%s/result" % topdir if os.path.isdir(resultdir): self.logger.info("%s: clearing resultdir" % desc) safe_rmtree(resultdir, unmount=True, strict=False) else: self.logger.debug("Recent buildroot: %s: %i seconds" % (desc, age)) self.logger.debug("Local buildroots: %d" % len(local_br)) self.logger.debug("Active buildroots: %d" % len(db_br)) self.logger.debug("Expired/stray buildroots: %d" % len(local_only)) def _scanLocalBuildroots(self): # XXX configdir = '/etc/mock/koji' buildroots = {} for f in os.listdir(configdir): if not f.endswith('.cfg'): continue fn = "%s/%s" % (configdir, f) if not os.path.isfile(fn): continue fo = open(fn, 'r') id = None name = None for n in range(10): # data should be in first few lines line = fo.readline() if line.startswith('# Koji buildroot id:'): try: id = int(line.split(':')[1]) except (ValueError, IndexError): continue if line.startswith('# Koji buildroot name:'): try: name = line.split(':')[1].strip() except (ValueError, IndexError): continue if id is None or name is None: continue # see if there's a dir for the buildroot vardir = os.path.join(self.options.mockdir, name) buildroots[id] = {} buildroots[id]['name'] = name buildroots[id]['cfg'] = fn buildroots[id]['dir'] = None if os.path.isdir(vardir): buildroots[id]['dir'] = vardir return buildroots def updateTasks(self): """Read and process task statuses from server The processing we do is: 1) clean up after tasks that are not longer active: * kill off processes * retire buildroots * remove buildroots - with some possible exceptions 2) wake waiting tasks if appropriate """ tasks = {} stale = [] task_load = 0.0 if self.pids: self.logger.info("pids: %r" % self.pids) for task in self.session.host.getHostTasks(): self.logger.info("open task: %r" % task) # the tasks returned are those that are open and locked # by this host. id = task['id'] if id not in self.pids: # We don't have a process for this # Expected to happen after a restart, otherwise this is an error stale.append(id) continue tasks[id] = task if task.get('alert', False): # wake up the process self.logger.info("Waking up task: %r" % task) os.kill(self.pids[id], signal.SIGUSR2) if not task['waiting']: task_load += task['weight'] self.logger.debug("Task Load: %s" % task_load) self.task_load = task_load self.tasks = tasks self.logger.debug("Current tasks: %r" % self.tasks) if len(stale) > 0: # A stale task is one which is opened to us, but we know nothing # about). This will happen after a daemon restart, for example. self.logger.info("freeing stale tasks: %r" % stale) self.session.host.freeTasks(stale) for id, pid in list(self.pids.items()): if self._waitTask(id, pid): # the subprocess handles most everything, we just need to clear things out if self.cleanupTask(id, wait=False): del self.pids[id] if id in self.tasks: del self.tasks[id] for id, pid in list(self.pids.items()): if id not in tasks: # expected to happen when: # - we are in the narrow gap between the time the task # records its result and the time the process actually # exits. # - task is canceled # - task is forcibly reassigned/unassigned tinfo = self.session.getTaskInfo(id) if tinfo is None: raise koji.GenericError("Invalid task %r (pid %r)" % (id, pid)) elif tinfo['state'] == koji.TASK_STATES['CANCELED']: self.logger.info("Killing canceled task %r (pid %r)" % (id, pid)) if self.cleanupTask(id): del self.pids[id] elif tinfo['host_id'] != self.host_id: self.logger.info("Killing reassigned task %r (pid %r)" % (id, pid)) if self.cleanupTask(id): del self.pids[id] else: self.logger.info("Lingering task %r (pid %r)" % (id, pid)) def getNextTask(self): self.ready = self.readyForTask() self.session.host.updateHost(self.task_load, self.ready) if not self.ready: self.logger.info("Not ready for task") return False hosts, tasks = self.session.host.getLoadData() self.logger.debug("Load Data:") self.logger.debug(" hosts: %r" % hosts) self.logger.debug(" tasks: %r" % tasks) # now we organize this data into channel-arch bins bin_hosts = {} # hosts indexed by bin bins = {} # bins for this host our_avail = None for host in hosts: host['bins'] = [] if host['id'] == self.host_id: # note: task_load reported by server might differ from what we # sent due to precision variation our_avail = host['capacity'] - host['task_load'] for chan in host['channels']: for arch in host['arches'].split() + ['noarch']: bin = "%s:%s" % (chan, arch) bin_hosts.setdefault(bin, []).append(host) if host['id'] == self.host_id: bins[bin] = 1 self.logger.debug("bins: %r" % bins) if our_avail is None: self.logger.info("Server did not report this host. Are we disabled?") return False elif not bins: self.logger.info("No bins for this host. Missing channel/arch config?") # Note: we may still take an assigned task below # sort available capacities for each of our bins avail = {} for bin in bins: avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]] avail[bin].sort() avail[bin].reverse() self.cleanDelayTimes() for task in tasks: # note: tasks are in priority order self.logger.debug("task: %r" % task) if task['method'] not in self.handlers: self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task) continue if task['id'] in self.tasks: # we were running this task, but it apparently has been # freed or reassigned. We can't do anything with it until # updateTasks notices this and cleans up. self.logger.debug("Task %(id)s freed or reassigned", task) continue if task['state'] == koji.TASK_STATES['ASSIGNED']: self.logger.debug("task is assigned") if self.host_id == task['host_id']: # assigned to us, we can take it regardless if self.takeTask(task): return True elif task['state'] == koji.TASK_STATES['FREE']: bin = "%(channel_id)s:%(arch)s" % task self.logger.debug("task is free, bin=%r" % bin) if bin not in bins: continue # see where our available capacity is compared to other hosts for this bin # (note: the hosts in this bin are exactly those that could # accept this task) bin_avail = avail.get(bin, [0]) if self.checkAvailDelay(task, bin_avail, our_avail): # decline for now and give the upper half a chance continue # otherwise, we attempt to open the task if self.takeTask(task): return True else: # should not happen raise Exception("Invalid task state reported by server") return False def checkAvailDelay(self, task, bin_avail, our_avail): """Check to see if we should still delay taking a task Returns True if we are still in the delay period and should skip the task. Otherwise False (delay has expired). """ now = time.time() ts = self.skipped_tasks.get(task['id']) if not ts: ts = self.skipped_tasks[task['id']] = now # determine our normalized bin rank for pos, cap in enumerate(bin_avail): if our_avail >= cap: break if len(bin_avail) > 1: rank = float(pos) / (len(bin_avail) - 1) else: rank = 0.0 # so, 0.0 for highest available capacity, 1.0 for lowest delay = getattr(self.options, 'task_avail_delay', 180) delay *= rank # return True if we should delay if now - ts < delay: self.logger.debug("skipping task %i, age=%s rank=%s" % (task['id'], int(now - ts), rank)) return True # otherwise del self.skipped_tasks[task['id']] return False def cleanDelayTimes(self): """Remove old entries from skipped_tasks""" now = time.time() delay = getattr(self.options, 'task_avail_delay', 180) cutoff = now - delay * 10 # After 10x the delay, we've had plenty of opportunity to take the # task, so either it has already been taken or we can't take it. for task_id in list(self.skipped_tasks): ts = self.skipped_tasks[task_id] if ts < cutoff: del self.skipped_tasks[task_id] def _waitTask(self, task_id, pid=None): """Wait (nohang) on the task, return true if finished""" if pid is None: pid = self.pids.get(task_id) if not pid: raise koji.GenericError("No pid for task %i" % task_id) prefix = "Task %i (pid %i)" % (task_id, pid) try: (childpid, status) = os.waitpid(pid, os.WNOHANG) except OSError as e: # check errno if e.errno != errno.ECHILD: # should not happen raise # otherwise assume the process is gone self.logger.info("%s: %s" % (prefix, e)) return True if childpid != 0: self.logger.info(parseStatus(status, prefix)) return True return False def _doKill(self, task_id, pid, cmd, sig, timeout, pause): """ Kill the process with the given process ID. Return True if the process is successfully killed in the given timeout, False otherwise. """ self.logger.info('Checking "%s" (pid %i, taskID %i)...' % (cmd, pid, task_id)) execname = cmd.split()[0] signaled = False t = 0.0 while True: status = self._getStat(pid) if status and status[1] == cmd and status[2] != 'Z': self.logger.info('%s (pid %i, taskID %i) is running' % (execname, pid, task_id)) else: if signaled: self.logger.info( '%s (pid %i, taskID %i) was killed by signal %i' % (execname, pid, task_id, sig)) else: self.logger.info('%s (pid %i, taskID %i) exited' % (execname, pid, task_id)) return True if t >= timeout: self.logger.warning('Failed to kill %s (pid %i, taskID %i) with signal %i' % (execname, pid, task_id, sig)) return False try: os.kill(pid, sig) except OSError as e: # process probably went away, we'll find out on the next iteration self.logger.info('Error sending signal %i to %s (pid %i, taskID %i): %s' % (sig, execname, pid, task_id, e)) else: signaled = True self.logger.info('Sent signal %i to %s (pid %i, taskID %i)' % (sig, execname, pid, task_id)) time.sleep(pause) t += pause def _getStat(self, pid): """ Get the stat info for the given pid. Return a list of all the fields in /proc/<pid>/stat. The second entry will contain the full command-line instead of just the command name. If the process does not exist, return None. """ try: proc_path = '/proc/%i/stat' % pid if not os.path.isfile(proc_path): return None proc_file = open(proc_path) procstats = [not field.isdigit() and field or int(field) for field in proc_file.read().split()] proc_file.close() cmd_path = '/proc/%i/cmdline' % pid if not os.path.isfile(cmd_path): return None cmd_file = open(cmd_path) procstats[1] = cmd_file.read().replace('\0', ' ').strip() cmd_file.close() if not procstats[1]: return None return procstats except IOError: # process may have already gone away return None def _childPIDs(self, pid): """Recursively get the children of the process with the given ID. Return a list containing the process IDs of the children in breadth-first order, without duplicates.""" statsByPPID = {} pidcmd = None for procdir in os.listdir('/proc'): if not procdir.isdigit(): continue procid = int(procdir) procstats = self._getStat(procid) if not procstats: continue statsByPPID.setdefault(procstats[3], []).append(procstats) if procid == pid: pidcmd = procstats[1] pids = [] if pidcmd: # only append the pid if it still exists pids.append((pid, pidcmd)) parents = [pid] while parents: for ppid in parents[:]: for procstats in statsByPPID.get(ppid, []): # get the /proc entries with ppid as their parent, and append their pid to the # list, then recheck for their children pid is the 0th field, ppid is the 3rd # field pids.append((procstats[0], procstats[1])) parents.append(procstats[0]) parents.remove(ppid) return pids def _killChildren(self, task_id, children, sig=signal.SIGTERM, timeout=2.0, pause=1.0): """ Kill child processes of the given task, as specified in the children list, by sending sig. Retry every pause seconds, within timeout. Remove successfully killed processes from the "children" list. """ for childpid, cmd in children[::-1]: # iterate in reverse order so processes whose children are killed might have # a chance to cleanup before they're killed if self._doKill(task_id, childpid, cmd, sig, timeout, pause): children.remove((childpid, cmd)) def cleanupTask(self, task_id, wait=True): """Clean up after task - kill children - expire session Return True if all children were successfully killed, False otherwise. """ pid = self.pids.get(task_id) if not pid: raise koji.GenericError("No pid for task %i" % task_id) children = self._childPIDs(pid) if children: # send SIGINT once to let mock mock try to clean up self._killChildren(task_id, children, sig=signal.SIGINT, pause=3.0) if children: self._killChildren(task_id, children) if children: self._killChildren(task_id, children, sig=signal.SIGKILL, timeout=3.0) # expire the task's subsession session_id = self.subsessions.get(task_id) if session_id: self.logger.info("Expiring subsession %i (task %i)" % (session_id, task_id)) try: self.session.logoutChild(session_id) del self.subsessions[task_id] except Exception: # not much we can do about it pass if wait: return self._waitTask(task_id, pid) else: # task has already been waited on, and we've cleaned # up as much as we can return True def checkSpace(self): """See if we have enough space to accept another job""" br_path = self.options.mockdir if not os.path.exists(br_path): self.logger.error("No such directory: %s" % br_path) raise IOError("No such directory: %s" % br_path) fs_stat = os.statvfs(br_path) available = fs_stat.f_bavail * fs_stat.f_bsize availableMB = available // 1024 // 1024 self.logger.debug("disk space available in '%s': %i MB", br_path, availableMB) if availableMB < self.options.minspace: self.status = "Insufficient disk space at %s: %i MB, %i MB required" % \ (br_path, availableMB, self.options.minspace) self.logger.warning(self.status) return False return True def readyForTask(self): """Determine if the system is ready to accept a new task. This function measures the system load and tries to determine if there is room to accept a new task.""" # key resources to track: # disk_space # df -P path # df -iP path ? # memory (meminfo/vmstat) # vmstat fields 3-6 (also 7-8 for swap) # https://www.redhat.com/advice/tips/meminfo.html # cpu cycles (vmstat?) # vmstat fields 13-16 (and others?) # others?: # io (iostat/vmstat) # network (netstat?) if self.restart_pending: if self.tasks: return False else: raise koji.tasks.ServerRestart self.hostdata = self.session.host.getHost() self.logger.debug('hostdata: %r' % self.hostdata) if not self.hostdata['enabled']: self.status = "Host is disabled" self.logger.info(self.status) return False if self.task_load > self.hostdata['capacity']: self.status = "Over capacity" self.logger.info( "Task load (%.2f) exceeds capacity (%.2f)" % (self.task_load, self.hostdata['capacity'])) return False if len(self.tasks) >= self.options.maxjobs: # This serves as a backup to the capacity check and prevents # a tremendous number of low weight jobs from piling up self.status = "Full queue" self.logger.info(self.status) return False if not self.checkSpace(): # checkSpace() does its own logging return False loadavgs = os.getloadavg() # this likely treats HT processors the same as real ones # but that's fine, it's a conservative test maxload = 4.0 * os.sysconf('SC_NPROCESSORS_ONLN') if loadavgs[0] > maxload: self.status = "Load average %.2f > %.2f" % (loadavgs[0], maxload) self.logger.info(self.status) return False # XXX - add more checks return True def takeTask(self, task): """Attempt to open the specified task Returns True if successful, False otherwise """ self.logger.info("Attempting to take task %s" % task['id']) method = task['method'] if method in self.handlers: handlerClass = self.handlers[method] else: raise koji.GenericError("No handler found for method '%s'" % method) task_info = self.session.getTaskInfo(task['id'], request=True) if task_info.get('request') is None: self.logger.warning("Task '%s' has no request" % task['id']) return False params = task_info['request'] handler = handlerClass(task_info['id'], method, params, self.session, self.options) if hasattr(handler, 'checkHost'): try: valid_host = handler.checkHost(self.hostdata) except (SystemExit, KeyboardInterrupt): raise except Exception: valid_host = False self.logger.warning('Error during host check') self.logger.warning(''.join(traceback.format_exception(*sys.exc_info()))) if not valid_host: self.logger.info( 'Skipping task %s (%s) due to host check', task['id'], task['method']) return False data = self.session.host.openTask(task['id']) if data is None: self.logger.warning("Could not open") return False task_id = data['id'] self.tasks[task_id] = data # set weight try: self.session.host.setTaskWeight(task_id, handler.weight()) except koji.ActionNotAllowed: info2 = self.session.getTaskInfo(task['id']) if info2['host_id'] != self.host_id: self.logger.warning("Task %i was reassigned", task_id) return False state = koji.TASK_STATES[info2['state']] if state != 'OPEN': self.logger.warning("Task %i changed is %s", task_id, state) return False # otherwise... raise if handler.Foreground: self.logger.info("running task in foreground") handler.setManager(self) self.runTask(handler) else: pid, session_id = self.forkTask(handler) self.pids[task_id] = pid self.subsessions[task_id] = session_id return True def forkTask(self, handler): # get the subsession before we fork newhub = self.session.subsession() session_id = newhub.sinfo['session-id'] pid = os.fork() if pid: newhub._forget() return pid, session_id # in no circumstance should we return after the fork # nor should any exceptions propagate past here try: self.session._forget() # set process group os.setpgrp() # use the subsession self.session = newhub handler.session = self.session # set a do-nothing handler for sigusr2 signal.signal(signal.SIGUSR2, lambda *args: None) self.runTask(handler) finally: # diediedie try: self.session.logout() finally: os._exit(0) def runTask(self, handler): try: response = (handler.run(),) # note that we wrap response in a singleton tuple response = koji.xmlrpcplus.dumps(response, methodresponse=1, allow_none=1) self.logger.info("RESPONSE: %r" % response) self.session.host.closeTask(handler.id, response) return except koji.xmlrpcplus.Fault as fault: response = koji.xmlrpcplus.dumps(fault) tb = ''.join(traceback.format_exception(*sys.exc_info())).replace(r"\n", "\n") self.logger.warning("FAULT:\n%s" % tb) except (SystemExit, koji.tasks.ServerExit, KeyboardInterrupt): # we do not trap these raise except koji.tasks.ServerRestart: # freeing this task will allow the pending restart to take effect self.session.host.freeTasks([handler.id]) return except Exception: tb = ''.join(traceback.format_exception(*sys.exc_info())) self.logger.warning("TRACEBACK: %s" % tb) # report exception back to server e_class, e = sys.exc_info()[:2] faultCode = getattr(e_class, 'faultCode', 1) if issubclass(e_class, koji.GenericError): # just pass it through tb = str(e) response = koji.xmlrpcplus.dumps(koji.xmlrpcplus.Fault(faultCode, tb)) # if we get here, then we're handling an exception, so fail the task self.session.host.failTask(handler.id, response)