diff --git a/.gitignore b/.gitignore index ac9e0f1..bf5b4cc 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ .env.production.local .env.development .env.test -.env.production \ No newline at end of file +.env.production +*.sln \ No newline at end of file diff --git a/template/netwrix-python/index.py b/template/netwrix-python/index.py index b2d75f8..3a58585 100644 --- a/template/netwrix-python/index.py +++ b/template/netwrix-python/index.py @@ -47,7 +47,8 @@ SOURCE_TYPE: Final = os.getenv("SOURCE_TYPE", "internal") FUNCTION_TYPE: Final = os.getenv("FUNCTION_TYPE", "netwrix") -SERVICE_NAME: Final = f"{SOURCE_TYPE}-{FUNCTION_TYPE}" +PROCESS_KEY: Final = os.getenv("POST_PROCESSING_KEY") +SERVICE_NAME: Final = f"{SOURCE_TYPE}-{PROCESS_KEY}" if PROCESS_KEY else f"{SOURCE_TYPE}-{FUNCTION_TYPE}" # Common functions base URL - defaults to access-analyzer namespace K8s services # For local development, set to appropriate docker-compose service names @@ -340,6 +341,7 @@ def __init__(self): self.secrets: dict[str, str] | None = None self.scan_id: str | None = os.getenv("SCAN_ID") self.scan_execution_id: str | None = None + self.parent_execution_id: str | None = None self.run_local: str = os.getenv("RUN_LOCAL", "false") self.function_type: str | None = os.getenv("FUNCTION_TYPE") self.tables: dict[str, BatchManager] = {} @@ -548,6 +550,45 @@ def wrapped_target(*target_args, **target_kwargs): # Create thread with wrapped target return threading.Thread(*args, target=wrapped_target, **kwargs) + def run_process_async(self, process_key: str) -> None: + """ + Trigger an additional process asynchronously as a new child of the parent execution. + + This allows a handler to kick off other named processes defined in the connector's + config.json `additionalProcesses` array. The new child execution runs under the same + parent, so it participates in the same lifecycle (its completion is tracked and the + parent only completes when all children are done). + + Args: + process_key: The key of the additional process to trigger (must match the handler + directory name and the `key` field in `additionalProcesses`). + + Raises: + ValueError: If parent_execution_id is not set on the context. + requests.HTTPError: If the core-api call fails. + """ + if not self.parent_execution_id: + raise ValueError("parent_execution_id must be set to call run_process_async") + + base_url = os.getenv("CORE_API_INTERNAL_URL", "http://core-api:3000") + url = f"{base_url}/api/v1/scan-executions/{self.parent_execution_id}/run-process" + + headers = {"Content-Type": "application/json"} + api_key = os.getenv("CONNECTOR_API_KEY", "") + if api_key: + headers["X-Api-Key"] = api_key + + response = requests.post( + url, + json={"processKey": process_key}, + headers=headers, + timeout=30, + ) + response.raise_for_status() + + result = response.json() + self.log.info("Triggered additional process", process_key=process_key, execution_id=result.get("executionId")) + # Add an object to the appropriate table batch manager def save_object(self, table: str, obj: object, update_status: bool = True): if table not in self.tables: @@ -868,6 +909,7 @@ def call_handler(path: str): request_data = json.loads(event.body) context.scan_execution_id = request_data.get("scanExecutionId") + context.parent_execution_id = request_data.get("parentExecutionId") or None if not context.secrets: context.log.warning("No secrets loaded from secret files") @@ -999,6 +1041,7 @@ def run_as_job(): request_data = {} ctx.scan_execution_id = request_data.get("scanExecutionId") or os.getenv("SCAN_EXECUTION_ID") + ctx.parent_execution_id = request_data.get("parentExecutionId") or None ctx.log.info( "Starting job execution",