Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ RUN apk update && \
apk add g++ make cargo rust

RUN apk upgrade -Ua
RUN apk add "libxml2=2.13.4-r6"

RUN pip install --upgrade pip
# Create a non-root user.
Expand Down
200 changes: 158 additions & 42 deletions src/dug/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from dug.config import Config
from dug.core.async_search import Search
from pydantic import BaseModel
from typing import List, Dict, Set, Any
import asyncio
from typing import Optional
from typing import Optional, Any

logger = logging.getLogger (__name__)

Expand Down Expand Up @@ -44,6 +45,12 @@ class SearchVariablesQuery(BaseModel):
offset: int = 0
size: int = 1000

class FilterGrouped(BaseModel):
    """One facet filter: a key and the list of accepted values."""

    # Facet name; matched case-insensitively downstream (e.g. "Study Name").
    key: str
    # Accepted values for this key; compared as lowercased strings downstream.
    value: List[Any]


class SearchVariablesQueryFiltered(SearchVariablesQuery):
    """Variable-search request extended with optional facet filters."""

    # Empty list means "no filtering"; pydantic copies mutable defaults,
    # so a literal [] default is safe here.
    filter: List[FilterGrouped] = []

class SearchKgQuery(BaseModel):
query: str
unique_id: str
Expand Down Expand Up @@ -122,65 +129,174 @@ async def search_var(search_query: SearchVariablesQuery):


@APP.post('/search_var_grouped')
async def search_var_grouped(search_query: SearchVariablesQueryFiltered):
    """
    Search for variables and group them by variable ID across studies.

    The main result list is filtered by *all* criteria in
    ``search_query.filter``.  Faceted aggregation counts are also returned:
    counts for each facet category are computed from the dataset as filtered
    by every *other* active filter (standard faceted-search behaviour, so
    selecting a value in one facet does not collapse that facet's counts).

    :param search_query: query text, index, paging options and facet filters.
    :return: dict with ``variables`` (variables filtered by ALL criteria) and
        ``agg_counts`` (faceted aggregation counts, sorted for display).
    """
    # --- 1. Initial data fetching ---
    if search_query.query == "":
        # Empty query: dump every concept in the index instead of searching.
        results = await search.dump_concepts(search_query.index, size=search_query.size)
        search_result_hits = results['result']['hits']['hits']
        results = search._make_result(None, search_result_hits, {"count": search_query}, False)
    else:
        # "filter" is applied in-process below, not by the search backend.
        results = await search.search_variables(**search_query.dict(exclude={"index", "filter"}))

    # --- 2. Flatten the nested program -> study -> element structure ---
    all_elements: List[Dict[str, Any]] = []
    for program_name in filter(lambda x: x != 'total_items', results.keys()):
        for study in results[program_name]:
            for element in study['elements']:
                # Copy so the merged record does not mutate the raw result.
                flattened = element.copy()
                flattened.update({k: v for k, v in study.items() if k != 'elements'})
                flattened['program_name'] = program_name
                all_elements.append(flattened)

    # --- 3. Group variable occurrences by variable ID ---
    by_id: Dict[str, List[Dict[str, Any]]] = {}
    for element in all_elements:
        by_id.setdefault(element['id'], []).append(element)

    # --- 4. Consolidate variable information and study associations ---
    study_info_keys: List[str] = ['c_id', 'c_link', 'c_name', 'program_name']
    # final_variables is the *unfiltered* set of variables matching the query.
    final_variables: List[Dict[str, Any]] = []
    # Metadata keys with string values are candidates for facet aggregation.
    count_keys: Set[str] = set()

    for var_studies in by_id.values():
        # NOTE: Optional[...] (not "| None") keeps the file runnable on
        # Python < 3.10, matching the module's existing typing imports.
        var_info: Optional[Dict[str, Any]] = None
        for occurrence in var_studies:
            if var_info is None:
                # First occurrence defines the variable-level record.
                var_info = {k: v for k, v in occurrence.items() if k not in study_info_keys}
                metadata = var_info.get('metadata')
                if isinstance(metadata, dict):
                    # Hoist metadata fields onto the variable record itself.
                    var_info.pop('metadata')
                    var_info.update(metadata)
                    for k, v in metadata.items():
                        if isinstance(v, str):
                            count_keys.add(k)
                var_info['studies'] = []

            study_data = {k: v for k, v in occurrence.items() if k in study_info_keys}
            var_info['studies'].append(study_data)

        if var_info:
            final_variables.append(var_info)

    # --- 5. Filtering helper (used for results and for faceted counts) ---
    def filter_variables(vars_to_filter: List[Dict[str, Any]],
                         filters: List[FilterGrouped]) -> List[Dict[str, Any]]:
        """Return the subset of vars_to_filter matching every filter group."""
        filtered_vars = vars_to_filter.copy()
        for f_group in filters:
            filter_key_lower = f_group.key.lower()
            filter_values_lower = [str(v).lower() for v in f_group.value]
            to_keep: List[Dict[str, Any]] = []

            for var in filtered_vars:
                if filter_key_lower == "study name":
                    # Match against any of the variable's study names.
                    # .get() tolerates studies with a missing 'c_name'.
                    study_names = [
                        str(study.get('c_name', '')).lower()
                        for study in var.get('studies', [])
                    ]
                    if any(name in filter_values_lower for name in study_names):
                        to_keep.append(var)
                else:
                    # Case-insensitive lookup of the filter key on the variable.
                    keys_by_lower = {key.lower(): key for key in var}
                    original_key = keys_by_lower.get(filter_key_lower)
                    if original_key is not None:
                        # Compare string forms; values may be non-string.
                        if str(var.get(original_key, '')).lower() in filter_values_lower:
                            to_keep.append(var)

            filtered_vars = to_keep
            if not filtered_vars:
                break  # nothing left to narrow further
        return filtered_vars

    # --- 6. Final filtered list for the response (ALL filters applied) ---
    filtered_variables_for_response = filter_variables(final_variables, filters=search_query.filter)

    # --- 7. Faceted aggregation counts ---
    agg_counts: Dict[str, Dict[str, int]] = {}
    # "Study Name" is a synthetic facet alongside the metadata-derived keys.
    for agg_key_orig in count_keys | {"Study Name"}:
        agg_key_lower = agg_key_orig.lower()
        display_key = agg_key_orig.title()  # response key, e.g. "Study Name"

        # Exclude this facet's own filter so its counts stay complete.
        filters_for_this_agg = [
            f for f in search_query.filter if f.key.lower() != agg_key_lower
        ]
        # Apply the remaining filters to the *original* variable set.
        temp_filtered_vars = filter_variables(final_variables, filters=filters_for_this_agg)

        current_key_counts: Dict[str, int] = {}
        if agg_key_lower == "study name":
            for var in temp_filtered_vars:
                for study in var.get('studies', []):
                    study_name = study.get('c_name', 'Unknown Study')
                    current_key_counts[study_name] = current_key_counts.get(study_name, 0) + 1
        elif agg_key_orig in count_keys:
            for var in temp_filtered_vars:
                if agg_key_orig in var:
                    raw_value = var[agg_key_orig]
                    # Title-case the string form; None counts as "".
                    value = (str(raw_value) if raw_value is not None else "").title()
                    current_key_counts[value] = current_key_counts.get(value, 0) + 1

        # Only include facets that produced at least one count.
        if current_key_counts:
            agg_counts[display_key] = current_key_counts

    # --- 8. Sort aggregation counts for a stable, display-ready response ---
    sorted_agg_counts: Dict[str, Dict[str, int]] = {}
    for facet in sorted(agg_counts):  # facets alphabetically
        counts = agg_counts[facet]
        if facet == "Study Name":
            # Studies sorted alphabetically by name.
            ordered = dict(sorted(counts.items(), key=lambda item: item[0]))
        else:
            # Other facets sorted by count descending, then name ascending.
            ordered = dict(sorted(counts.items(), key=lambda item: (-item[1], item[0])))
        sorted_agg_counts[facet] = ordered

    # --- 9. Final response ---
    return {
        "variables": filtered_variables_for_response,
        "agg_counts": sorted_agg_counts,
    }


Expand Down
Loading