-
Notifications
You must be signed in to change notification settings - Fork 1.6k
fix: assertion failure in JobQueue::stop
#5774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
3adfa07
ed6dcdb
32a3f0a
a1f6580
6fd30eb
7fb8f5f
a70e60e
5f3b3a6
671436c
ab52fde
34706ef
5285627
622bb71
52439eb
9a3a58d
6f0767a
63ef46b
1dab932
bce1520
fbe4f7d
ae351b8
2acee44
3e4cb67
472bcf6
976fd82
75e402a
222b721
d48bf8d
922383f
6bfa772
aae09f2
9a2188d
2db9ffb
02793cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,50 +3,76 @@ | |
|
|
||
| #include <xrpl/basics/ByteUtilities.h> | ||
|
|
||
| #include <mutex> | ||
|
|
||
| namespace xrpl { | ||
|
|
||
| template <class F> | ||
| JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f) | ||
| : jq_(jq) | ||
| , type_(type) | ||
| , name_(name) | ||
| , running_(false) | ||
| , coro_( | ||
| [this, fn = std::forward<F>(f)](boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) { | ||
| [this, fn = std::forward<F>(f)](boost::coroutines::asymmetric_coroutine<int>::push_type& do_yield) { | ||
| state_ = CoroState::Running; | ||
| yield_ = &do_yield; | ||
| yield(); | ||
| fn(shared_from_this()); | ||
| #ifndef NDEBUG | ||
| finished_ = true; | ||
| #endif | ||
| try | ||
| { | ||
| yield(); | ||
| // self makes Coro alive until this function returns | ||
| std::shared_ptr<Coro> self; | ||
| self = shared_from_this(); | ||
| fn(self); | ||
| state_ = CoroState::Finished; | ||
| } | ||
| catch (...) | ||
| { | ||
| state_ = CoroState::Finished; | ||
| throw; | ||
| } | ||
| }, | ||
| boost::coroutines::attributes(megabytes(1))) | ||
| { | ||
| } | ||
|
|
||
| inline JobQueue::Coro::~Coro() | ||
| { | ||
| #ifndef NDEBUG | ||
| XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished"); | ||
| #endif | ||
| cancel(); | ||
| XRPL_ASSERT(state_ != CoroState::Running, "xrpl::JobQueue::Coro::~Coro : is not running"); | ||
| } | ||
|
|
||
| inline void | ||
| JobQueue::Coro::yield() const | ||
| JobQueue::Coro::yield() | ||
| { | ||
| CoroState expected = CoroState::Running; | ||
| if (!state_.compare_exchange_strong(expected, CoroState::Suspended)) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| { | ||
| std::lock_guard lock(jq_.m_mutex); | ||
|
|
||
| ++jq_.nSuspend_; | ||
| jq_.cv_.notify_all(); | ||
| jq_.m_suspendedCoros[this] = weak_from_this(); | ||
| } | ||
| (*yield_)(); | ||
|
|
||
| state_.notify_all(); | ||
| (*yield_)(++yieldCount_); | ||
| } | ||
|
|
||
| inline bool | ||
| JobQueue::Coro::post() | ||
| { | ||
| if (state_ != CoroState::Suspended) | ||
| { | ||
| std::lock_guard lk(mutex_run_); | ||
| running_ = true; | ||
| // The coroutine will run until it finishes if the JobQueue has stopped. | ||
| // In the case where make_shared<Coro>() succeeds and then the JobQueue | ||
| // stops before coro_ gets executed, post() will still be called and | ||
|
Comment on lines
+71
to
+72
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to fix this issue, where JobQueue gets stopped before coroutines finish. It shouldn't. It should keep waiting for coroutines.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JobQueue won't fully stop before all coroutines stop. |
||
| // state_ will be Finished. We should return false and avoid XRPL_ASSERT | ||
| // as it's a valid edge case. | ||
| return false; | ||
| } | ||
|
|
||
| // sp keeps 'this' alive | ||
|
|
@@ -55,68 +81,85 @@ | |
| return true; | ||
| } | ||
|
|
||
| // The coroutine will not run. Clean up running_. | ||
| std::lock_guard lk(mutex_run_); | ||
| running_ = false; | ||
| cv_.notify_all(); | ||
| return false; | ||
| } | ||
|
|
||
| inline void | ||
| JobQueue::Coro::resume() | ||
| { | ||
| auto suspended = CoroState::Suspended; | ||
| if (!state_.compare_exchange_strong(suspended, CoroState::Running)) | ||
| { | ||
| std::lock_guard lk(mutex_run_); | ||
| running_ = true; | ||
| return; | ||
| } | ||
|
|
||
| state_.notify_all(); | ||
|
|
||
| // There's a small chance that the coroutine has not yielded yet and is | ||
| // still running. We need to wait for it to yield before we can resume it. | ||
| waitForYield(); | ||
|
|
||
| { | ||
| std::lock_guard lock(jq_.m_mutex); | ||
| jq_.m_suspendedCoros.erase(this); | ||
| --jq_.nSuspend_; | ||
| jq_.cv_.notify_all(); | ||
| } | ||
|
|
||
| auto saved = detail::getLocalValues().release(); | ||
| detail::getLocalValues().reset(&lvs_); | ||
| std::lock_guard lock(mutex_); | ||
| XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable"); | ||
| coro_(); | ||
| detail::getLocalValues().release(); | ||
| detail::getLocalValues().reset(saved); | ||
| std::lock_guard lk(mutex_run_); | ||
| running_ = false; | ||
| cv_.notify_all(); | ||
| } | ||
|
|
||
| inline bool | ||
| JobQueue::Coro::runnable() const | ||
| { | ||
| return static_cast<bool>(coro_); | ||
| // There's an edge case where the coroutine has updated the status | ||
| // to Finished but the function hasn't exited and therefore, coro_ is | ||
| // still valid. However, the coroutine is not technically runnable in this | ||
| // case, because the coroutine is about to exit and static_cast<bool>(coro_) | ||
| // is going to be false. | ||
| return static_cast<bool>(coro_) && state_ != CoroState::Finished; | ||
| } | ||
|
|
||
| inline void | ||
| JobQueue::Coro::join() | ||
| { | ||
| state_.wait(CoroState::Running); | ||
| } | ||
|
|
||
| inline void | ||
| JobQueue::Coro::expectEarlyExit() | ||
| JobQueue::Coro::cancel() | ||
| { | ||
| #ifndef NDEBUG | ||
| if (!finished_) | ||
| #endif | ||
| auto suspended = CoroState::Suspended; | ||
| if (!state_.compare_exchange_strong(suspended, CoroState::Running)) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| waitForYield(); | ||
|
|
||
| coro_ = {}; | ||
|
|
||
| { | ||
| // expectEarlyExit() must only ever be called from outside the | ||
| // Coro's stack. It you're inside the stack you can simply return | ||
| // and be done. | ||
| // | ||
| // That said, since we're outside the Coro's stack, we need to | ||
| // decrement the nSuspend that the Coro's call to yield caused. | ||
| std::lock_guard lock(jq_.m_mutex); | ||
| jq_.m_suspendedCoros.erase(this); | ||
| --jq_.nSuspend_; | ||
| #ifndef NDEBUG | ||
| finished_ = true; | ||
| #endif | ||
| jq_.cv_.notify_all(); | ||
| } | ||
|
|
||
| XRPL_ASSERT(state_ == CoroState::Finished, "ripple::JobQueue::Coro::cancel : should have finished"); | ||
| } | ||
|
|
||
| inline void | ||
| JobQueue::Coro::join() | ||
| JobQueue::Coro::waitForYield() const | ||
| { | ||
| std::unique_lock<std::mutex> lk(mutex_run_); | ||
| cv_.wait(lk, [this]() { return running_ == false; }); | ||
| // Busy-wait for the coroutine to yield so that it's safe to destroy it. | ||
| while (yieldCount_ != coro_.get()) | ||
| ; | ||
| } | ||
|
|
||
| } // namespace xrpl | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.