diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 6da53b038c..b9ae6bb00b 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -62,7 +62,6 @@ VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\"" VARS Job{count} My.CRAB_DataBlock="\\"{block}\\"" VARS Job{count} My.CRAB_Destination="\\"{destination}\\"" -ABORT-DAG-ON Job{count} 3 """ @@ -346,7 +345,7 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - + jobSubmit['My.CRAB_ResubmitCounter'] = "0" # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist @@ -885,7 +884,7 @@ def createSubdag(self, splitterResult, **kwargs): ## In the future this parameter may be set by the user in the CRAB configuration ## file and we would take it from the Task DB. - self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2) + self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 9) runtime = self.task['tm_split_args'].get('minutes_per_job', -1) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 0a4e815e0f..f398ac0de9 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,6 +96,11 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) + dagAds = schedd.query(rootConst, ['CRAB_ResubmitCounter']) + current = 0 + if dagAds and isinstance(dagAds[0].get('CRAB_ResubmitCounter'), int): + current = dagAds[0]['CRAB_ResubmitCounter'] + schedd.edit(rootConst, "CRAB_ResubmitCounter", str(current + 1)) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index 3bcb0b0d5c..4e4969d151 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -87,6 +87,7 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): 'My.CMS_Type', 'My.CMS_WMTool', 'My.CMS_TaskType', + 'My.CRAB_ResubmitCounter', ] for adName in adsToPort: diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 837b78f6d9..86711d8e2d 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -268,6 +268,10 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) + self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") if not use_resubmit_info: # means thad we resubmit with new params from crab resubmit #if 'MaxWallTimeMins_RAW' in self.task_ad: # if self.task_ad['MaxWallTimeMins_RAW'] != 1315: @@ -291,15 +295,33 @@ def alter_submit(self, crab_retry): priority = 20 #the maximum for splitting jobs else: # means we resubmit with same params as previous try ## SB most likely much (all) of this string/int conversions can be simplified - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime') maxmemory = self.resubmit_info[inkey].get('maxmemory') numcores = self.resubmit_info[inkey].get('numcores') priority = self.resubmit_info[inkey].get('priority') + + #ExitCode Dependent change in resubmission parameters for retries + + if self.resubmit_info: + retry_data = self.resubmit_info.get(inkey, {}) + if retry_data.get("increase_memory") and maxmemory: + factor = retry_data.get("memory_factor", 1.2) + new_memory = int(int(maxmemory) * factor) + if hasattr(self, "MAX_MEMORY"): + new_memory = min(new_memory, self.MAX_MEMORY) + self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") + maxmemory = new_memory + + if retry_data.get("increase_runtime") and maxjobruntime: + factor = retry_data.get("runtime_factor", 1.2) + new_runtime = int(int(maxjobruntime) * factor) + self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") + maxjobruntime = new_runtime + ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. + #get resubmission counter + resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0)) outkey = str(crab_retry) if outkey not in self.resubmit_info: self.resubmit_info[outkey] = {} @@ -309,7 +331,7 @@ def alter_submit(self, crab_retry): self.resubmit_info[outkey]['priority'] = priority self.resubmit_info[outkey]['use_resubmit_info'] = use_resubmit_info self.resubmit_info[outkey]['CRAB_ResubmitList_in_taskad'] = CRAB_ResubmitList_in_taskad - + self.resubmit_info[outkey]["resubmit_counter"] = resubmit_counter ## Add the resubmission parameters to the Job..submit content. ## if self.stage == "probe": @@ -373,6 +395,9 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = [] siteBlackSet = set() siteWhiteSet = set() + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) if not use_resubmit_info: if 'CRAB_SiteBlacklist' in self.task_ad: if self.task_ad['CRAB_SiteBlacklist']: # skip ad='' @@ -383,9 +408,6 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = self.task_ad['CRAB_SiteWhitelist'] siteWhiteSet = set(siteWhiteList) else: - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) siteBlackSet = set(self.resubmit_info[inkey].get('site_blacklist', [])) siteWhiteSet = set(self.resubmit_info[inkey].get('site_whitelist', [])) ## Save the current site black- and whitelists in self.resubmit_info for the @@ -414,6 +436,18 @@ def redoSites(self, crab_retry, use_resubmit_info): self.logger.error("Can not submit since DESIRED_Sites list is empty") self.prejob_exit_code = 1 sys.exit(self.prejob_exit_code) + + # ExitCode Dependent discard of previous_site + retry_data = self.resubmit_info.get(inkey, {}) + previous_site = retry_data.get("previous_site") + if retry_data.get("change_site") and previous_site: + self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}") + if previous_site in availableSet: + self.logger.info(f"Removing previous site {previous_site} from candidate sites") + availableSet.discard(previous_site) + else: + self.logger.info(f"No need to discard last site. inkey was {inkey}") + ## Make sure that attributest which will be used in MatchMaking are SORTED lists available = list(availableSet) available.sort() @@ -456,19 +490,38 @@ def needsDefer(self): slow release of jobs in a task. The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit time of the task plus the defer time is greater than the current time. + Additionally retry policy delay """ deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0)) + + currentTime = time.time() + + # Check retry delay from resubmit_info + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + key = str(self.dag_retry) + if key in retry_info: + retry_delay_until = retry_info[key].get("retry_delay_until") + if retry_delay_until and currentTime < retry_delay_until: + wait = int(retry_delay_until - currentTime) + self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.") + return True + except Exception: + self.logger.exception("Error checking retry delay in resubmit_info") + if deferTime: self.logger.info('Release timeout specified in extraJDL:') totalDefer = deferTime * int(self.job_id) submitTime = int(self.task_ad["CRAB_TaskSubmitTime"]) - currentTime = time.time() if currentTime < (submitTime + totalDefer): msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)" msg += f" not elapsed yet, deferring for {totalDefer} seconds" self.logger.info(msg) return True - self.logger.info(' Continuing normally since current time is greater than requested starttime of the job') + self.logger.info('Continuing normally since current time is greater than requested starttime of the job') return False def execute(self, *args): diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index b4d11a5c0e..af0287e44f 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -5,7 +5,9 @@ import shutil import subprocess import socket +import time from collections import namedtuple +from ast import literal_eval from ServerUtilities import executeCommand, getLock from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY @@ -15,6 +17,40 @@ JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2) +# ---------------------------------------------------------------------- +# Exit-code dependent retry policy +# ---------------------------------------------------------------------- + +EXIT_RETRY_POLICY = { + 1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, + 50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, + 10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, + 50: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, + 10040: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, + 60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 60307: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "increase_memory": True, "memory_factor": 1.3, "increase_runtime": True, "runtime_factor": 1.3}, + 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."}, + 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."}, + 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed)."}, + 8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception."}, + 65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW)."}, + "default": {"type": "neutral", "max_retries": 2, "delay": 900, "msg": "Taking default exit code retry policy route."} +} + # strings in fatal root exception text which indicate code problem, not corrupted file # a small "knowledge data base" NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [ @@ -98,6 +134,81 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def store_retry_actions(self, policy): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + except Exception: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + delay = policy.get("delay", 900) + retry_info[key]["retry_delay_until"] = time.time() + delay + + retry_info[key]["increase_memory"] = policy.get("increase_memory", False) + retry_info[key]["increase_runtime"] = policy.get("increase_runtime", False) + retry_info[key]["change_site"] = policy.get("change_site", False) + retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) + retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) + retry_info[key]["previous_site"] = getattr(self, "site", None) + + with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: + fd.write(str(retry_info)) + os.rename(retry_info_file + ".tmp", retry_info_file) + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def calculate_effective_max_retries(self, policy): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + except Exception: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + entry = retry_info.get(key, {}) + resubmit_counter = entry.get("resubmit_counter", 0) + base_max = policy["max_retries"] + effective_max_retries = (base_max + 1)*(resubmit_counter + 1) - 1 + self.logger.info(f"Resubmit counter = {resubmit_counter}, effective max retries = {effective_max_retries}") + + return effective_max_retries + + + def apply_retry_policy(self, exitCode): + """ + Enforce exit-code dependent retry limits and delay. + Raises FatalError if retry limit exceeded. + """ + policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) + + if policy["type"] == "recoverable": + effective_max_retries = self.calculate_effective_max_retries(policy) + if self.crab_retry >= effective_max_retries: + raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") + self.logger.info(f"Applying retry policy for exit code {exitCode}") + self.store_retry_actions(policy) + raise RecoverableError(policy["msg"]) + + if policy["type"] == "neutral": + return + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def get_job_ad_from_file(self): """ Need a doc string here @@ -307,6 +418,8 @@ def check_exit_code(self): return 1 try: exitCode = int(self.report['exitCode']) + # Apply retry policy + self.apply_retry_policy(exitCode) except ValueError: msg = "Unable to extract job's wrapper exit code from job report." self.logger.warning(msg) @@ -317,92 +430,58 @@ def check_exit_code(self): msg = "Job and stageout wrappers finished successfully (exit code %d)." % (exitCode) self.logger.info(msg) return 0 + # else: + # raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) msg = "Job or stageout wrapper finished with exit code %d." % (exitCode) - msg += " Trying to determine the meaning of the exit code and if it is a recoverable or fatal error." self.logger.info(msg) - # Wrapper script sometimes returns the posix return code (8 bits). - if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: - try: # the following is still a bit experimental, make sure it never crashes the PJ - corruptedInputFile = self.check_corrupted_file(exitCode) - except Exception as e: # pylint: disable=broad-except - msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" - self.logger.error(msg) - corruptedInputFile = False - if corruptedInputFile: - exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" - self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job - raise RecoverableError("Job failed to open local and fallback files.") - - if exitCode == 1: - raise RecoverableError("Job failed to bootstrap CMSSW; likely a worker node issue.") - - if exitCode == 50513 or exitCode == 81: - raise RecoverableError("Job did not find functioning CMSSW on worker node.") - - # This is a difficult one -- right now CMSRunAnalysis.py will turn things like - # segfaults into an invalid FJR. Will revisit this decision later. - if exitCode == 50115 or exitCode == 195: - raise RecoverableError("Job did not produce a FJR; will retry.") - - if exitCode == 134: - recoverable_signal = False - try: - fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - with open(fname, encoding='utf-8') as fd: - for line in fd: - if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): - recoverable_signal = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing abort signal." - msg += "\nDetails follow:" - self.logger.exception(msg) - if recoverable_signal: - raise RecoverableError("SIGILL; may indicate a worker node issue.") - - if exitCode == 8001 or exitCode == 65: - cvmfs_issue = False - try: - fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") - with open(fname, encoding='utf-8') as fd: - for line in fd: - if cvmfs_issue_re.match(line): - cvmfs_issue = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing output for CVMFS issues." - msg += "\nDetails follow:" - self.logger.exception(msg) - if cvmfs_issue: - raise RecoverableError("CVMFS issue detected.") - - # Another difficult case -- so far, SIGKILL has mostly been observed at T2_CH_CERN, and it has nothing to do - # with an issue of the job itself. Typically, this isn't the user code's fault - # it was often a site or pilot misconfiguration that led to the pilot exhausting its allocated runtime. - # We should revisit this issue if we see SIGKILL happening for other cases that are the users' fault. - if exitCode == 137: - raise RecoverableError("SIGKILL; likely an unrelated batch system kill.") - - if exitCode == 10034 or exitCode == 50: - raise RecoverableError("Required application version not found at the site.") - - if exitCode == 10040: - raise RecoverableError("Site Error: failed to generate cmsRun cfg file at runtime.") - - if exitCode == 60403 or exitCode == 243: - raise RecoverableError("Timeout during attempted file stageout.") - - if exitCode == 60307 or exitCode == 147: - raise RecoverableError("Error during attempted file stageout.") - - if exitCode == 60311 or exitCode == 151: - raise RecoverableError("Error during attempted file stageout.") - - if exitCode: - raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) + # # Wrapper script sometimes returns the posix return code (8 bits). + # if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: + # try: # the following is still a bit experimental, make sure it never crashes the PJ + # corruptedInputFile = self.check_corrupted_file(exitCode) + # except Exception as e: # pylint: disable=broad-except + # msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" + # self.logger.error(msg) + # corruptedInputFile = False + # if corruptedInputFile: + # exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" + # self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job + # raise RecoverableError("Job failed to open local and fallback files.") + + # if exitCode == 134: + # recoverable_signal = False + # try: + # fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + # with open(fname, encoding='utf-8') as fd: + # for line in fd: + # if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): + # recoverable_signal = True + # break + # except Exception: # pylint: disable=broad-except + # msg = "Error analyzing abort signal." + # msg += "\nDetails follow:" + # self.logger.exception(msg) + # if recoverable_signal: + # raise RecoverableError("SIGILL; may indicate a worker node issue.") + + # if exitCode == 8001 or exitCode == 65: + # cvmfs_issue = False + # try: + # fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + # cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") + # with open(fname, encoding='utf-8') as fd: + # for line in fd: + # if cvmfs_issue_re.match(line): + # cvmfs_issue = True + # break + # except Exception: # pylint: disable=broad-except + # msg = "Error analyzing output for CVMFS issues." + # msg += "\nDetails follow:" + # self.logger.exception(msg) + # if cvmfs_issue: + # raise RecoverableError("CVMFS issue detected.") + return 0