Skip to content

feat(engine): add parallel execution support for iteration nodes#1131

Merged
hygao1024 merged 1 commit intoiflytek:mainfrom
cumthxy:feature/iterator-parallel-optimization
Apr 3, 2026
Merged

feat(engine): add parallel execution support for iteration nodes#1131
hygao1024 merged 1 commit intoiflytek:mainfrom
cumthxy:feature/iterator-parallel-optimization

Conversation

@cumthxy
Copy link
Copy Markdown
Contributor

@cumthxy cumthxy commented Apr 2, 2026

Summary

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Refactoring

Related Issue

Changes

  • Add parallel execution support for iteration nodes with configurable concurrency, timeout, and error handling strategies
  • Isolate child iteration execution context, including callback queues, variable pools, and history state, to prevent cross-batch interference
  • Improve workflow engine task handling and cleanup to better support iteration subgraph execution and cancellation scenarios
  • Add iteration sub-DSL extraction and chain resolution enhancements for standalone child workflow execution
  • Update ordered streaming callback handling to keep parallel iteration outputs isolated and correctly sequenced

Testing

  • Existing tests pass
  • New tests added (if applicable)
  • Manual testing completed

Screenshots (if applicable)

Checklist

  • Code follows project coding standards
  • Self-review completed
  • Documentation updated (if needed)
  • Breaking changes documented

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces parallel execution capabilities for iteration nodes in the workflow engine. Key changes include the implementation of a parallel execution mode with configurable concurrency and error strategies (fail_fast, continue, ignore_error_output), the addition of isolated child engines for parallel batches, and deep copying of variable pools to prevent state sharing. The engine now also supports ordered event relaying from child iterations. Feedback highlights potential improvements in exception handling within the event relay logic and suggests optimizing task management in the queue consumption loop to avoid unnecessary task recreation.

"""
try:
payload = done_task.result()
except (asyncio.CancelledError, Exception):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Catching a generic Exception here is too broad and might mask unexpected bugs. It is better to catch specific exceptions that are expected from done_task.result(), such as asyncio.CancelledError or specific errors raised by the child engine.

Comment on lines +745 to +747
wait_tasks[asyncio.create_task(child_stream_queue.get())] = "stream"
if not queue_done_state["order"]:
wait_tasks[asyncio.create_task(child_order_queue.get())] = "order"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Creating new tasks in every iteration of the loop for child_stream_queue.get() and child_order_queue.get() is inefficient. If one task completes, the other is cancelled and recreated in the next iteration. Consider reusing the pending tasks to reduce overhead.

@hygao1024 hygao1024 merged commit d42cee7 into iflytek:main Apr 3, 2026
27 of 35 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants