diff --git a/ckanext/activityinfo/assets/js/activityinfo.js b/ckanext/activityinfo/assets/js/activityinfo.js index 7850fe6..928d2b9 100644 --- a/ckanext/activityinfo/assets/js/activityinfo.js +++ b/ckanext/activityinfo/assets/js/activityinfo.js @@ -74,7 +74,7 @@ // Use the full download URL from the response const downloadUrl = data.download_url; if (downloadUrl) { - // Open in new tab to trigger download from ActivityInfo + // Open in new tab to trigger download via proxy endpoint window.open(downloadUrl, '_blank'); showAlert('Download ready for "' + formLabel + '"', 'success'); } else { diff --git a/ckanext/activityinfo/blueprints/activity_info.py b/ckanext/activityinfo/blueprints/activity_info.py index dfa6533..0acc5da 100644 --- a/ckanext/activityinfo/blueprints/activity_info.py +++ b/ckanext/activityinfo/blueprints/activity_info.py @@ -1,5 +1,5 @@ import logging -from flask import Blueprint +from flask import Blueprint, Response from ckan.common import current_user from ckan.plugins import toolkit from ckan.views.api import _finish_ok @@ -214,18 +214,73 @@ def job_status(job_id): log.info(f"Job status for {job_id}: {job_status}") - # Build full download URL if job is completed - full_download_url = None + # Build proxy download URL if job is completed + proxy_download_url = None if job_status.get('state') == 'completed': result = job_status.get('result', {}) relative_url = result.get('downloadUrl', '') if relative_url: - aic = ActivityInfoClient() - full_download_url = f"{aic.base_url}/{relative_url.lstrip('/')}" + proxy_download_url = toolkit.url_for('activity_info.download_file', job_id=job_id) ret = { 'success': True, 'result': job_status, - 'download_url': full_download_url, + 'download_url': proxy_download_url, } return _finish_ok(ret) + + +@activityinfo_bp.route('/download-file/<job_id>') +def download_file(job_id): + """Proxy download endpoint that fetches the file server-side. 
+ + This avoids the 307 redirect issue where ActivityInfo redirects to + a GCS signed URL and the browser forwards the Authorization header, + causing GCS to reject the request. + """ + token = get_user_token(current_user.name) + if not token: + return toolkit.abort(403, "No ActivityInfo API key configured") + + client = ActivityInfoClient(api_key=token) + + try: + job_data = client.get_job_status(job_id) + except Exception as e: + log.error(f"Failed to get job status for {job_id}: {e}") + return toolkit.abort(500, f"Failed to get export job status: {e}") + + if job_data.get('state') != 'completed': + state = job_data.get('state', 'unknown') + return toolkit.abort(400, f"Export job is not completed (state: {state})") + + result = job_data.get('result', {}) + download_url = result.get('downloadUrl', '') + if not download_url: + return toolkit.abort(500, "Export completed but no download URL provided") + + if not download_url.startswith('http'): + download_url = f"{client.base_url}/{download_url.lstrip('/')}" + + try: + file_content = client.download_file(download_url) + except Exception as e: + log.error(f"Failed to download file for job {job_id}: {e}") + return toolkit.abort(502, f"Failed to download file from ActivityInfo: {e}") + + # Determine content type and filename from the URL + if download_url.endswith('.xlsx'): + content_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + filename = f"export-{job_id}.xlsx" + elif download_url.endswith('.txt'): + content_type = 'text/plain' + filename = f"export-{job_id}.txt" + else: + content_type = 'text/csv' + filename = f"export-{job_id}.csv" + + return Response( + file_content, + mimetype=content_type, + headers={'Content-Disposition': f'attachment; filename="{filename}"'} + ) diff --git a/ckanext/activityinfo/data/base.py b/ckanext/activityinfo/data/base.py index 0c09036..29ddcb3 100644 --- a/ckanext/activityinfo/data/base.py +++ b/ckanext/activityinfo/data/base.py @@ -224,6 +224,35 @@ def 
get_job_file(self, job_id): url = f"{self.base_url}/{endpoint}" return True, url + def _follow_redirect_without_auth(self, url): + """Make an authenticated request and follow any redirect without auth headers. + + ActivityInfo returns 307 redirects to Google Cloud Storage signed URLs. + GCS does not expect an Authorization header alongside its signed URL, + so we must strip the auth header when following the redirect. + + Args: + url (str): The URL to request. + Returns: + The response object from the final request. + """ + headers = self.get_user_auth_headers() + response = requests.get(url, headers=headers, allow_redirects=False) + log.debug(f"Initial response status: {response.status_code} for {url}") + + if response.status_code in (301, 302, 303, 307, 308): + redirect_url = response.headers.get('Location') + log.info(f"Following redirect to {redirect_url} without auth headers") + response = requests.get(redirect_url) + + response.raise_for_status() + log.debug(f"Downloaded {len(response.content)} bytes from {url}") + + if not response.content: + raise ValueError(f"Downloaded file is empty from {url}") + + return response + def download_finished_export(self, download_url): """ Download the finished export file from ActivityInfo. @@ -232,9 +261,7 @@ def download_finished_export(self, download_url): Returns: The content of the downloaded file. 
""" - headers = self.get_user_auth_headers() - response = requests.get(download_url, headers=headers) - response.raise_for_status() + response = self._follow_redirect_without_auth(download_url) return response.content def download_file(self, url: str) -> bytes: @@ -246,7 +273,5 @@ def download_file(self, url: str) -> bytes: Returns: The file contents as bytes """ - headers = {'Authorization': f'Bearer {self.api_key}'} - response = requests.get(url, headers=headers) - response.raise_for_status() + response = self._follow_redirect_without_auth(url) return response.content diff --git a/ckanext/activityinfo/jobs/download.py b/ckanext/activityinfo/jobs/download.py index 816d8ef..6492c35 100644 --- a/ckanext/activityinfo/jobs/download.py +++ b/ckanext/activityinfo/jobs/download.py @@ -27,6 +27,27 @@ def download_activityinfo_resource(resource_id: str, user: str) -> None: context = {'user': user} + try: + _do_download(context, resource_id, user) + except Exception: + log.exception(f"ActivityInfo Job: Unhandled error for resource {resource_id}") + try: + _update_resource_status( + {'user': user}, resource_id, 'error', 0, + 'An unexpected error occurred during the ActivityInfo download' + ) + except Exception: + log.exception( + f"ActivityInfo Job: Failed to update error status for resource {resource_id}" + ) + raise + + +def _do_download(context: dict, resource_id: str, user: str) -> None: + """Inner logic for downloading ActivityInfo data. + + Separated from the top-level function to keep cyclomatic complexity manageable. 
+ """ resource = toolkit.get_action('resource_show')(context, {'id': resource_id}) form_id = resource.get('activityinfo_form_id') @@ -40,31 +61,71 @@ def download_activityinfo_resource(resource_id: str, user: str) -> None: token = get_user_token(user) if not token: - _update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'No API key configured') + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 0, 'No API key configured' + ) raise ValueError("No ActivityInfo API key configured for user") client = ActivityInfoClient(api_key=token) + job_id = _start_export_job(client, context, resource_id, form_id, format_type) + download_url = _poll_for_completion(client, context, resource_id, job_id) + + file_data = _download_export_file(client, context, resource_id, download_url) + + # Generate filename + safe_label = "".join(c if c.isalnum() or c in '-_ ' else '_' for c in form_label) + filename = f"{safe_label}.{format_type}" + + # Save to temp file and update resource + _update_resource_with_file( + toolkit.fresh_context(context), resource_id, file_data, filename, format_type + ) + + log.info(f"ActivityInfo Job: Successfully updated resource {resource_id}") + + +def _start_export_job(client, context, resource_id, form_id, format_type): + """Start an ActivityInfo export job and return the job ID.""" log.info(f"ActivityInfo Job: Starting export for form {form_id}") - job_info = client.start_job_download_form_data(form_id, format=format_type.upper()) - job_id = job_info.get('id') or job_info.get('jobId') + try: + job_info = client.start_job_download_form_data(form_id, format=format_type.upper()) + except Exception as e: + error_msg = f"Failed to start ActivityInfo export: {e}" + log.error(error_msg) + _update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, error_msg) + raise + job_id = job_info.get('id') or job_info.get('jobId') if not job_id: - 
_update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'Failed to start export job') + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 0, 'Failed to start export job' + ) raise ValueError("Failed to start ActivityInfo export job") log.debug(f"ActivityInfo Job: Export job started with ID {job_id}") + return job_id + - # Poll for job completion +def _poll_for_completion(client, context, resource_id, job_id): + """Poll ActivityInfo for job completion and return the download URL.""" max_wait = 300 # 5 minutes max poll_interval = 3 elapsed = 0 while elapsed < max_wait: - status = client.get_job_status(job_id) + try: + status = client.get_job_status(job_id) + except Exception as e: + error_msg = f"Failed to get export job status: {e}" + log.error(error_msg) + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 0, error_msg + ) + raise + state = status.get('state') percent = status.get('percentComplete', 0) - # Update progress _update_resource_status(toolkit.fresh_context(context), resource_id, 'exporting', percent) if state == 'completed': @@ -72,38 +133,54 @@ def download_activityinfo_resource(resource_id: str, user: str) -> None: download_url = result.get('downloadUrl') if isinstance(result, dict) else None if not download_url: raise ValueError("Export completed but no download URL provided") - log.info(f"ActivityInfo Job: Export completed, downloading from {download_url}") - _update_resource_status(toolkit.fresh_context(context), resource_id, 'downloading', 100) - - # Download the file + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'downloading', 100 + ) if not download_url.startswith('http'): download_url = f"{client.base_url}/{download_url.lstrip('/')}" + return download_url - file_data = client.download_file(download_url) - - # Generate filename - safe_label = "".join(c if c.isalnum() or c in '-_ ' else '_' for c in form_label) - filename = 
f"{safe_label}.{format_type}" - - # Save to temp file and update resource - _update_resource_with_file(toolkit.fresh_context(context), resource_id, file_data, filename, format_type) - - log.info(f"ActivityInfo Job: Successfully updated resource {resource_id}") - return - - elif state == 'failed': + if state == 'failed': error = status.get('error', 'Unknown error') - _update_resource_status(toolkit.fresh_context(context), resource_id, 'error', percent, error) + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', percent, error + ) raise ValueError(f"ActivityInfo export job failed: {error}") time.sleep(poll_interval) elapsed += poll_interval - _update_resource_status(toolkit.fresh_context(context), resource_id, 'error', 0, 'Timeout waiting for export job to complete') + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 0, + 'Timeout waiting for export job to complete' + ) raise ValueError(f"ActivityInfo export job timed out after {max_wait} seconds") +def _download_export_file(client, context, resource_id, download_url): + """Download the exported file and validate it is not empty.""" + try: + file_data = client.download_file(download_url) + except Exception as e: + error_msg = f"Failed to download file from ActivityInfo: {e}" + log.error(error_msg) + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 100, error_msg + ) + raise + + if not file_data: + error_msg = "Downloaded file is empty" + log.error(error_msg) + _update_resource_status( + toolkit.fresh_context(context), resource_id, 'error', 100, error_msg + ) + raise ValueError(error_msg) + + return file_data + + def _update_resource_status(context: dict, resource_id: str, status: str, progress: int, error: str = '') -> None: """Update the ActivityInfo status fields on a resource.""" diff --git a/ckanext/activityinfo/tests/test_activityinfo_client.py b/ckanext/activityinfo/tests/test_activityinfo_client.py index 
f958d6a..1acac2a 100644 --- a/ckanext/activityinfo/tests/test_activityinfo_client.py +++ b/ckanext/activityinfo/tests/test_activityinfo_client.py @@ -373,3 +373,92 @@ def test_get_job_file_completed(requests_mock_fixture, client): assert done is True assert result == "https://www.activityinfo.org/resources/jobs/job123/download" + + +class MockResponse: + """Mock HTTP response for testing download_file methods.""" + def __init__(self, status_code=200, content=b'', headers=None): + self.status_code = status_code + self.content = content + self.text = content.decode('utf-8') if isinstance(content, bytes) else str(content) + self.headers = headers or {} + self.encoding = 'utf-8' + self.url = '' + self.reason = 'OK' if status_code == 200 else 'Temporary Redirect' + + def raise_for_status(self): + if self.status_code >= 400: + raise requests.HTTPError(f"{self.status_code} Error", response=self) + + def json(self): + import json + return json.loads(self.text) if self.text else {} + + +def test_download_file_follows_redirect_without_auth(monkeypatch): + """Test that download_file follows 307 redirects without sending auth headers.""" + calls = [] + original_get = requests.get + + def fake_get(url, headers=None, allow_redirects=None, **kwargs): + # Only mock specific test URLs + if 'activityinfo.org' in url or 'storage.googleapis.com' in url: + calls.append({'url': url, 'headers': headers, 'allow_redirects': allow_redirects}) + if 'activityinfo.org' in url: + return MockResponse( + 307, + b'', + {'Location': 'https://storage.googleapis.com/signed-url'} + ) + return MockResponse(200, b'file-content-here') + # Pass through all other requests (e.g., Solr checks) + return original_get(url, headers=headers, allow_redirects=allow_redirects, **kwargs) + + monkeypatch.setattr(requests, "get", fake_get) + + test_client = ActivityInfoClient(api_key="test-api-key", debug=False) + result = test_client.download_file("https://www.activityinfo.org/resources/jobs/job123/download") + + 
assert result == b'file-content-here' + # First call should have auth headers and allow_redirects=False + assert calls[0]['headers']['Authorization'] == 'Bearer test-api-key' + assert calls[0]['allow_redirects'] is False + # Second call (to GCS) should NOT have auth headers + assert calls[1]['headers'] is None + assert calls[1]['url'] == 'https://storage.googleapis.com/signed-url' + + +def test_download_file_no_redirect(monkeypatch): + """Test that download_file works when there is no redirect.""" + original_get = requests.get + + def fake_get(url, headers=None, allow_redirects=None, **kwargs): + # Only mock specific test URLs + if 'activityinfo.org' in url: + return MockResponse(200, b'direct-content') + # Pass through all other requests (e.g., Solr checks) + return original_get(url, headers=headers, allow_redirects=allow_redirects, **kwargs) + + monkeypatch.setattr(requests, "get", fake_get) + + test_client = ActivityInfoClient(api_key="test-api-key", debug=False) + result = test_client.download_file("https://www.activityinfo.org/resources/jobs/job123/download") + assert result == b'direct-content' + + +def test_download_file_empty_raises(monkeypatch): + """Test that download_file raises when downloaded content is empty.""" + original_get = requests.get + + def fake_get(url, headers=None, allow_redirects=None, **kwargs): + # Only mock specific test URLs + if 'activityinfo.org' in url: + return MockResponse(200, b'') + # Pass through all other requests (e.g., Solr checks) + return original_get(url, headers=headers, allow_redirects=allow_redirects, **kwargs) + + monkeypatch.setattr(requests, "get", fake_get) + + test_client = ActivityInfoClient(api_key="test-api-key", debug=False) + with pytest.raises(ValueError, match="empty"): + test_client.download_file("https://www.activityinfo.org/resources/jobs/job123/download") diff --git a/ckanext/activityinfo/tests/test_download_endpoints.py b/ckanext/activityinfo/tests/test_download_endpoints.py index fd1ab98..216d934 
100644 --- a/ckanext/activityinfo/tests/test_download_endpoints.py +++ b/ckanext/activityinfo/tests/test_download_endpoints.py @@ -95,7 +95,7 @@ def test_job_status_completed(self, app, setup_data): data = resp.json assert data["success"] is True assert data["result"]["state"] == "completed" - assert data["download_url"] == "https://www.activityinfo.org/resources/jobs/job_abc123/download" + assert data["download_url"] == "/activity-info/download-file/job_abc123" def test_job_status_failed(self, app, setup_data): """Test job status endpoint when job has failed""" @@ -120,6 +120,83 @@ def test_job_status_failed(self, app, setup_data): assert data["result"]["state"] == "failed" assert data["download_url"] is None + def test_download_file_proxy_completed(self, app, setup_data): + """Test proxy download endpoint returns file when job is completed""" + environ = {"Authorization": setup_data.activityinfo_user["token"]} + job_id = "job_abc123" + + fake_status = { + "id": job_id, + "state": "completed", + "percentComplete": 100, + "result": { + "downloadUrl": "/resources/jobs/job_abc123/download" + } + } + + with mock.patch( + "ckanext.activityinfo.data.base.ActivityInfoClient.get_job_status", + return_value=fake_status, + ), mock.patch( + "ckanext.activityinfo.data.base.ActivityInfoClient.download_file", + return_value=b"col1,col2\nval1,val2\n", + ): + resp = app.get(f"/activity-info/download-file/{job_id}", headers=environ) + + assert resp.status_code == 200 + assert resp.content_type.startswith("text/csv") + assert b"col1,col2" in resp.data + + def test_download_file_proxy_not_completed(self, app, setup_data): + """Test proxy download endpoint returns 400 when job is not completed""" + environ = {"Authorization": setup_data.activityinfo_user["token"]} + job_id = "job_abc123" + + fake_status = { + "id": job_id, + "state": "started", + "percentComplete": 50 + } + + with mock.patch( + "ckanext.activityinfo.data.base.ActivityInfoClient.get_job_status", + 
return_value=fake_status, + ): + resp = app.get( + f"/activity-info/download-file/{job_id}", + headers=environ, + expect_errors=True + ) + assert resp.status_code == 400 + + def test_download_file_proxy_download_error(self, app, setup_data): + """Test proxy download endpoint returns 502 when download fails""" + environ = {"Authorization": setup_data.activityinfo_user["token"]} + job_id = "job_abc123" + + fake_status = { + "id": job_id, + "state": "completed", + "percentComplete": 100, + "result": { + "downloadUrl": "/resources/jobs/job_abc123/download" + } + } + + with mock.patch( + "ckanext.activityinfo.data.base.ActivityInfoClient.get_job_status", + return_value=fake_status, + ), mock.patch( + "ckanext.activityinfo.data.base.ActivityInfoClient.download_file", + side_effect=Exception("GCS 404 Not Found"), + ): + resp = app.get( + f"/activity-info/download-file/{job_id}", + headers=environ, + expect_errors=True + ) + assert resp.status_code == 502 + @pytest.mark.usefixtures("clean_db") class TestDownloadActions: