Thread-Powered Local Executor #57699
Replies: 9 comments 2 replies
-
Just wanted to make sure I'm understanding the issue right: the goal is to create a new executor that uses a thread pool instead of multiple processes, to reduce memory usage for lightweight tasks like SSHOperator, right? I'd love to work on this issue!
-
Exactly :)
-
You can reduce the number of default worker processes (which is 32 by default) using the `parallelism` setting. In my opinion, achieving parallelism through Python threads may not always behave as expected (Python threads are constrained by the GIL, so they are not the typical notion of a thread). In that case, the only option left is to create separate processes.
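For reference, the knob being discussed here is most likely Airflow's core `parallelism` option, which caps how many task instances the executor runs at once; a minimal sketch using the environment-variable form of the config:

```shell
# Equivalent to setting "parallelism = 8" under [core] in airflow.cfg.
# With LocalExecutor this caps the number of concurrent worker processes,
# trading throughput for memory.
export AIRFLOW__CORE__PARALLELISM=8
```

The trade-off mentioned below applies: lowering this reduces resident memory but also reduces how many tasks can run simultaneously.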
-
@wjddn279 sure, low parallelism is an option, but the downside (if I understand this correctly) is that one cannot have more than 5 tasks running simultaneously, right? That can be a serious limitation for us. On the other hand, these long-running tasks require very little processing power and RAM (they are just SSHOperators). The high memory consumption of the spawned processes (I assume, correct me if I am wrong) does not come from the SSHOperator itself but from everything around it: the Python interpreter, module imports, setting up global state, etc. I hope there is a way to avoid doing this setup 32 times just to have 32 lightweight tasks running in parallel. But as said, I am not a Python expert at all, so I am not sure how well Python can do multithreading.
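For what it's worth, the pattern being proposed is standard Python: a sketch (not Airflow code, just an illustration of the idea, assuming the tasks are I/O-bound the way an SSH wait is) where 32 concurrent tasks share one interpreter, one set of imports, and one process's memory:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def lightweight_task(task_id: int) -> str:
    # Stand-in for an SSH wait: the thread mostly sleeps (I/O-bound),
    # so the GIL is released and threads genuinely run concurrently.
    time.sleep(0.1)
    return f"task-{task_id} done in {threading.current_thread().name}"

# One process, 32 worker threads -- instead of 32 forked interpreters
# each paying for its own imports and global state.
with ThreadPoolExecutor(max_workers=32) as pool:
    results = list(pool.map(lightweight_task, range(32)))
```

The caveat raised elsewhere in this thread still holds: because everything shares one interpreter, tasks are not isolated from each other.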
-
Yes, I strongly agree that the excessive memory usage, compared to the simplicity of the tasks, stems from Python's fork-based process creation model. If the memory issue cannot be resolved (I'm currently looking into it), switching to a model where workers are created dynamically, similar to the KubernetesExecutor, might be a good alternative. However, it's hard to predict what impact that change might have, so I think we should get input from whoever is managing the overall direction.
-
This approach has multiple disadvantages, not the least of which is that tasks are not isolated from each other; there are multiple problems connected with tasks sharing the same interpreter. Generically solving all the issues connected with it is not really feasible. But if you have a case where all your tasks run code that your implementation makes sure avoids the connected side effects, then it might work. And if you notice how many times "you" is mentioned here, you will see that yes, this is a great idea for your own executor to implement. Which is perfectly fine: Airflow allows you to implement your own executor and use it, and you are perfectly free to do so.

Whether this can be shared with other users is debatable. It implies quite a few limitations on how DAGs should be implemented by the users using it, and supporting all the edge cases is likely going to be costly. I think a viable approach is: first implement it for your case and test it. Battle-test it. Figure out the limitations and document them. Then you will see whether the limitations are good enough and easy to explain and document, so that an average DAG author, and the DAGs they develop, will run without trouble. THEN the next step is to open a discussion on the devlist; that is where important additions to the community, which inevitably lead to maintenance effort on the community side, should be discussed, ideally with the experience and findings from implementing your solution and battle-testing it.

For now I will convert this to a discussion if you need more feedback, but if you get to a stage where things are battle-tested and you have enough findings, feel free to start a discussion on the devlist.
-
I am not sure about the complete use case, but it seems like you are executing commands on remote hosts and waiting for long-running commands, which occupies the worker. Another option I can think of would be writing your own triggers using an asyncio-compatible SSH library like asyncssh, which will free up the workers and could also be more efficient thanks to asyncio. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html
-
As per the thread below, it is possible to run long commands and wait for output asynchronously. Inside the async for loop, each stdout line can be logged, which should appear in the UI since trigger logs are now shown in the UI. https://groups.google.com/g/asyncssh-users/c/DiwRpZpYQjM/m/5uklhe01AwAJ?pli=1
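The streaming shape described there can be sketched with a local stand-in: the example below uses asyncio's own subprocess support (stdlib only) rather than asyncssh, since the point is the `async for` loop over a stream; an asyncssh-based trigger would use the library's connection and process objects instead, but the consumption pattern is the same.

```python
import asyncio

async def stream_command(cmd: str) -> list[str]:
    # Run a long command and consume stdout line by line as it arrives,
    # without blocking the event loop.
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE
    )
    lines = []
    assert proc.stdout is not None
    async for raw in proc.stdout:  # StreamReader yields lines until EOF
        line = raw.decode().rstrip("\n")
        lines.append(line)  # in a trigger, this is where each line is logged
    await proc.wait()
    return lines

# POSIX shell used for the demo command.
lines = asyncio.run(stream_command("printf 'one\\ntwo\\n'"))
```

In a real deferrable operator, this loop would live inside the trigger's async `run()` method, which yields an event when the command completes.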
-
Also, I think you might have an alternative solution: use Celery workers. Celery can be configured to provide 4 (and even 5) ways of running the workers: https://docs.celeryq.dev/en/latest/userguide/concurrency/index.html
All you need is to set up a Redis broker to distribute tasks in memory and run a Celery worker with as many concurrent workers as you like, with whatever concurrency model you choose. There is little point in implementing those concurrency methods in the LocalExecutor if you can get them this way. It also has the advantage that the Celery worker is better separated from the scheduler, and any side effects between running tasks can be solved by simply restarting Celery.
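To make the pool options concrete, here is what selecting them looks like in plain Celery terms (the app name `proj` is hypothetical; with Airflow's CeleryExecutor the equivalent is configured through the executor's Celery settings rather than invoked by hand):

```shell
# One worker process hosting 64 threads instead of 64 forked children.
celery -A proj worker --pool=threads --concurrency=64

# Other pool implementations from the linked docs:
#   --pool=prefork   (default: one OS process per slot)
#   --pool=eventlet  (greenlet-based, requires the eventlet package)
#   --pool=gevent    (greenlet-based, requires the gevent package)
#   --pool=solo      (single in-process worker)
```

For SSH-style I/O-bound tasks, the thread or greenlet pools are the ones that address the per-process memory cost discussed in this thread.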
-
Description
Implement a new executor type that executes tasks locally but, unlike LocalExecutor, starts multiple threads instead of spawning multiple processes.
Use case/motivation
Our use case is pretty straightforward. We use Airflow to orchestrate tasks that run on different servers. To achieve that we exclusively use SSHOperators. The actual heavy lifting of each task happens on the target machines (whose tech stack may vary and is completely independent of Airflow).
At the moment we use LocalExecutor with a parallelism of 32.
But unfortunately, the spawned processes of LocalExecutor eat far too much RAM, which causes significant memory issues on our infrastructure.
As mentioned here, each one of the 32 workers will eventually occupy about 120MB, and this memory will stay occupied even when the workers are idle.
I think that in the case of SSHOperators (and other super-lightweight tasks), spawning multiple processes can be overkill. For such use cases it might make sense to have a very simple executor that runs as a single process and allocates a thread pool of N threads instead of spawning N processes.
If you think this is a good idea, I will happily attempt to implement this myself and submit a PR. But I might need some support, since I am not a Python expert :)
If you think this is not a good idea, please suggest an alternative, because for us, having 5+ gigabytes allocated just to open a couple of SSH connections (approx. 120 MB per connection) is a real pain.
At the end of the day, my sole motivation is to be able to execute lightweight tasks without having to allocate the massive resources that LocalExecutor's workers consume, even when they are idle.
Thanks :)
Related issues
#56641 (comment)