diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 343b567d8b852..a6ffaf5c4cbc6 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1680,6 +1680,33 @@ struct WaitBackpressurePolicy { } }; +auto forwardOnInsertion(ServiceRegistryRef& ref, std::span& messages) -> void +{ + O2_LOG_ENABLE(forwarding); + O2_SIGNPOST_ID_GENERATE(sid, forwarding); + auto& timesliceIndex = ref.get(); + auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput(); + O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for incoming data with oldestTimeslice %zu with copy.", + oldestTimeslice.timeslice.value); + auto& proxy = ref.get(); + std::vector forwardedParts; + DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false); + + for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { + if (forwardedParts[fi].Size() == 0) { + continue; + } + ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi}); + auto& parts = forwardedParts[fi]; + if (info.policy == nullptr) { + O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi); + continue; + } + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi); + info.policy->forward(parts, ChannelIndex{fi}, ref); + } +}; + /// This is the inner loop of our framework. The actual implementation /// is divided in two parts. In the first one we define a set of lambdas /// which describe what is actually going to happen, hiding all the state @@ -1859,7 +1886,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& input, nMessages, nPayloadsPerHeader, - nullptr, + forwardOnInsertion, onDrop); switch (relayed.type) { case DataRelayer::RelayChoice::Type::Backpressured: @@ -2274,9 +2301,15 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting; if (context.canForwardEarly && hasForwards && consumeSomething) { - O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str()); auto& timesliceIndex = ref.get(); - forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume); + auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput(); + auto& asyncQueue = ref.get(); + auto& decongestion = ref.get(); + O2_SIGNPOST_ID_GENERATE(aid, async_queue); + O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value); + AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate} + .user({.ref = ref, .oldestTimeslice = oldestTimeslice})); + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); } markInputsAsDone(action.slot);