From 859a467e5be3709536dd16469ca17122d4e57878 Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Tue, 10 Mar 2026 15:49:27 -0400 Subject: [PATCH 1/6] feat(context): add data_query() method to Context class Adds Context.data_query(table, query) to the netwrix-python template, allowing post-processing handlers to query ClickHouse via the data-query service without constructing raw HTTP calls manually. Generated with AI Co-Authored-By: Claude Code --- .gitignore | 3 ++- template/netwrix-python/index.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ac9e0f1..bf5b4cc 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ .env.production.local .env.development .env.test -.env.production \ No newline at end of file +.env.production +*.sln \ No newline at end of file diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index b2d75f8..0c56d43 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -548,6 +548,35 @@ def wrapped_target(*target_args, **target_kwargs): # Create thread with wrapped target return threading.Thread(*args, target=wrapped_target, **kwargs) + def data_query(self, table: str, query: str) -> list[dict]: + """ + Execute a SQL query against the data-query service and return the results. + + Args: + table: The ClickHouse table name (used for logging context). + query: The SQL query string to execute. + + Returns: + list[dict]: List of result rows as dictionaries. + + Raises: + RuntimeError: If the query fails or the service returns an error. 
+ """ + headers = {"Content-Type": "application/json", **self.get_caller_headers()} + service_name = os.getenv("DATA_QUERY_FUNCTION", "data-query") + url = get_service_url(service_name) + + self.log.info(f"Querying data-query service", table=table) + + response = requests.post(url, json={"query": query.strip()}, headers=headers, timeout=60) + response.raise_for_status() + + result = response.json() + if not result.get("success"): + raise RuntimeError(f"data-query failed for table '{table}': {result.get('error', 'Unknown error')}") + + return result.get("data", []) + # Add an object to the appropriate table batch manager def save_object(self, table: str, obj: object, update_status: bool = True): if table not in self.tables: From 92107c8ef5aa83905f9fd288bf48290d58929a14 Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Tue, 10 Mar 2026 16:20:01 -0400 Subject: [PATCH 2/6] fix(context): remove table arg from data_query() The data-query service only takes a query, so the table parameter served no purpose. Generated with AI Co-Authored-By: Claude Code --- template/netwrix-python/index.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index 0c56d43..91a7882 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -548,12 +548,11 @@ def wrapped_target(*target_args, **target_kwargs): # Create thread with wrapped target return threading.Thread(*args, target=wrapped_target, **kwargs) - def data_query(self, table: str, query: str) -> list[dict]: + def data_query(self, query: str) -> list[dict]: """ Execute a SQL query against the data-query service and return the results. Args: - table: The ClickHouse table name (used for logging context). query: The SQL query string to execute. 
Returns: @@ -566,14 +565,12 @@ def data_query(self, table: str, query: str) -> list[dict]: service_name = os.getenv("DATA_QUERY_FUNCTION", "data-query") url = get_service_url(service_name) - self.log.info(f"Querying data-query service", table=table) - response = requests.post(url, json={"query": query.strip()}, headers=headers, timeout=60) response.raise_for_status() result = response.json() if not result.get("success"): - raise RuntimeError(f"data-query failed for table '{table}': {result.get('error', 'Unknown error')}") + raise RuntimeError(f"data-query failed: {result.get('error', 'Unknown error')}") return result.get("data", []) From ae2f0eb321e642d41c94d15534ab509746b2c01b Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Fri, 13 Mar 2026 10:23:41 -0400 Subject: [PATCH 3/6] feat(context): run_process_async and SERVICE_NAME fix for additional processes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add run_process_async(process_key) to Context — calls POST /api/v1/scan-executions/{parent_execution_id}/run-process on core-api to trigger a named additional process mid-execution - Fix SERVICE_NAME: use POST_PROCESSING_KEY when set so additional process pods report as e.g. 
"cifs-mid-process" instead of "cifs-additional_processing" Generated with AI Co-Authored-By: Claude Code --- template/netwrix-python/index.py | 40 +++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index 91a7882..ebc23ba 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -47,7 +47,8 @@ SOURCE_TYPE: Final = os.getenv("SOURCE_TYPE", "internal") FUNCTION_TYPE: Final = os.getenv("FUNCTION_TYPE", "netwrix") -SERVICE_NAME: Final = f"{SOURCE_TYPE}-{FUNCTION_TYPE}" +_process_key: Final = os.getenv("POST_PROCESSING_KEY") +SERVICE_NAME: Final = f"{SOURCE_TYPE}-{_process_key}" if _process_key else f"{SOURCE_TYPE}-{FUNCTION_TYPE}" # Common functions base URL - defaults to access-analyzer namespace K8s services # For local development, set to appropriate docker-compose service names @@ -340,6 +341,7 @@ def __init__(self): self.secrets: dict[str, str] | None = None self.scan_id: str | None = os.getenv("SCAN_ID") self.scan_execution_id: str | None = None + self.parent_execution_id: str | None = None self.run_local: str = os.getenv("RUN_LOCAL", "false") self.function_type: str | None = os.getenv("FUNCTION_TYPE") self.tables: dict[str, BatchManager] = {} @@ -574,6 +576,40 @@ def data_query(self, query: str) -> list[dict]: return result.get("data", []) + def run_process_async(self, process_key: str) -> None: + """ + Trigger an additional process asynchronously as a new child of the parent execution. + + This allows a handler to kick off other named processes defined in the connector's + config.json `additionalProcesses` array. The new child execution runs under the same + parent, so it participates in the same lifecycle (its completion is tracked and the + parent only completes when all children are done). 
+ + Args: + process_key: The key of the additional process to trigger (must match the handler + directory name and the `key` field in `additionalProcesses`). + + Raises: + ValueError: If parent_execution_id is not set on the context. + requests.HTTPError: If the core-api call fails. + """ + if not self.parent_execution_id: + raise ValueError("parent_execution_id must be set to call run_process_async") + + base_url = os.getenv("CORE_API_INTERNAL_URL", "http://core-api:3000") + url = f"{base_url}/api/v1/scan-executions/{self.parent_execution_id}/run-process" + + response = requests.post( + url, + json={"processKey": process_key}, + headers={"Content-Type": "application/json"}, + timeout=30, + ) + response.raise_for_status() + + result = response.json() + self.log.info("Triggered additional process", process_key=process_key, execution_id=result.get("executionId")) + # Add an object to the appropriate table batch manager def save_object(self, table: str, obj: object, update_status: bool = True): if table not in self.tables: @@ -894,6 +930,7 @@ def call_handler(path: str): request_data = json.loads(event.body) context.scan_execution_id = request_data.get("scanExecutionId") + context.parent_execution_id = request_data.get("parentExecutionId") or None if not context.secrets: context.log.warning("No secrets loaded from secret files") @@ -1025,6 +1062,7 @@ def run_as_job(): request_data = {} ctx.scan_execution_id = request_data.get("scanExecutionId") or os.getenv("SCAN_EXECUTION_ID") + ctx.parent_execution_id = request_data.get("parentExecutionId") or None ctx.log.info( "Starting job execution", From 1b3cc68d80f1522b7823acb30aa84c921f09f5ad Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Fri, 13 Mar 2026 13:08:21 -0400 Subject: [PATCH 4/6] Removed data_query function since it was only used for testing purposes --- template/netwrix-python/index.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/template/netwrix-python/index.py 
b/template/netwrix-python/index.py index ebc23ba..78811a4 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -550,32 +550,6 @@ def wrapped_target(*target_args, **target_kwargs): # Create thread with wrapped target return threading.Thread(*args, target=wrapped_target, **kwargs) - def data_query(self, query: str) -> list[dict]: - """ - Execute a SQL query against the data-query service and return the results. - - Args: - query: The SQL query string to execute. - - Returns: - list[dict]: List of result rows as dictionaries. - - Raises: - RuntimeError: If the query fails or the service returns an error. - """ - headers = {"Content-Type": "application/json", **self.get_caller_headers()} - service_name = os.getenv("DATA_QUERY_FUNCTION", "data-query") - url = get_service_url(service_name) - - response = requests.post(url, json={"query": query.strip()}, headers=headers, timeout=60) - response.raise_for_status() - - result = response.json() - if not result.get("success"): - raise RuntimeError(f"data-query failed: {result.get('error', 'Unknown error')}") - - return result.get("data", []) - def run_process_async(self, process_key: str) -> None: """ Trigger an additional process asynchronously as a new child of the parent execution. From b422213ec794043bf1c83a22d22fa3ea8cd17860 Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Fri, 13 Mar 2026 15:22:38 -0400 Subject: [PATCH 5/6] feat(context): send X-Api-Key header in run_process_async Pass CONNECTOR_API_KEY as X-Api-Key header when calling core-api's run-process endpoint, matching the new internal webhook auth requirement. 
Generated with AI Co-Authored-By: Claude Code --- template/netwrix-python/index.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index 78811a4..bd0dbaf 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -573,10 +573,15 @@ def run_process_async(self, process_key: str) -> None: base_url = os.getenv("CORE_API_INTERNAL_URL", "http://core-api:3000") url = f"{base_url}/api/v1/scan-executions/{self.parent_execution_id}/run-process" + headers = {"Content-Type": "application/json"} + api_key = os.getenv("CONNECTOR_API_KEY", "") + if api_key: + headers["X-Api-Key"] = api_key + response = requests.post( url, json={"processKey": process_key}, - headers={"Content-Type": "application/json"}, + headers=headers, timeout=30, ) response.raise_for_status() From 1dbaa48e7d26c4d362fb3734cf7ceee9a39ae167 Mon Sep 17 00:00:00 2001 From: mckibbenc Date: Mon, 16 Mar 2026 09:59:13 -0400 Subject: [PATCH 6/6] Renamed _process_key module constant to UPPER_SNAKE_CASE (PROCESS_KEY) --- template/netwrix-python/index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index bd0dbaf..3a58585 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -47,8 +47,8 @@ SOURCE_TYPE: Final = os.getenv("SOURCE_TYPE", "internal") FUNCTION_TYPE: Final = os.getenv("FUNCTION_TYPE", "netwrix") -_process_key: Final = os.getenv("POST_PROCESSING_KEY") -SERVICE_NAME: Final = f"{SOURCE_TYPE}-{_process_key}" if _process_key else f"{SOURCE_TYPE}-{FUNCTION_TYPE}" +PROCESS_KEY: Final = os.getenv("POST_PROCESSING_KEY") +SERVICE_NAME: Final = f"{SOURCE_TYPE}-{PROCESS_KEY}" if PROCESS_KEY else f"{SOURCE_TYPE}-{FUNCTION_TYPE}" # Common functions base URL - defaults to access-analyzer namespace K8s services # For local development, set to appropriate docker-compose service names