-
Notifications
You must be signed in to change notification settings - Fork 3
Composite State Updates #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,16 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import copy | ||
| import logging | ||
| import typing as t | ||
| from abc import ABC, abstractmethod | ||
| from collections import defaultdict | ||
| from enum import Enum | ||
|
|
||
| from pydantic import BaseModel, Extra | ||
|
|
||
| from .helpers import ( | ||
| JSONObject, | ||
| eliminate_none_values, | ||
| ensure_json_path, | ||
| ensure_parameter_values, | ||
| ) | ||
| from .helpers import JSONObject, ensure_json_path, ensure_parameter_values | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class BaseState(ABC, BaseModel): | ||
|
|
@@ -103,19 +101,6 @@ def get_flow_transition_states(self) -> t.List[str]: | |
| return [] | ||
|
|
||
|
|
||
| class BaseCompositeState(BaseState): | ||
| """ | ||
| A class which allows combining two or more base states | ||
| """ | ||
|
|
||
| state_type: str = "CompositeVirtualState" | ||
| state_name_prefix: str = "" | ||
|
|
||
| @abstractmethod | ||
| def get_flow_definition(self) -> JSONObject: | ||
| return super().get_flow_definition() | ||
|
|
||
|
|
||
| class StateWithNextOrEnd(BaseState): | ||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
|
|
@@ -157,11 +142,13 @@ def next( | |
| new_next_state = old_next | ||
| if self.next_state is None: | ||
| self.next_state = new_next_state | ||
| elif ( | ||
| isinstance(self.next_state, StateWithNextOrEnd) | ||
| and new_next_state is not None | ||
| ): | ||
| self.next_state.next(new_next_state) | ||
| elif new_next_state is not None: | ||
| try: | ||
| self.next_state.next(new_next_state) | ||
| except AttributeError: | ||
| logger.warn( | ||
| f"Unable to set next for state {self.next_state.valid_state_name}" | ||
| ) | ||
| return self | ||
|
|
||
| def get_child_states(self) -> t.List[BaseState]: | ||
|
|
@@ -268,3 +255,49 @@ def result_path_for_step(self) -> str: | |
| ) | ||
| result_path = ensure_json_path(result_path) | ||
| return result_path | ||
|
|
||
|
|
||
| class BaseCompositeState(BaseState): | ||
| state_type: str = "CompositeVirtualState" | ||
| state_name_prefix: str = "" | ||
|
|
||
| @abstractmethod | ||
| def construct_flow(self) -> BaseState: | ||
| raise ValueError( | ||
| f"construct_flow method not implemented on class {type(self).__name__}" | ||
| ) | ||
|
|
||
| def get_flow_definition(self) -> JSONObject: | ||
| start_state = self.construct_flow() | ||
| flow_definition = start_state.get_flow_definition() | ||
| if self.state_name_prefix: | ||
| new_states: JSONObject = {} | ||
| flow_states: JSONObject = flow_definition["States"] | ||
| for state_name, state_def in flow_states.items(): | ||
| new_states[self.state_name_prefix + state_name] = copy.deepcopy( | ||
| state_def | ||
| ) | ||
| for new_state_name, new_state_def in new_states.items(): | ||
| for state_def_key, state_def_val in new_state_def.items(): | ||
| # Replace all references to old state names with prefixed value | ||
| if state_def_val in flow_states.keys(): | ||
| new_states[new_state_name][state_def_key] = ( | ||
| self.state_name_prefix + state_def_val | ||
| ) | ||
| flow_definition["States"] = new_states | ||
|
|
||
| return flow_definition | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The state renaming code might be nice to have in its own separate method for clarity if people are running |
||
|
|
||
| @abstractmethod | ||
| def next( | ||
| self, | ||
| next_state: BaseState, | ||
| for_state: t.Optional[t.Union[str, BaseState]] = None, | ||
| ) -> BaseState: | ||
| """Set the next state for the composite state/sub-flow. The must be implemented | ||
| by each composite state. if for_state is None (the default) the next_state | ||
| should be set on all out-going links of the composite state. If for_state is | ||
| provided, the next should only be set on it for the state spcified by state_name | ||
| or by instance. | ||
| """ | ||
| ... | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If construct_flow is called here, does it need its own method?