From 7e5d5aca06f038881c25fb84205b0b11504b3b9e Mon Sep 17 00:00:00 2001 From: Galushko Bogdan Date: Sun, 26 Jan 2025 19:45:04 +0300 Subject: [PATCH 1/2] using actors from different files --- CONTRIBUTORS.md | 1 + .../use_actors_from_different_files/README.md | 160 ++++++++++++++++++ .../core/__init__.py | 0 .../core/dramatiq_service.py | 12 ++ .../use_actors_from_different_files/main.py | 14 ++ .../tasks/__init__.py | 0 .../tasks/bar/__init__.py | 1 + .../tasks/bar/tasks.py | 6 + .../tasks/foo/__init__.py | 1 + .../tasks/foo/tasks.py | 6 + 10 files changed, 201 insertions(+) create mode 100644 examples/use_actors_from_different_files/README.md create mode 100644 examples/use_actors_from_different_files/core/__init__.py create mode 100644 examples/use_actors_from_different_files/core/dramatiq_service.py create mode 100644 examples/use_actors_from_different_files/main.py create mode 100644 examples/use_actors_from_different_files/tasks/__init__.py create mode 100644 examples/use_actors_from_different_files/tasks/bar/__init__.py create mode 100644 examples/use_actors_from_different_files/tasks/bar/tasks.py create mode 100644 examples/use_actors_from_different_files/tasks/foo/__init__.py create mode 100644 examples/use_actors_from_different_files/tasks/foo/tasks.py diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 62533cbe..cae82b1a 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -66,3 +66,4 @@ of those changes to CLEARTYPE SRL. | [@5tefan](https://github.com/5tefan/) | Stefan Codrescu | | [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk | | [@dbowring](https://github.com/dbowring/) | Daniel Bowring | +| [@bagowix](https://github.com/bagowix/) | Bogdan Galushko | diff --git a/examples/use_actors_from_different_files/README.md b/examples/use_actors_from_different_files/README.md new file mode 100644 index 00000000..cbe41216 --- /dev/null +++ b/examples/use_actors_from_different_files/README.md @@ -0,0 +1,160 @@ +# Using Actors from Different Files + +## Overview + +If you want to use actors from different files in your project, you must ensure that all actors are properly initialized and visible to Dramatiq. This example demonstrates how to set up a project with multiple actor files and run them through the Dramatiq CLI. + +## Project Structure + +``` +project/ +├── core/ +│ ├── dramatiq_service.py +├── tasks/ +│ ├── foo/ +│ │ ├── __init__.py +│ │ ├── tasks.py +│ ├── bar/ +│ │ ├── __init__.py +│ │ ├── tasks.py +│ ├── __init__.py +├── main.py +``` + +- `tasks/`: Contains actor definitions (`foo/tasks.py`, `bar/tasks.py`). +- `dramatiq_service.py`: Initializes the broker and imports all actors. +- `main.py`: FastAPI application. + +## Setup + +1. **Broker Initialization (`dramatiq_service.py`)** + +```python +import dramatiq +from dramatiq.brokers.rabbitmq import RabbitmqBroker +from dramatiq.middleware import AsyncIO + +# Initialize RabbitMQ broker +rabbitmq_broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672") +rabbitmq_broker.add_middleware(AsyncIO()) +dramatiq.set_broker(rabbitmq_broker) + +# Import all actors +from tasks.bar import * # Ensure all actors are imported +from tasks.foo import * +``` + +2. **Actor Definitions** + +`tasks/bar/tasks.py`: + +```python +import dramatiq + +@dramatiq.actor +def bar_task(): + print("Bar task done.") +``` + +`tasks/foo/tasks.py`: + +```python +import dramatiq + +@dramatiq.actor +def foo_task(): + print("Foo task done.") +``` + +3. **FastAPI Endpoint (`main.py`)** + +```python +from fastapi import FastAPI +from tasks.bar.tasks import bar_task +from tasks.foo.tasks import foo_task + +app = FastAPI() + +@app.get("/test-task") +def test_task(): + bar_task.send() + foo_task.send() + return {"message": "Tasks sent"} +``` + +## Steps to Run + +1. **Start the FastAPI application**: + +``` +uvicorn main:app --host 0.0.0.0 --port 8001 --reload +``` + +2. **Run the Dramatiq worker**: + +``` +dramatiq core.dramatiq_service +``` + +3. **Trigger tasks by making a GET request**: + +``` +curl http://localhost:8001/test-task +``` + +4. **Check the logs in the Dramatiq terminal**: + +``` +Bar task done. +Foo task done. +``` + +## Optional Enhancements + +### Use Automatic Task Registration + +Instead of manually importing actors in `dramatiq_service.py`: + +```python +from tasks.bar import * # Manual import +from tasks.foo import * # Manual import +``` + +You can replace it with an **automatic task registration** method, which dynamically imports all modules with tasks under the `tasks` package. + +**How to Implement**: + +Add the following function to `dramatiq_service.py`: + +```python +import pkgutil +import importlib + +def auto_register_tasks(base_package: str): + """ + Automatically imports all modules within the specified base package. + + Args: + base_package (str): The root package containing task modules. + """ + for module_info in pkgutil.iter_modules([base_package.replace(".", "/")]): + importlib.import_module(f"{base_package}.{module_info.name}") +``` + +Replace the manual imports with a call to `auto_register_tasks`: + +```python +# Manual imports +# from tasks.bar import * +# from tasks.foo import * + +# Use automatic task registration +auto_register_tasks("tasks") +``` + +### Important Note +It is crucial to call `auto_register_tasks` in the exact same location where the manual imports were previously defined. This ensures that all tasks are registered before the Dramatiq worker starts. Failing to do this will result in tasks not being visible to the Dramatiq CLI. + +## Conclusion + +This setup ensures that actors from `multiple files` are visible to `Dramatiq` while maintaining scalability and organization. diff --git a/examples/use_actors_from_different_files/core/__init__.py b/examples/use_actors_from_different_files/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/use_actors_from_different_files/core/dramatiq_service.py b/examples/use_actors_from_different_files/core/dramatiq_service.py new file mode 100644 index 00000000..19710001 --- /dev/null +++ b/examples/use_actors_from_different_files/core/dramatiq_service.py @@ -0,0 +1,12 @@ +import dramatiq +from dramatiq.brokers.rabbitmq import RabbitmqBroker +from dramatiq.middleware import AsyncIO + +rabbitmq_broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672") + +rabbitmq_broker.add_middleware(AsyncIO()) + +dramatiq.set_broker(rabbitmq_broker) + +from tasks.bar import * # !!! IT'S IMPORTANT !!! +from tasks.foo import * # !!! IT'S IMPORTANT !!! diff --git a/examples/use_actors_from_different_files/main.py b/examples/use_actors_from_different_files/main.py new file mode 100644 index 00000000..b06f5ae1 --- /dev/null +++ b/examples/use_actors_from_different_files/main.py @@ -0,0 +1,14 @@ +from fastapi import FastAPI + +from tasks.bar.tasks import bar_task +from tasks.foo.tasks import foo_task + + +app = FastAPI() + + +@app.get("/test-task") +async def test_task(): + foo_task.send() + bar_task.send() + return {"message": "Tasks sent."} diff --git a/examples/use_actors_from_different_files/tasks/__init__.py b/examples/use_actors_from_different_files/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/use_actors_from_different_files/tasks/bar/__init__.py b/examples/use_actors_from_different_files/tasks/bar/__init__.py new file mode 100644 index 00000000..cb5b6762 --- /dev/null +++ b/examples/use_actors_from_different_files/tasks/bar/__init__.py @@ -0,0 +1 @@ +from tasks.bar.tasks import * diff --git a/examples/use_actors_from_different_files/tasks/bar/tasks.py b/examples/use_actors_from_different_files/tasks/bar/tasks.py new file mode 100644 index 00000000..782f6da8 --- /dev/null +++ b/examples/use_actors_from_different_files/tasks/bar/tasks.py @@ -0,0 +1,6 @@ +import dramatiq + + +@dramatiq.actor +async def bar_task(): + print("Bar task done.") diff --git a/examples/use_actors_from_different_files/tasks/foo/__init__.py b/examples/use_actors_from_different_files/tasks/foo/__init__.py new file mode 100644 index 00000000..50d7bd0b --- /dev/null +++ b/examples/use_actors_from_different_files/tasks/foo/__init__.py @@ -0,0 +1 @@ +from tasks.foo.tasks import * diff --git a/examples/use_actors_from_different_files/tasks/foo/tasks.py b/examples/use_actors_from_different_files/tasks/foo/tasks.py new file mode 100644 index 00000000..2655619f --- /dev/null +++ b/examples/use_actors_from_different_files/tasks/foo/tasks.py @@ -0,0 +1,6 @@ +import dramatiq + + +@dramatiq.actor +async def foo_task(): + print("Foo task done.") \ No newline at end of file From 5aa04f0383f6089667f5c1f9ee568adb670c0036 Mon Sep 17 00:00:00 2001 From: Lincoln Puzey <18750802+LincolnPuzey@users.noreply.github.com> Date: Sun, 15 Mar 2026 14:42:11 +0800 Subject: [PATCH 2/2] Update multiple file example to simplify and remove fastAPI references --- .../use_actors_from_different_files/README.md | 168 +++++------------- .../actors/__init__.py | 7 + .../actors/bar.py | 9 + .../actors/foo.py | 9 + .../core/__init__.py | 0 .../core/dramatiq_service.py | 12 -- .../use_actors_from_different_files/main.py | 19 +- .../my_broker.py | 12 ++ .../tasks/__init__.py | 0 .../tasks/bar/__init__.py | 1 - .../tasks/bar/tasks.py | 6 - .../tasks/foo/__init__.py | 1 - .../tasks/foo/tasks.py | 6 - 13 files changed, 87 insertions(+), 163 deletions(-) create mode 100644 examples/use_actors_from_different_files/actors/__init__.py create mode 100644 examples/use_actors_from_different_files/actors/bar.py create mode 100644 examples/use_actors_from_different_files/actors/foo.py delete mode 100644 examples/use_actors_from_different_files/core/__init__.py delete mode 100644 examples/use_actors_from_different_files/core/dramatiq_service.py create mode 100644 examples/use_actors_from_different_files/my_broker.py delete mode 100644 examples/use_actors_from_different_files/tasks/__init__.py delete mode 100644 examples/use_actors_from_different_files/tasks/bar/__init__.py delete mode 100644 examples/use_actors_from_different_files/tasks/bar/tasks.py delete mode 100644 examples/use_actors_from_different_files/tasks/foo/__init__.py delete mode 100644 examples/use_actors_from_different_files/tasks/foo/tasks.py diff --git a/examples/use_actors_from_different_files/README.md b/examples/use_actors_from_different_files/README.md index cbe41216..34666618 100644 --- a/examples/use_actors_from_different_files/README.md +++ b/examples/use_actors_from_different_files/README.md @@ -2,159 +2,71 @@ ## Overview -If you want to use actors from different files in your project, you must ensure that all actors are properly initialized and visible to Dramatiq. This example demonstrates how to set up a project with multiple actor files and run them through the Dramatiq CLI. +If you want to use actors from different files in your project, you must ensure that all +actors are properly initialized and visible to Dramatiq. This example demonstrates one +possible way to set up a project with multiple actor files and run them through the +Dramatiq CLI. ## Project Structure -``` +```text project/ -├── core/ -│ ├── dramatiq_service.py -├── tasks/ -│ ├── foo/ -│ │ ├── __init__.py -│ │ ├── tasks.py -│ ├── bar/ -│ │ ├── __init__.py -│ │ ├── tasks.py +├── actors/ │ ├── __init__.py +│ ├── bar.py +│ ├── foo.py ├── main.py +├── my_broker.py ``` -- `tasks/`: Contains actor definitions (`foo/tasks.py`, `bar/tasks.py`). -- `dramatiq_service.py`: Initializes the broker and imports all actors. -- `main.py`: FastAPI application. - -## Setup - -1. **Broker Initialization (`dramatiq_service.py`)** +- `actors/`: Contains actor definitions (`foo.py`, `bar.py`). +- `my_broker.py`: Initializes the broker. +- `main.py`: Application that publishes messages. -```python -import dramatiq -from dramatiq.brokers.rabbitmq import RabbitmqBroker -from dramatiq.middleware import AsyncIO - -# Initialize RabbitMQ broker -rabbitmq_broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672") -rabbitmq_broker.add_middleware(AsyncIO()) -dramatiq.set_broker(rabbitmq_broker) - -# Import all actors -from tasks.bar import * # Ensure all actors are imported -from tasks.foo import * -``` - -2. **Actor Definitions** - -`tasks/bar/tasks.py`: - -```python -import dramatiq - -@dramatiq.actor -def bar_task(): - print("Bar task done.") -``` +## Steps to Run -`tasks/foo/tasks.py`: +1. **Run the Dramatiq worker**: -```python -import dramatiq +In a terminal, start a worker with the following command -@dramatiq.actor -def foo_task(): - print("Foo task done.") +```shell +dramatiq my_broker actors ``` -3. **FastAPI Endpoint (`main.py`)** +It is important to note: +* The first argument must be the module that initializes the Broker. + Because we are using a plain module name, it must call `set_broker()`. +* The subsequent arguments are extra modules to import that contain actor definitions. + In this case, only our `actors` module. -```python -from fastapi import FastAPI -from tasks.bar.tasks import bar_task -from tasks.foo.tasks import foo_task +2. **Publish tasks by running the application**: -app = FastAPI() +In another terminal, run the application that publishes some messages. +This could be a webserver, or a cron-like scheduler. +In this example it is a simple script. -@app.get("/test-task") -def test_task(): - bar_task.send() - foo_task.send() - return {"message": "Tasks sent"} +```shell +python main.py ``` -## Steps to Run - -1. **Start the FastAPI application**: - -``` -uvicorn main:app --host 0.0.0.0 --port 8001 --reload -``` +3. **Check the logs in the Dramatiq worker terminal**: -2. **Run the Dramatiq worker**: +Back in the first terminal, check the worker logs to see that the actors have run. -``` -dramatiq core.dramatiq_service -``` - -3. **Trigger tasks by making a GET request**: - -``` -curl http://localhost:8001/test-task -``` - -4. **Check the logs in the Dramatiq terminal**: - -``` +```text Bar task done. Foo task done. ``` -## Optional Enhancements - -### Use Automatic Task Registration - -Instead of manually importing actors in `dramatiq_service.py`: - -```python -from tasks.bar import * # Manual import -from tasks.foo import * # Manual import -``` - -You can replace it with an **automatic task registration** method, which dynamically imports all modules with tasks under the `tasks` package. - -**How to Implement**: - -Add the following function to `dramatiq_service.py`: - -```python -import pkgutil -import importlib - -def auto_register_tasks(base_package: str): - """ - Automatically imports all modules within the specified base package. - - Args: - base_package (str): The root package containing task modules. - """ - for module_info in pkgutil.iter_modules([base_package.replace(".", "/")]): - importlib.import_module(f"{base_package}.{module_info.name}") -``` - -Replace the manual imports with a call to `auto_register_tasks`: - -```python -# Manual imports -# from tasks.bar import * -# from tasks.foo import * - -# Use automatic task registration -auto_register_tasks("tasks") -``` +## Conclusion -### Important Note -It is crucial to call `auto_register_tasks` in the exact same location where the manual imports were previously defined. This ensures that all tasks are registered before the Dramatiq worker starts. Failing to do this will result in tasks not being visible to the Dramatiq CLI. +This example is just one way of organizing actors into multiple files, but not the only way. -## Conclusion +The two key rules to remember are: -This setup ensures that actors from `multiple files` are visible to `Dramatiq` while maintaining scalability and organization. +1. Call `set_broker()` before importing any module that uses `@actor`. + This ensures that `@actor` will use the correct Broker. + This can be enforced by importing your broker-defining module in any actor-defining module, + like in this example. +2. Modules that are passed to the worker command line must import (directly or indirectly) + all actors. This ensures that the dramatiq worker is aware of all actors. diff --git a/examples/use_actors_from_different_files/actors/__init__.py b/examples/use_actors_from_different_files/actors/__init__.py new file mode 100644 index 00000000..24ad3e12 --- /dev/null +++ b/examples/use_actors_from_different_files/actors/__init__.py @@ -0,0 +1,7 @@ +"""Module to define actors""" + +# Import actors from submodules to ensure they are registered when this `actors` module is imported. +from .bar import bar_task +from .foo import foo_task + +__all__ = ["bar_task", "foo_task"] diff --git a/examples/use_actors_from_different_files/actors/bar.py b/examples/use_actors_from_different_files/actors/bar.py new file mode 100644 index 00000000..afa52756 --- /dev/null +++ b/examples/use_actors_from_different_files/actors/bar.py @@ -0,0 +1,9 @@ +import dramatiq + +# Import my_broker module to ensure set_broker() is called before @actor is used. +import my_broker + + +@dramatiq.actor(broker=my_broker.rabbitmq_broker) +def bar_task(): + print("Bar task done.") diff --git a/examples/use_actors_from_different_files/actors/foo.py b/examples/use_actors_from_different_files/actors/foo.py new file mode 100644 index 00000000..cf449868 --- /dev/null +++ b/examples/use_actors_from_different_files/actors/foo.py @@ -0,0 +1,9 @@ +import dramatiq + +# Import my_broker module to ensure set_broker() is called before @actor is used. +import my_broker + + +@dramatiq.actor(broker=my_broker.rabbitmq_broker) +def foo_task(): + print("Foo task done.") diff --git a/examples/use_actors_from_different_files/core/__init__.py b/examples/use_actors_from_different_files/core/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/use_actors_from_different_files/core/dramatiq_service.py b/examples/use_actors_from_different_files/core/dramatiq_service.py deleted file mode 100644 index 19710001..00000000 --- a/examples/use_actors_from_different_files/core/dramatiq_service.py +++ /dev/null @@ -1,12 +0,0 @@ -import dramatiq -from dramatiq.brokers.rabbitmq import RabbitmqBroker -from dramatiq.middleware import AsyncIO - -rabbitmq_broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672") - -rabbitmq_broker.add_middleware(AsyncIO()) - -dramatiq.set_broker(rabbitmq_broker) - -from tasks.bar import * # !!! IT'S IMPORTANT !!! -from tasks.foo import * # !!! IT'S IMPORTANT !!! diff --git a/examples/use_actors_from_different_files/main.py b/examples/use_actors_from_different_files/main.py index b06f5ae1..1c3eb495 100644 --- a/examples/use_actors_from_different_files/main.py +++ b/examples/use_actors_from_different_files/main.py @@ -1,14 +1,15 @@ -from fastapi import FastAPI +# Import Broker first so it is set up. +import my_broker # noqa: F401 -from tasks.bar.tasks import bar_task -from tasks.foo.tasks import foo_task +# Import actors after broker. +import actors -app = FastAPI() +def my_app(): + """Basic application that publishes some messages.""" + actors.bar_task.send() + actors.foo_task.send() -@app.get("/test-task") -async def test_task(): - foo_task.send() - bar_task.send() - return {"message": "Tasks sent."} +if __name__ == "__main__": + my_app() diff --git a/examples/use_actors_from_different_files/my_broker.py b/examples/use_actors_from_different_files/my_broker.py new file mode 100644 index 00000000..a40569e0 --- /dev/null +++ b/examples/use_actors_from_different_files/my_broker.py @@ -0,0 +1,12 @@ +import dramatiq +from dramatiq.brokers.rabbitmq import RabbitmqBroker +from dramatiq.middleware import CurrentMessage + +# Set up Broker (and any custom Middleware) +rabbitmq_broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672") +rabbitmq_broker.add_middleware(CurrentMessage()) + +# Declare your Broker. +# IMPORTANT: This must run before importing any module that uses the @actor decorator. +# This is because @actor will immediately look up the Broker to associate the Actor with it. +dramatiq.set_broker(rabbitmq_broker) diff --git a/examples/use_actors_from_different_files/tasks/__init__.py b/examples/use_actors_from_different_files/tasks/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/use_actors_from_different_files/tasks/bar/__init__.py b/examples/use_actors_from_different_files/tasks/bar/__init__.py deleted file mode 100644 index cb5b6762..00000000 --- a/examples/use_actors_from_different_files/tasks/bar/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from tasks.bar.tasks import * diff --git a/examples/use_actors_from_different_files/tasks/bar/tasks.py b/examples/use_actors_from_different_files/tasks/bar/tasks.py deleted file mode 100644 index 782f6da8..00000000 --- a/examples/use_actors_from_different_files/tasks/bar/tasks.py +++ /dev/null @@ -1,6 +0,0 @@ -import dramatiq - - -@dramatiq.actor -async def bar_task(): - print("Bar task done.") diff --git a/examples/use_actors_from_different_files/tasks/foo/__init__.py b/examples/use_actors_from_different_files/tasks/foo/__init__.py deleted file mode 100644 index 50d7bd0b..00000000 --- a/examples/use_actors_from_different_files/tasks/foo/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from tasks.foo.tasks import * diff --git a/examples/use_actors_from_different_files/tasks/foo/tasks.py b/examples/use_actors_from_different_files/tasks/foo/tasks.py deleted file mode 100644 index 2655619f..00000000 --- a/examples/use_actors_from_different_files/tasks/foo/tasks.py +++ /dev/null @@ -1,6 +0,0 @@ -import dramatiq - - -@dramatiq.actor -async def foo_task(): - print("Foo task done.") \ No newline at end of file