Hello everyone! Could you help me understand how to work with FastStream? Previously, my project used only Taskiq for scheduled and background tasks. I expect the FastAPI application to act purely as an event publisher; messages should be processed by workers in a separate process, but I don't understand how to set this up. The FastStream CLI starts correctly but does not process any tasks; instead, they are processed inside FastAPI. Should I be doing this differently? I am confused that tasks are being executed inside the FastAPI application. Also, after moving to taskiq-faststream, scheduled tasks started being executed directly by the scheduler rather than by a Taskiq worker. This worries me, since my scheduled tasks are quite heavy and there is only one scheduler.

Broker init code:

```python
from contextlib import asynccontextmanager

from dishka import make_async_container
from dishka.integrations.faststream import setup_dishka
from fastapi import FastAPI
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

from app.shared_kernel.config_data import settings
from app.shared_kernel.dependencies.gateways import ProdGatewaysProvider

broker = RabbitBroker(url=settings.RABBIT_URL)
app = FastStream(broker)

taskiq_broker = BrokerWrapper(broker)
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(broker=taskiq_broker)],
)

container = make_async_container(ProdGatewaysProvider())
setup_dishka(container, app)


def create_lifespan():
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        await broker.connect()
        yield
        await broker.close()

    return lifespan
```

Part of my docker-compose:

```yaml
worker:
  build: .
  command: pdm run faststream run app.shared_kernel.message_broker.fast_stream_broker:app
  networks:
    - my_network
  restart: always

scheduler:
  build: .
  command: >
    pdm run taskiq scheduler
    app.shared_kernel.message_broker.fast_stream_broker:scheduler
    app.user_context.application.use_cases.order_use_cases.tasks
    app.deliveries_context.application.tasks
    app.reports_context.application.tasks
  networks:
    - my_network
```

Scheduled tasks code:

```python
taskiq_broker.task(queue="send report", schedule=[{"cron": "*/1 * * * *"}])


@broker.subscriber(queue="send report")
@inject
async def gen_and_send_sales_report(order_repo: FromDishka[IOrderRepository]):
    try:
        orders = await order_repo.get_orders_completed_on_this_week()
        _send_sales_report(orders)
        _send_payments_report(orders)
    except Exception as exc:
        logger.exception(exc)
```
@rylofu why are you using
This is my mistake; it should really be two issues, although they may share one underlying problem. I use Taskiq for scheduled tasks. The problem is that my FastStream application does not consume tasks. I run it in a Docker container, as shown in the code above, and publish tasks like this, but nothing happens:

```python
@broker.subscriber(queue="in queue")
def log_message(message: str):
    requests.post(
        url=f"https://api.telegram.org/bot{settings.TELEGRAM_LOGIN_BOT_API_KEY}/sendMessage",
        data={"chat_id": "my telegram", "text": "my message"},
    )
    print("execute task")
    logger.info("inside task")


@router.get("/products", response_model=DisplayProductsPage)
async def get_products_page(
    *args,
) -> DisplayProductsPage:
    # await broker.connect()
    await broker.publish(message="i am in", queue="in queue")
```

If I use a FastAPI lifespan with `broker.start()`, the tasks are executed, but they are executed by the FastAPI application itself, not by the dedicated FastStream app.
@rylofu, sure, but how can the worker see the subscriber if you didn't include it in the broker? The broker needs to know about its subscribers before it can run them. I recommend using routers here: https://faststream.ag2.ai/latest/getting-started/routers/