Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ckanext/activityinfo/assets/js/activityinfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
// Use the full download URL from the response
const downloadUrl = data.download_url;
if (downloadUrl) {
// Open in new tab to trigger download from ActivityInfo
// Open in new tab to trigger download via proxy endpoint
window.open(downloadUrl, '_blank');
showAlert('Download ready for "' + formLabel + '"', 'success');
} else {
Expand Down
67 changes: 61 additions & 6 deletions ckanext/activityinfo/blueprints/activity_info.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from flask import Blueprint
from flask import Blueprint, Response
from ckan.common import current_user
from ckan.plugins import toolkit
from ckan.views.api import _finish_ok
Expand Down Expand Up @@ -214,18 +214,73 @@ def job_status(job_id):

log.info(f"Job status for {job_id}: {job_status}")

# Build full download URL if job is completed
full_download_url = None
# Build proxy download URL if job is completed
proxy_download_url = None
if job_status.get('state') == 'completed':
result = job_status.get('result', {})
relative_url = result.get('downloadUrl', '')
if relative_url:
aic = ActivityInfoClient()
full_download_url = f"{aic.base_url}/{relative_url.lstrip('/')}"
proxy_download_url = toolkit.url_for('activity_info.download_file', job_id=job_id)

ret = {
'success': True,
'result': job_status,
'download_url': full_download_url,
'download_url': proxy_download_url,
}
return _finish_ok(ret)


@activityinfo_bp.route('/download-file/<job_id>')
def download_file(job_id):
"""Proxy download endpoint that fetches the file server-side.

This avoids the 307 redirect issue where ActivityInfo redirects to
a GCS signed URL and the browser forwards the Authorization header,
causing GCS to reject the request.
"""
token = get_user_token(current_user.name)
if not token:
return toolkit.abort(403, "No ActivityInfo API key configured")

client = ActivityInfoClient(api_key=token)

try:
job_data = client.get_job_status(job_id)
except Exception as e:
log.error(f"Failed to get job status for {job_id}: {e}")
return toolkit.abort(500, f"Failed to get export job status: {e}")

if job_data.get('state') != 'completed':
state = job_data.get('state', 'unknown')
return toolkit.abort(400, f"Export job is not completed (state: {state})")

result = job_data.get('result', {})
download_url = result.get('downloadUrl', '')
if not download_url:
return toolkit.abort(500, "Export completed but no download URL provided")

if not download_url.startswith('http'):
download_url = f"{client.base_url}/{download_url.lstrip('/')}"

try:
file_content = client.download_file(download_url)
except Exception as e:
log.error(f"Failed to download file for job {job_id}: {e}")
return toolkit.abort(502, f"Failed to download file from ActivityInfo: {e}")

# Determine content type and filename from the URL
if download_url.endswith('.xlsx'):
content_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
filename = f"export-{job_id}.xlsx"
elif download_url.endswith('.txt'):
content_type = 'text/plain'
filename = f"export-{job_id}.txt"
else:
content_type = 'text/csv'
filename = f"export-{job_id}.csv"

return Response(
file_content,
mimetype=content_type,
headers={'Content-Disposition': f'attachment; filename="{filename}"'}
)
37 changes: 31 additions & 6 deletions ckanext/activityinfo/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,35 @@ def get_job_file(self, job_id):
url = f"{self.base_url}/{endpoint}"
return True, url

def _follow_redirect_without_auth(self, url):
"""Make an authenticated request and follow any redirect without auth headers.

ActivityInfo returns 307 redirects to Google Cloud Storage signed URLs.
GCS does not expect an Authorization header alongside its signed URL,
so we must strip the auth header when following the redirect.

Args:
url (str): The URL to request.
Returns:
The response object from the final request.
"""
headers = self.get_user_auth_headers()
response = requests.get(url, headers=headers, allow_redirects=False)
log.debug(f"Initial response status: {response.status_code} for {url}")

if response.status_code in (301, 302, 303, 307, 308):
redirect_url = response.headers.get('Location')
log.info(f"Following redirect to {redirect_url} without auth headers")
response = requests.get(redirect_url)

response.raise_for_status()
log.debug(f"Downloaded {len(response.content)} bytes from {url}")

if not response.content:
raise ValueError(f"Downloaded file is empty from {url}")

return response

def download_finished_export(self, download_url):
"""
Download the finished export file from ActivityInfo.
Expand All @@ -232,9 +261,7 @@ def download_finished_export(self, download_url):
Returns:
The content of the downloaded file.
"""
headers = self.get_user_auth_headers()
response = requests.get(download_url, headers=headers)
response.raise_for_status()
response = self._follow_redirect_without_auth(download_url)
return response.content

def download_file(self, url: str) -> bytes:
Expand All @@ -246,7 +273,5 @@ def download_file(self, url: str) -> bytes:
Returns:
The file contents as bytes
"""
headers = {'Authorization': f'Bearer {self.api_key}'}
response = requests.get(url, headers=headers)
response.raise_for_status()
response = self._follow_redirect_without_auth(url)
return response.content
129 changes: 103 additions & 26 deletions ckanext/activityinfo/jobs/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,27 @@ def download_activityinfo_resource(resource_id: str, user: str) -> None:

context = {'user': user}

try:
_do_download(context, resource_id, user)
except Exception:
log.exception(f"ActivityInfo Job: Unhandled error for resource {resource_id}")
try:
_update_resource_status(
{'user': user}, resource_id, 'error', 0,
'An unexpected error occurred during the ActivityInfo download'
)
except Exception:
log.exception(
f"ActivityInfo Job: Failed to update error status for resource {resource_id}"
)
raise


def _do_download(context: dict, resource_id: str, user: str) -> None:
"""Inner logic for downloading ActivityInfo data.

Separated from the top-level function to keep cyclomatic complexity manageable.
"""
resource = toolkit.get_action('resource_show')(context, {'id': resource_id})

form_id = resource.get('activityinfo_form_id')
Expand All @@ -40,70 +61,126 @@ def download_activityinfo_resource(resource_id: str, user: str) -> None:

token = get_user_token(user)
if not token:
_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'No API key configured')
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 0, 'No API key configured'
)
raise ValueError("No ActivityInfo API key configured for user")

client = ActivityInfoClient(api_key=token)

job_id = _start_export_job(client, context, resource_id, form_id, format_type)
download_url = _poll_for_completion(client, context, resource_id, job_id)

file_data = _download_export_file(client, context, resource_id, download_url)

# Generate filename
safe_label = "".join(c if c.isalnum() or c in '-_ ' else '_' for c in form_label)
filename = f"{safe_label}.{format_type}"

# Save to temp file and update resource
_update_resource_with_file(
toolkit.fresh_context(context), resource_id, file_data, filename, format_type
)

log.info(f"ActivityInfo Job: Successfully updated resource {resource_id}")


def _start_export_job(client, context, resource_id, form_id, format_type):
"""Start an ActivityInfo export job and return the job ID."""
log.info(f"ActivityInfo Job: Starting export for form {form_id}")
job_info = client.start_job_download_form_data(form_id, format=format_type.upper())
job_id = job_info.get('id') or job_info.get('jobId')
try:
job_info = client.start_job_download_form_data(form_id, format=format_type.upper())
except Exception as e:
error_msg = f"Failed to start ActivityInfo export: {e}"
log.error(error_msg)
_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, error_msg)
raise

job_id = job_info.get('id') or job_info.get('jobId')
if not job_id:
_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'Failed to start export job')
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 0, 'Failed to start export job'
)
raise ValueError("Failed to start ActivityInfo export job")

log.debug(f"ActivityInfo Job: Export job started with ID {job_id}")
return job_id


# Poll for job completion
def _poll_for_completion(client, context, resource_id, job_id):
"""Poll ActivityInfo for job completion and return the download URL."""
max_wait = 300 # 5 minutes max
poll_interval = 3
elapsed = 0

while elapsed < max_wait:
status = client.get_job_status(job_id)
try:
status = client.get_job_status(job_id)
except Exception as e:
error_msg = f"Failed to get export job status: {e}"
log.error(error_msg)
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 0, error_msg
)
raise

state = status.get('state')
percent = status.get('percentComplete', 0)
# Update progress
_update_resource_status(toolkit.fresh_context(context), resource_id, 'exporting', percent)

if state == 'completed':
result = status.get('result', {})
download_url = result.get('downloadUrl') if isinstance(result, dict) else None
if not download_url:
raise ValueError("Export completed but no download URL provided")

log.info(f"ActivityInfo Job: Export completed, downloading from {download_url}")
_update_resource_status(toolkit.fresh_context(context), resource_id, 'downloading', 100)

# Download the file
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'downloading', 100
)
if not download_url.startswith('http'):
download_url = f"{client.base_url}/{download_url.lstrip('/')}"
return download_url

file_data = client.download_file(download_url)

# Generate filename
safe_label = "".join(c if c.isalnum() or c in '-_ ' else '_' for c in form_label)
filename = f"{safe_label}.{format_type}"

# Save to temp file and update resource
_update_resource_with_file(toolkit.fresh_context(context), resource_id, file_data, filename, format_type)

log.info(f"ActivityInfo Job: Successfully updated resource {resource_id}")
return

elif state == 'failed':
if state == 'failed':
error = status.get('error', 'Unknown error')
_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', percent, error)
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', percent, error
)
raise ValueError(f"ActivityInfo export job failed: {error}")

time.sleep(poll_interval)
elapsed += poll_interval

_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'Timeout waiting for export job to complete')
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 0,
'Timeout waiting for export job to complete'
)
raise ValueError(f"ActivityInfo export job timed out after {max_wait} seconds")


def _download_export_file(client, context, resource_id, download_url):
"""Download the exported file and validate it is not empty."""
try:
file_data = client.download_file(download_url)
except Exception as e:
error_msg = f"Failed to download file from ActivityInfo: {e}"
log.error(error_msg)
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 100, error_msg
)
raise

if not file_data:
error_msg = "Downloaded file is empty"
log.error(error_msg)
_update_resource_status(
toolkit.fresh_context(context), resource_id, 'error', 100, error_msg
)
raise ValueError(error_msg)

return file_data


def _update_resource_status(context: dict, resource_id: str, status: str,
progress: int, error: str = '') -> None:
"""Update the ActivityInfo status fields on a resource."""
Expand Down
Loading