Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions synda/sdt/sdfilequery.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ def transfer_status_count(status=None,conn=sddb.conn):
c.close()
return count

def transfer_running_count_by_datanode( conn=sddb.conn ):
def transfer_running_count_by_datanode( cached_only=False, conn=sddb.conn ):
"""Returns a dict of elements dn:count where dn is a data_node and count is the number of
status='running' files from that data_node. If cached_only==True, the dict will be limited
to those described in the max-priority cache. This is far faster than a database query,
but will miss any data_nodes which have had no 'waiting' files until recently.
"""
c = conn.cursor()
if Internal().is_processes_get_files_caching:
if cached_only and Internal().is_processes_get_files_caching:
# Get a list of 'waiting' data nodes from the highest-priority cache
dns = [dn for dn in sdfiledao.highest_waiting_priority.vals.keys() if
sdfiledao.highest_waiting_priority.vals[dn] is not None]
Expand Down Expand Up @@ -81,7 +86,7 @@ def get_download_status(project=None):
while rs!=None:
status=rs[0]
count=rs[1]
size=humanize.naturalsize(rs[2],gnu=False)
size=humanize.naturalsize(rs[2],gnu=False,format='%.2f')

li.append([status,count,size])

Expand Down
54 changes: 39 additions & 15 deletions synda/sdt/sdtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,48 +128,56 @@ def transfers_begin():

# how many new transfers can be started:
new_transfer_count = max_transfer - sdfilequery.transfer_running_count()
# datanode_count[datanode], is number of running transfers for a data node:
datanode_count = sdfilequery.transfer_running_count_by_datanode()
# running_datanode_counts[datanode], is number of running transfers for a data node:
running_datanode_counts = sdfilequery.transfer_running_count_by_datanode()
waiting_datanodes = running_datanode_counts.keys()
if new_transfer_count > 0:
transfers_needed = new_transfer_count

for i in range(new_transfer_count):
for datanode in datanode_count.keys():
try:
try:
for datanode in waiting_datanodes:
# Handle per-datanode maximum number of transfers:
try:
new_count = max_datanode_count - datanode_count[datanode]
except KeyError:
# First check for a special max specific to this datanode, e.g. 3 if
# mpdsdd={ "ec.gc.ca":3 } and datanode='crd-esgf-drc.ec.gc.ca'
# We expect mpdsdd to be very short; otherwise performance will be poor.
special_maxes = [ value for key,value in mpdsdd.items() if key in datanode ]
if len(special_maxes)==0:
new_count = max_datanode_count - running_datanode_counts[datanode]
else:
new_count = special_maxes[0] - running_datanode_counts[datanode]
except KeyError: # probably not possible any more
sdlog.info(
"SYNDTASK-189",
"key error on datanode {}, legal keys are {}".format(
datanode,
datanode_count.keys(),
running_datanode_counts.keys(),
),
)
new_count = max_datanode_count
if new_count <= 0:
continue

tr = sddao.get_one_waiting_transfer(datanode)

prepare_transfer(tr)

if pre_transfer_check_list(tr):
sdfiledao.update_file(tr)
transfers.append(tr)

if datanode in datanode_count:
datanode_count[datanode] += 1
else:
datanode_count[datanode] = 1

running_datanode_counts[datanode] += 1
transfers_needed -= 1
if transfers_needed <= 0:
break
except NoTransferWaitingException, e:
pass

except NoTransferWaitingException, e:
break
if transfers_needed <= 0:
break

sdlog.info("SYNDTASK-190","ready to call transfers_begin on %s transfers"%
len(transfers) )
dmngr.transfers_begin(transfers)


Expand All @@ -192,6 +200,22 @@ def fatal_exception():
max_transfer = preferences.download_max_parallel_download

max_datanode_count = preferences.download_max_parallel_download_per_datanode

mpdsd = preferences.download_max_parallel_download_special_datanodes
#...e.g. "crd-esgf-drc:3, tropmet:1"

def _parse_special_datanodes(spec):
    """Parse a comma-separated "substring:count" spec into a dict mapping
    data-node substring -> int maximum parallel downloads,
    e.g. "crd-esgf-drc:3, tropmet:1" -> {"crd-esgf-drc": 3, "tropmet": 1}.
    Raises ValueError on a malformed entry (missing ':' or non-integer count).
    """
    limits = {}
    for entry in spec.split(','):
        name, sep, count = entry.strip().partition(':')
        if not sep or not name:
            raise ValueError("malformed entry: %r" % entry)
        # int() raises ValueError on a junk count, caught below.
        limits[name] = int(count)
    return limits

if mpdsd == '':
    mpdsdd = {}
else:
    try:
        # Parse explicitly rather than building a dict literal and eval()ing it:
        # eval on a config-derived string is fragile and unsafe if the
        # preferences file is not trusted.
        mpdsdd = _parse_special_datanodes(mpdsd)
        #...e.g. {"crd-esgf-drc":3, "tropmet":1}
    except ValueError:
        mpdsdd = {}
        sdlog.warning("SYNDTASK-250","trouble parsing max_parallel_download_special_datanodes=%s"
            % mpdsd )


lfae_mode = preferences.behaviour_lfae_mode

dmngr = get_download_manager()
1 change: 1 addition & 0 deletions synda/source/config/file/user/preferences/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DEFAULT_OPTIONS = dict(
max_parallel_download="8",
max_parallel_download_per_datanode="8",
max_parallel_download_special_datanodes="",
get_only_latest_version='true',
user='',
group='',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(self, full_filename):
config.add_section('download')
config.set('download', 'max_parallel_download', '8')
# config.set('download', 'max_parallel_download_per_datanode', '8')
# config.set('download', 'max_parallel_download_special_datanodes', '')
# config.set('download', 'get_only_latest_version', 'true')
config.set('download', 'hpss', '1')
config.set('download', 'http_fallback', 'false')
Expand Down
4 changes: 4 additions & 0 deletions synda/source/config/file/user/preferences/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ def download_max_parallel_download(self):
def download_max_parallel_download_per_datanode(self):
    """Return the 'max_parallel_download_per_datanode' option from the
    [download] section of the user preferences, as an int."""
    return self.get_data().getint('download', 'max_parallel_download_per_datanode')

@property
def download_max_parallel_download_special_datanodes(self):
    """Return the raw 'max_parallel_download_special_datanodes' option from
    the [download] section of the user preferences as a string
    (e.g. "crd-esgf-drc:3, tropmet:1"; '' when not configured)."""
    options = self.get_data()
    return options.get('download', 'max_parallel_download_special_datanodes')

@property
def download_url_max_buffer_size(self):
    """Return the 'url_max_buffer_size' option from the [download] section
    of the user preferences, as an int."""
    config = self.get_data()
    return config.getint('download', 'url_max_buffer_size')
Expand Down