Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 67 additions & 20 deletions cpp/oneapi/dal/algo/subgraph_isomorphism/backend/cpu/matching.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
inner_alloc alloc);
virtual ~matching_engine();

template <bool is_parallel>
void run_and_wait(global_stack<Cpu>& gstack,
std::int64_t& busy_engine_count,
std::int64_t& current_match_count,
Expand All @@ -59,6 +60,7 @@
std::int64_t state_exploration_bit(bool check_solution = true);
std::int64_t state_exploration_list(bool check_solution = true);

template <bool is_parallel>
bool check_if_max_match_count_reached(std::int64_t& cumulative_match_count,
std::int64_t delta,
std::int64_t target_match_count);
Expand Down Expand Up @@ -92,9 +94,40 @@

std::int64_t extract_candidates(bool check_solution);
bool check_vertex_candidate(bool check_solution, std::int64_t candidate);
template <bool is_parallel>
void set_not_busy(bool& is_busy_engine, std::int64_t& busy_engine_count);
};

/// Adds `delta` to a counter that may be shared between matching engines.
/// The sequential instantiation uses a plain addition (no atomic overhead);
/// the parallel instantiation routes through the atomic helper so concurrent
/// engines can update the same counter safely.
template <bool is_parallel>
void increment_shared_value(std::int64_t& value, std::int64_t delta = 1) {
    if constexpr (!is_parallel) {
        value += delta;
    }
    else {
        dal::detail::atomic_increment(value, delta);
    }
}

/// Subtracts `delta` from a counter that may be shared between matching
/// engines. Mirrors increment_shared_value: plain subtraction when the
/// engine runs alone, atomic decrement when engines run concurrently.
template <bool is_parallel>
void decrement_shared_value(std::int64_t& value, std::int64_t delta = 1) {
    if constexpr (!is_parallel) {
        value -= delta;
    }
    else {
        dal::detail::atomic_decrement(value, delta);
    }
}

/// Reads a counter that may be shared between matching engines.
/// Sequential instantiation is a plain read; parallel instantiation uses the
/// atomic load helper so the value observed by one engine is coherent with
/// updates made by the others.
template <bool is_parallel>
std::int64_t load_shared_value(std::int64_t& value) {
    if constexpr (!is_parallel) {
        return value;
    }
    else {
        return dal::detail::atomic_load(value);
    }
}

template <typename Cpu>
class engine_bundle {
public:
Expand Down Expand Up @@ -372,28 +405,31 @@
}

/// Folds this engine's newly found matches (`delta`) into the shared
/// cumulative counter and reports whether the requested number of matches
/// has been collected overall.
///
/// @param cumulative_match_count  counter shared across engines (atomic
///                                access when is_parallel is true)
/// @param delta                   matches found by this engine since the
///                                previous check; only positive deltas are
///                                published
/// @param target_match_count      requested total number of matches
/// @return true once the shared counter has reached the target
template <typename Cpu>
template <bool is_parallel>
bool matching_engine<Cpu>::check_if_max_match_count_reached(std::int64_t& cumulative_match_count,
                                                            std::int64_t delta,
                                                            std::int64_t target_match_count) {
    if (delta > 0) {
        increment_shared_value<is_parallel>(cumulative_match_count, delta);
    }
    // Re-read through the shared-value helper so concurrent increments from
    // other engines are taken into account.
    return load_shared_value<is_parallel>(cumulative_match_count) >= target_match_count;
}

/// Transitions this engine from busy to idle exactly once: clears the local
/// busy flag and removes the engine from the shared busy-engine count.
/// A no-op when the engine is already idle, so repeated calls are safe.
template <typename Cpu>
template <bool is_parallel>
void matching_engine<Cpu>::set_not_busy(bool& is_busy_engine, std::int64_t& busy_engine_count) {
    if (!is_busy_engine) {
        return;
    }
    is_busy_engine = false;
    decrement_shared_value<is_parallel>(busy_engine_count);
}

template <typename Cpu>
template <bool is_parallel>
void matching_engine<Cpu>::run_and_wait(global_stack<Cpu>& gstack,
std::int64_t& busy_engine_count,
std::int64_t& cumulative_match_count,
Expand All @@ -407,8 +443,8 @@
ONEDAL_ASSERT(pattern != nullptr);
for (;;) {
if (target_match_count > 0 &&
dal::detail::atomic_load(cumulative_match_count) >= target_match_count) {
set_not_busy(is_busy_engine, busy_engine_count);
load_shared_value<is_parallel>(cumulative_match_count) >= target_match_count) {
set_not_busy<is_parallel>(is_busy_engine, busy_engine_count);
break;
}
if (hlocal_stack.states_in_stack() > 0) {
Expand All @@ -417,27 +453,28 @@
ONEDAL_ASSERT(hlocal_stack.states_in_stack() > 0);
const auto delta = state_exploration();
current_match_count += delta;
if (target_match_count > 0 && check_if_max_match_count_reached(cumulative_match_count,
delta,
target_match_count)) {
set_not_busy(is_busy_engine, busy_engine_count);
if (target_match_count > 0 &&
check_if_max_match_count_reached<is_parallel>(cumulative_match_count,
delta,
target_match_count)) {
set_not_busy<is_parallel>(is_busy_engine, busy_engine_count);
break;
}
}
else {
gstack.pop(hlocal_stack);
if (hlocal_stack.empty()) {
set_not_busy(is_busy_engine, busy_engine_count);
set_not_busy<is_parallel>(is_busy_engine, busy_engine_count);
if (target_match_count > 0 &&
dal::detail::atomic_load(cumulative_match_count) >= target_match_count) {
load_shared_value<is_parallel>(cumulative_match_count) >= target_match_count) {
break;
}
if (dal::detail::atomic_load(busy_engine_count) == 0)
if (load_shared_value<is_parallel>(busy_engine_count) == 0)
break;
}
else if (!is_busy_engine) {
is_busy_engine = true;
dal::detail::atomic_increment(busy_engine_count);
increment_shared_value<is_parallel>(busy_engine_count);
}
}
}
Expand Down Expand Up @@ -507,81 +544,91 @@
}

/// Runs the subgraph-isomorphism search, fanning work out over a bundle of
/// matching engines, and returns the combined solution.
///
/// @param max_match_count  upper bound on matches to collect; forwarded to
///                         run_and_wait as the target match count
/// @return aggregated solution combined from all engines' results
template <typename Cpu>
solution<Cpu> engine_bundle<Cpu>::run(std::int64_t max_match_count) {
    // Degree of the first (most selective) pattern vertex; used below to
    // prune target vertices that cannot host it.
    std::int64_t degree = pattern->get_vertex_degree(sorted_pattern_vertex[0]);

    // Estimated number of feasible initial states (+1 so it is never zero).
    std::uint64_t first_states_count =
        pattern_vertex_probability[0] * target->get_vertex_count() + 1;
    std::uint64_t max_threads_count = dal::detail::threader_get_max_threads();
    // Ceiling-style split of initial states per thread, clamped to >= 1.
    std::uint64_t possible_first_states_count_per_thread = first_states_count / max_threads_count;
    if (possible_first_states_count_per_thread < 1) {
        possible_first_states_count_per_thread = 1;
    }
    else {
        // Add 1 when the division had a remainder (cast of nonzero -> true).
        possible_first_states_count_per_thread +=
            static_cast<bool>(first_states_count % max_threads_count);
    }

    // Heuristic engine count derived from the available hardware threads;
    // the tier thresholds/factors look empirically tuned — rationale not
    // visible here.
    std::uint64_t array_size = (max_threads_count >= 64)   ? max_threads_count * 2 / 10
                               : (max_threads_count >= 24) ? max_threads_count * 4 / 10
                               : (max_threads_count >= 8)  ? 4
                               : (max_threads_count >= 4)  ? 2
                                                           : 1;
#if defined(TARGET_X86_64)
    if (typeid(Cpu) == typeid(oneapi::dal::backend::cpu_dispatch_avx2) ||
        typeid(Cpu) == typeid(oneapi::dal::backend::cpu_dispatch_sse42)) {
        // TODO: Workaround that disabled parallelism for SSE4.2 and AVX2 code paths
        // due to observed timeouts in case of execution under emulator
        // related to atomics usage in such configurations.
        array_size = 1;
    }
#endif
    // Raw storage from the custom allocator; engines are constructed below
    // with placement new and destroyed explicitly before returning.
    auto engine_array_ptr = allocator.make_shared_memory<matching_engine<Cpu>>(array_size);
    matching_engine<Cpu>* engine_array = engine_array_ptr.get();

    for (std::uint64_t i = 0; i < array_size; ++i) {
        new (engine_array + i) matching_engine<Cpu>(pattern,
                                                    target,
                                                    sorted_pattern_vertex,
                                                    predecessor,
                                                    direction,
                                                    pconsistent_conditions,
                                                    isomorphism_kind,
                                                    allocator);
    }

    state<Cpu> null_state(allocator);
    std::uint64_t task_counter = 0, index = 0;
    // Seed the engines: every target vertex that can match the first pattern
    // vertex (degree and attribute checks) becomes an initial state,
    // distributed round-robin; the counter only advances after an engine has
    // accumulated a full per-thread share of states.
    for (std::int64_t i = 0; i < target->get_vertex_count(); ++i) {
        if (degree <= target->get_vertex_degree(i) &&
            pattern->get_vertex_attribute(sorted_pattern_vertex[0]) ==
                target->get_vertex_attribute(i)) {
            index = task_counter % array_size;
            engine_array[index].push_into_stack(i);

            if ((engine_array[index].hlocal_stack.states_in_stack() /
                 possible_first_states_count_per_thread) > 0) {
                task_counter++;
            }
        }
    }

    // Shared work-stealing stack plus counters shared by all engines.
    global_stack<Cpu> gstack(pattern->get_vertex_count(), allocator);
    std::int64_t busy_engine_count(array_size);
    std::int64_t cumulative_match_count(0);

    if (array_size == 1) {
        // Single engine: take the sequential specialization so the shared
        // counters are updated without atomics.
        engine_array[0].template run_and_wait<false>(gstack,
                                                     busy_engine_count,
                                                     cumulative_match_count,
                                                     max_match_count,
                                                     false);
    }
    else {
        // Multiple engines run concurrently; the parallel specialization
        // uses atomic access to the shared counters.
        dal::detail::threader_for(array_size, array_size, [&](const int index) {
            engine_array[index].template run_and_wait<true>(gstack,
                                                            busy_engine_count,
                                                            cumulative_match_count,
                                                            max_match_count,
                                                            false);
        });
    }

    auto aggregated_solution = combine_solutions(engine_array, array_size, max_match_count);

    // Explicitly destroy the placement-new-constructed engines; the
    // allocator's shared pointer releases the raw storage afterwards.
    for (std::uint64_t i = 0; i < array_size; i++) {
        engine_array[i].~matching_engine();
    }
    return aggregated_solution;
}

Check notice on line 633 in cpp/oneapi/dal/algo/subgraph_isomorphism/backend/cpu/matching.hpp

View check run for this annotation

codefactor.io / CodeFactor

cpp/oneapi/dal/algo/subgraph_isomorphism/backend/cpu/matching.hpp#L547-L633

Complex Method
} // namespace oneapi::dal::preview::subgraph_isomorphism::backend
Loading