Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/python/TaskWorker/Actions/DagmanCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\""
VARS Job{count} My.CRAB_DataBlock="\\"{block}\\""
VARS Job{count} My.CRAB_Destination="\\"{destination}\\""
ABORT-DAG-ON Job{count} 3
"""


Expand Down Expand Up @@ -346,7 +345,7 @@ def makeJobSubmit(self):
jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0"
jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url'])
jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url'])

jobSubmit['My.CRAB_ResubmitCounter'] = "0"

# note about Lists
# in the JDL everything is a string, we can't use the simple classAd[name]=somelist
Expand Down Expand Up @@ -885,7 +884,7 @@ def createSubdag(self, splitterResult, **kwargs):

## In the future this parameter may be set by the user in the CRAB configuration
## file and we would take it from the Task DB.
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2)
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 9)

runtime = self.task['tm_split_args'].get('minutes_per_job', -1)

Expand Down
5 changes: 5 additions & 0 deletions src/python/TaskWorker/Actions/DagmanResubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument
else:
newAdValue = str(task['resubmit_'+taskparam])
schedd.edit(rootConst, adparam, newAdValue)
dagAds = schedd.query(rootConst, ['CRAB_ResubmitCounter'])
current = 0
if dagAds and isinstance(dagAds[0].get('CRAB_ResubmitCounter'), int):
current = dagAds[0]['CRAB_ResubmitCounter']
schedd.edit(rootConst, "CRAB_ResubmitCounter", str(current + 1))
# finally restart the dagman with the 3 lines below
schedd.act(htcondor.JobAction.Hold, rootConst)
schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1')
Expand Down
1 change: 1 addition & 0 deletions src/python/TaskWorker/Actions/DagmanSubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit):
'My.CMS_Type',
'My.CMS_WMTool',
'My.CMS_TaskType',
'My.CRAB_ResubmitCounter',
]

for adName in adsToPort:
Expand Down
71 changes: 62 additions & 9 deletions src/python/TaskWorker/Actions/PreJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ def alter_submit(self, crab_retry):
maxmemory = None
numcores = None
priority = None
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}")
if not use_resubmit_info: # means that we resubmit with new params from crab resubmit
#if 'MaxWallTimeMins_RAW' in self.task_ad:
# if self.task_ad['MaxWallTimeMins_RAW'] != 1315:
Expand All @@ -291,15 +295,33 @@ def alter_submit(self, crab_retry):
priority = 20 #the maximum for splitting jobs
else: # means we resubmit with same params as previous try
## SB most likely much (all) of this string/int conversions can be simplified
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime')
maxmemory = self.resubmit_info[inkey].get('maxmemory')
numcores = self.resubmit_info[inkey].get('numcores')
priority = self.resubmit_info[inkey].get('priority')

#ExitCode Dependent change in resubmission parameters for retries

if self.resubmit_info:
retry_data = self.resubmit_info.get(inkey, {})
if retry_data.get("increase_memory") and maxmemory:
factor = retry_data.get("memory_factor", 1.2)
new_memory = int(int(maxmemory) * factor)
if hasattr(self, "MAX_MEMORY"):
new_memory = min(new_memory, self.MAX_MEMORY)
self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}")
maxmemory = new_memory

if retry_data.get("increase_runtime") and maxjobruntime:
factor = retry_data.get("runtime_factor", 1.2)
new_runtime = int(int(maxjobruntime) * factor)
self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}")
maxjobruntime = new_runtime

## Save the (new) values of the resubmission parameters in self.resubmit_info
## for the current job retry number.
#get resubmission counter
resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0))
outkey = str(crab_retry)
if outkey not in self.resubmit_info:
self.resubmit_info[outkey] = {}
Expand All @@ -309,7 +331,7 @@ def alter_submit(self, crab_retry):
self.resubmit_info[outkey]['priority'] = priority
self.resubmit_info[outkey]['use_resubmit_info'] = use_resubmit_info
self.resubmit_info[outkey]['CRAB_ResubmitList_in_taskad'] = CRAB_ResubmitList_in_taskad

self.resubmit_info[outkey]["resubmit_counter"] = resubmit_counter
## Add the resubmission parameters to the Job.<job_id>.submit content.
##
if self.stage == "probe":
Expand Down Expand Up @@ -373,6 +395,9 @@ def redoSites(self, crab_retry, use_resubmit_info):
siteWhiteList = []
siteBlackSet = set()
siteWhiteSet = set()
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
if not use_resubmit_info:
if 'CRAB_SiteBlacklist' in self.task_ad:
if self.task_ad['CRAB_SiteBlacklist']: # skip ad=''
Expand All @@ -383,9 +408,6 @@ def redoSites(self, crab_retry, use_resubmit_info):
siteWhiteList = self.task_ad['CRAB_SiteWhitelist']
siteWhiteSet = set(siteWhiteList)
else:
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
siteBlackSet = set(self.resubmit_info[inkey].get('site_blacklist', []))
siteWhiteSet = set(self.resubmit_info[inkey].get('site_whitelist', []))
## Save the current site black- and whitelists in self.resubmit_info for the
Expand Down Expand Up @@ -414,6 +436,18 @@ def redoSites(self, crab_retry, use_resubmit_info):
self.logger.error("Can not submit since DESIRED_Sites list is empty")
self.prejob_exit_code = 1
sys.exit(self.prejob_exit_code)

# ExitCode Dependent discard of previous_site
retry_data = self.resubmit_info.get(inkey, {})
previous_site = retry_data.get("previous_site")
if retry_data.get("change_site") and previous_site:
self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}")
if previous_site in availableSet:
self.logger.info(f"Removing previous site {previous_site} from candidate sites")
availableSet.discard(previous_site)
else:
self.logger.info(f"No need to discard last site. inkey was {inkey}")

## Make sure that attributes which will be used in MatchMaking are SORTED lists
available = list(availableSet)
available.sort()
Expand Down Expand Up @@ -456,19 +490,38 @@ def needsDefer(self):
slow release of jobs in a task.
The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit
time of the task plus the defer time is greater than the current time.
Additionally, returns True while a retry-policy delay recorded in resubmit_info has not yet elapsed.
"""
deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0))

currentTime = time.time()

# Check retry delay from resubmit_info
retry_info_file = f"resubmit_info/job.{self.job_id}.txt"
if os.path.exists(retry_info_file):
try:
with open(retry_info_file, "r", encoding="utf-8") as fd:
retry_info = literal_eval(fd.read())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time to make it a JSON file ?

key = str(self.dag_retry)
if key in retry_info:
retry_delay_until = retry_info[key].get("retry_delay_until")
if retry_delay_until and currentTime < retry_delay_until:
wait = int(retry_delay_until - currentTime)
self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.")
return True
except Exception:
self.logger.exception("Error checking retry delay in resubmit_info")

if deferTime:
self.logger.info('Release timeout specified in extraJDL:')
totalDefer = deferTime * int(self.job_id)
submitTime = int(self.task_ad["CRAB_TaskSubmitTime"])
currentTime = time.time()
if currentTime < (submitTime + totalDefer):
msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)"
msg += f" not elapsed yet, deferring for {totalDefer} seconds"
self.logger.info(msg)
return True
self.logger.info(' Continuing normally since current time is greater than requested starttime of the job')
self.logger.info('Continuing normally since current time is greater than requested starttime of the job')
return False

def execute(self, *args):
Expand Down
Loading