Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3adfa07
Fix the assertion failure in JobQueue::stop when exiting but there's …
a1q123456 Sep 8, 2025
ed6dcdb
Add unit test
a1q123456 Sep 8, 2025
32a3f0a
Fix formatting
a1q123456 Sep 8, 2025
a1f6580
Fix the bug
a1q123456 Sep 16, 2025
6fd30eb
Fix formatting
a1q123456 Sep 16, 2025
7fb8f5f
Fix the bug
a1q123456 Sep 18, 2025
a70e60e
Merge branch 'develop' into a1q123456/fix-job-queue-stop
a1q123456 Sep 18, 2025
5f3b3a6
Fix multithreading bugs
a1q123456 Sep 18, 2025
671436c
Merge branch 'develop' into a1q123456/fix-job-queue-stop
a1q123456 Sep 18, 2025
ab52fde
Fix multithreading bugs
a1q123456 Sep 18, 2025
34706ef
Fix test case bug
a1q123456 Sep 18, 2025
5285627
Fix formatting
a1q123456 Sep 18, 2025
622bb71
Merge branch 'develop' into a1q123456/fix-job-queue-stop
Bronek Sep 23, 2025
52439eb
Fix edge case
a1q123456 Sep 25, 2025
9a3a58d
Merge branch 'develop' into a1q123456/fix-job-queue-stop
Bronek Sep 26, 2025
6f0767a
Remove redundant block
a1q123456 Sep 26, 2025
63ef46b
Make the assertion in the destructor unconditional
a1q123456 Sep 26, 2025
1dab932
Update src/xrpld/core/Coro.ipp
a1q123456 Oct 2, 2025
bce1520
Address PR comments
a1q123456 Oct 2, 2025
fbe4f7d
Fix formatting
a1q123456 Oct 2, 2025
ae351b8
Merge branch 'develop' into a1q123456/fix-job-queue-stop
a1q123456 Oct 2, 2025
2acee44
Fix error
a1q123456 Oct 2, 2025
3e4cb67
Fix error
a1q123456 Oct 2, 2025
472bcf6
Merge branch 'develop' into a1q123456/fix-job-queue-stop
bthomee Oct 23, 2025
976fd82
Address PR comments
a1q123456 Nov 21, 2025
75e402a
Fix errors
a1q123456 Nov 25, 2025
222b721
Merge remote-tracking branch 'origin/develop' into a1q123456/fix-job-…
a1q123456 Jan 22, 2026
d48bf8d
Make the coroutines clean up themselves
a1q123456 Jan 23, 2026
922383f
Make the coroutines clean up themselves
a1q123456 Jan 26, 2026
6bfa772
Fix JobQueue issues
a1q123456 Jan 27, 2026
aae09f2
Fix issues
a1q123456 Jan 29, 2026
9a2188d
Fix a potential problem
a1q123456 Jan 30, 2026
2db9ffb
Merge remote-tracking branch 'origin/develop' into a1q123456/fix-job-…
a1q123456 Feb 2, 2026
02793cd
Fix thread name length
a1q123456 Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 84 additions & 41 deletions include/xrpl/core/Coro.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,76 @@

#include <xrpl/basics/ByteUtilities.h>

#include <mutex>

namespace xrpl {

template <class F>
JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
[this, fn = std::forward<F>(f)](boost::coroutines::asymmetric_coroutine<int>::push_type& do_yield) {
state_ = CoroState::Running;
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
try
{
yield();
// self makes Coro alive until this function returns
std::shared_ptr<Coro> self;
self = shared_from_this();
fn(self);
state_ = CoroState::Finished;
}
catch (...)
{
state_ = CoroState::Finished;
throw;
}
},
boost::coroutines::attributes(megabytes(1)))
{
}

inline JobQueue::Coro::~Coro()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
cancel();
XRPL_ASSERT(state_ != CoroState::Running, "xrpl::JobQueue::Coro::~Coro : is not running");
}

inline void
JobQueue::Coro::yield() const
JobQueue::Coro::yield()
{
CoroState expected = CoroState::Running;
if (!state_.compare_exchange_strong(expected, CoroState::Suspended))
{

Check warning on line 49 in include/xrpl/core/Coro.ipp

View check run for this annotation

Codecov / codecov/patch

include/xrpl/core/Coro.ipp#L49

Added line #L49 was not covered by tests
return;
}

{
std::lock_guard lock(jq_.m_mutex);

++jq_.nSuspend_;
jq_.cv_.notify_all();
jq_.m_suspendedCoros[this] = weak_from_this();
}
(*yield_)();

state_.notify_all();
(*yield_)(++yieldCount_);
}
Comment thread
a1q123456 marked this conversation as resolved.

inline bool
JobQueue::Coro::post()
{
if (state_ != CoroState::Suspended)
{
std::lock_guard lk(mutex_run_);
running_ = true;
// The coroutine will run until it finishes if the JobQueue has stopped.
// In the case where make_shared<Coro>() succeeds and then the JobQueue
// stops before coro_ gets executed, post() will still be called and
Comment on lines +71 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to fix this issue, where JobQueue gets stopped before coroutines finish. It shouldn't. It should keep waiting for coroutines.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JobQueue won't fully stop before all coroutines stop.

// state_ will be Finished. We should return false and avoid XRPL_ASSERT
// as it's a valid edge case.

Check warning on line 74 in include/xrpl/core/Coro.ipp

View check run for this annotation

Codecov / codecov/patch

include/xrpl/core/Coro.ipp#L74

Added line #L74 was not covered by tests
return false;
}

// sp keeps 'this' alive
Expand All @@ -55,68 +81,85 @@
return true;
}

// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}

inline void
JobQueue::Coro::resume()
{
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
std::lock_guard lk(mutex_run_);
running_ = true;
return;
}

state_.notify_all();

// There's a small chance that the coroutine has not yielded yet and is
// still running. We need to wait for it to yield before we can resume it.
waitForYield();

{
std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_;
jq_.cv_.notify_all();
}

auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable");
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}

inline bool
JobQueue::Coro::runnable() const
{
return static_cast<bool>(coro_);
// There's an edge case where the coroutine has updated the status
// to Finished but the function hasn't exited and therefore, coro_ is
// still valid. However, the coroutine is not technically runnable in this
// case, because the coroutine is about to exit and static_cast<bool>(coro_)
// is going to be false.
return static_cast<bool>(coro_) && state_ != CoroState::Finished;
}

inline void
JobQueue::Coro::join()
{
state_.wait(CoroState::Running);
}

inline void
JobQueue::Coro::expectEarlyExit()
JobQueue::Coro::cancel()
{
#ifndef NDEBUG
if (!finished_)
#endif
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
return;
}

waitForYield();

coro_ = {};

{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
jq_.cv_.notify_all();
}

XRPL_ASSERT(state_ == CoroState::Finished, "ripple::JobQueue::Coro::cancel : should have finished");
}

inline void
JobQueue::Coro::join()
JobQueue::Coro::waitForYield() const
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
// Busy-wait for the coroutine to yield so that it's safe to destroy it.
while (yieldCount_ != coro_.get())
;
}

} // namespace xrpl
Expand Down
82 changes: 59 additions & 23 deletions include/xrpl/core/JobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,26 @@
class JobQueue : private Workers::Callback
{
public:
enum class QueueState { Accepting, Stopping, Stopped };

/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
friend class JobQueue;

public:
enum class CoroState { None, Suspended, Running, Finished };

private:
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
std::atomic<CoroState> state_ = CoroState::None;
std::atomic_int yieldCount_ = 0;

boost::coroutines::asymmetric_coroutine<int>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<int>::push_type* yield_;

public:
// Private: Used in the implementation
Expand All @@ -74,10 +77,12 @@
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
post.
It may not suspend at all if the JobQueue is stopping, and returns
false in such a case.
*/
void
yield() const;
yield();

/** Schedule coroutine execution.
Effects:
Expand Down Expand Up @@ -107,17 +112,28 @@
void
resume();

CoroState
state() const
{
return state_;
}

/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;

/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();

/** Waits until coroutine returns from the user function. */
void
join();

/** Cancel the coroutine. */
void
cancel();

private:
/** Wait for the coroutine to yield. */
void
waitForYield() const;
};

using JobFunction = std::function<void()>;
Expand All @@ -144,12 +160,13 @@
typename = std::enable_if_t<std::is_same<decltype(std::declval<JobHandler&&>()()), void>::value>>
bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob = jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
if (queueState_ != QueueState::Accepting)
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
return false;
}
return false;
return addJobNoStatusCheck(type, name, std::forward<JobHandler>(jobHandler));
}

/** Creates a coroutine and adds a job to the queue which will run it.
Expand Down Expand Up @@ -208,13 +225,16 @@
bool
isStopping() const
{
return stopping_;
return queueState_ == QueueState::Stopping;
}

// We may be able to move away from this, but we can keep it during the
// transition.
bool
isStopped() const;
isStopped() const
{
return queueState_ == QueueState::Stopped;
}

private:
friend class Coro;
Expand All @@ -226,8 +246,7 @@
std::uint64_t m_lastJob;
std::set<Job> m_jobSet;
JobCounter jobCounter_;
std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false};
std::atomic<QueueState> queueState_{QueueState::Accepting};
JobDataMap m_jobData;
JobTypeData m_invalidJobData;

Expand All @@ -237,6 +256,8 @@
// The number of suspended coroutines
int nSuspend_ = 0;

std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;

Workers m_workers;

// Statistics tracking
Expand All @@ -252,6 +273,18 @@
JobTypeData&
getJobTypeData(JobType type);

template <typename JobHandler>
bool
addJobNoStatusCheck(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob = jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}

Check warning on line 284 in include/xrpl/core/JobQueue.h

View check run for this annotation

Codecov / codecov/patch

include/xrpl/core/JobQueue.h#L284

Added line #L284 was not covered by tests
return false;
}

// Adds a reference counted job to the JobQueue.
//
// param type The type of job.
Expand Down Expand Up @@ -386,6 +419,10 @@
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
if (queueState_ != QueueState::Accepting)
{
return nullptr;
}
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
Expand All @@ -395,7 +432,6 @@
{
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
return coro;
Expand Down
Loading
Loading