Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
.env.production.local
.env.development
.env.test
.env.production
.env.production
*.sln
45 changes: 44 additions & 1 deletion template/netwrix-python/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

SOURCE_TYPE: Final = os.getenv("SOURCE_TYPE", "internal")
FUNCTION_TYPE: Final = os.getenv("FUNCTION_TYPE", "netwrix")
SERVICE_NAME: Final = f"{SOURCE_TYPE}-{FUNCTION_TYPE}"
PROCESS_KEY: Final = os.getenv("POST_PROCESSING_KEY")
SERVICE_NAME: Final = f"{SOURCE_TYPE}-{PROCESS_KEY}" if PROCESS_KEY else f"{SOURCE_TYPE}-{FUNCTION_TYPE}"

# Common functions base URL - defaults to access-analyzer namespace K8s services
# For local development, set to appropriate docker-compose service names
Expand Down Expand Up @@ -340,6 +341,7 @@ def __init__(self):
self.secrets: dict[str, str] | None = None
self.scan_id: str | None = os.getenv("SCAN_ID")
self.scan_execution_id: str | None = None
self.parent_execution_id: str | None = None
self.run_local: str = os.getenv("RUN_LOCAL", "false")
self.function_type: str | None = os.getenv("FUNCTION_TYPE")
self.tables: dict[str, BatchManager] = {}
Expand Down Expand Up @@ -548,6 +550,45 @@ def wrapped_target(*target_args, **target_kwargs):
# Create thread with wrapped target
return threading.Thread(*args, target=wrapped_target, **kwargs)

def run_process_async(self, process_key: str) -> None:
    """
    Trigger an additional process asynchronously as a new child of the parent execution.

    This allows a handler to kick off other named processes defined in the connector's
    config.json `additionalProcesses` array. The new child execution runs under the same
    parent, so it participates in the same lifecycle (its completion is tracked and the
    parent only completes when all children are done).

    Args:
        process_key: The key of the additional process to trigger (must match the handler
            directory name and the `key` field in `additionalProcesses`).

    Raises:
        ValueError: If parent_execution_id is not set on the context.
        requests.HTTPError: If the core-api call fails.
    """
    # A child process can only be attached to a known parent execution.
    parent_id = self.parent_execution_id
    if not parent_id:
        raise ValueError("parent_execution_id must be set to call run_process_async")

    # Default points at the in-cluster core-api service; override for local dev.
    endpoint = "{base}/api/v1/scan-executions/{parent}/run-process".format(
        base=os.getenv("CORE_API_INTERNAL_URL", "http://core-api:3000"),
        parent=parent_id,
    )

    # Only attach the API key header when one is configured.
    request_headers = {"Content-Type": "application/json"}
    connector_key = os.getenv("CONNECTOR_API_KEY", "")
    if connector_key:
        request_headers["X-Api-Key"] = connector_key

    resp = requests.post(
        endpoint,
        json={"processKey": process_key},
        headers=request_headers,
        timeout=30,
    )
    # Surface HTTP errors to the caller rather than swallowing them.
    resp.raise_for_status()

    payload = resp.json()
    self.log.info("Triggered additional process", process_key=process_key, execution_id=payload.get("executionId"))

# Add an object to the appropriate table batch manager
def save_object(self, table: str, obj: object, update_status: bool = True):
if table not in self.tables:
Expand Down Expand Up @@ -868,6 +909,7 @@ def call_handler(path: str):

request_data = json.loads(event.body)
context.scan_execution_id = request_data.get("scanExecutionId")
context.parent_execution_id = request_data.get("parentExecutionId") or None

if not context.secrets:
context.log.warning("No secrets loaded from secret files")
Expand Down Expand Up @@ -999,6 +1041,7 @@ def run_as_job():
request_data = {}

ctx.scan_execution_id = request_data.get("scanExecutionId") or os.getenv("SCAN_EXECUTION_ID")
ctx.parent_execution_id = request_data.get("parentExecutionId") or None

ctx.log.info(
"Starting job execution",
Expand Down
Loading