Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 178 additions & 1 deletion owlapy/owl_reasoner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
OWLObjectIntersectionOf, OWLObjectComplementOf, OWLObjectAllValuesFrom, OWLObjectOneOf, OWLObjectHasValue, \
OWLObjectMinCardinality, OWLObjectMaxCardinality, OWLObjectExactCardinality, OWLObjectCardinalityRestriction, \
OWLDataSomeValuesFrom, OWLDataOneOf, OWLDatatypeRestriction, OWLFacetRestriction, OWLDataHasValue, \
OWLDataAllValuesFrom, OWLNothing, OWLThing
OWLDataAllValuesFrom, OWLNothing, OWLThing, OWLDataMinCardinality, OWLDataMaxCardinality, OWLDataExactCardinality
from owlapy.class_expression import OWLClass
from owlapy.iri import IRI
from owlapy.owl_axiom import OWLAxiom, OWLSubClassOfAxiom
Expand Down Expand Up @@ -989,6 +989,183 @@ def _lazy_cache_class(self, c: OWLClass) -> None:
temp = self.get_instances_from_owl_class(c)
self._cls_to_ind[c] = frozenset(temp)

@_find_instances.register
def _(self, ce: OWLDataMinCardinality):
return self._get_instances_data_card_restriction(ce)

@_find_instances.register
def _(self, ce: OWLDataMaxCardinality):
all_ = frozenset(self._ontology.individuals_in_signature())
min_ind = self._get_instances_data_card_restriction(
OWLDataMinCardinality(cardinality=ce.get_cardinality() + 1,
property=ce.get_property(),
filler=ce.get_filler()))
return all_ ^ min_ind

@_find_instances.register
def _(self, ce: OWLDataExactCardinality):
return self._get_instances_data_card_restriction(ce)

def _get_instances_data_card_restriction(self, ce) -> FrozenSet[OWLNamedIndividual]:
"""Get instances for OWLDataMinCardinality or OWLDataExactCardinality restrictions."""
pe = ce.get_property()
filler = ce.get_filler()
assert isinstance(pe, OWLDataProperty)

if isinstance(ce, OWLDataMinCardinality):
min_count = ce.get_cardinality()
max_count = None
elif isinstance(ce, OWLDataExactCardinality):
min_count = max_count = ce.get_cardinality()
else:
raise NotImplementedError

assert min_count >= 0
assert max_count is None or max_count >= 0

if self._ontology.is_modified and (self.class_cache or self._property_cache):
self.reset_and_disable_cache()
property_cache = self._property_cache

if property_cache:
self._lazy_cache_data_prop(pe)
dps = self._data_prop[pe]
else:
subs = self._some_values_subject_index(pe)

ind = set()

if isinstance(filler, OWLDatatype):
if property_cache:
for s, literals in dps.items():
count = sum(1 for lit in literals if lit.get_datatype() == filler)
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
count = sum(1 for lit in self.data_property_values(s, pe) if lit.get_datatype() == filler)
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
elif isinstance(filler, OWLDataOneOf):
values = set(filler.values())
if property_cache:
for s, literals in dps.items():
count = len(literals & values)
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
count = sum(1 for lit in self.data_property_values(s, pe) if lit in values)
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
elif isinstance(filler, OWLDatatypeRestriction):
def res_to_callable(res: OWLFacetRestriction):
op = res.get_facet().operator
v = res.get_facet_value()

def inner(lv: OWLLiteral):
return op(lv, v)

return inner

apply = FunctionType.__call__
facet_restrictions = tuple(map(res_to_callable, filler.get_facet_restrictions()))

def include(lv: OWLLiteral):
return lv.get_datatype() == filler.get_datatype() and \
all(map(apply, facet_restrictions, repeat(lv)))

if property_cache:
for s, literals in dps.items():
count = sum(1 for lit in literals if include(lit))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
count = sum(1 for lit in self.data_property_values(s, pe) if include(lit))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
elif isinstance(filler, OWLDataComplementOf):
# Count values NOT matching the inner data range
inner_filler = filler.get_data_range()
# some_inner = OWLDataSomeValuesFrom(property=pe, filler=inner_filler)
# We need to count per-individual, so we iterate
if property_cache:
for s, literals in dps.items():
# Count literals that match the complement (i.e., do NOT match the inner filler)
match_inner = self._count_data_filler_matches(literals, inner_filler)
count = len(literals) - match_inner
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
all_lits = set(self.data_property_values(s, pe))
match_inner = self._count_data_filler_matches(all_lits, inner_filler)
count = len(all_lits) - match_inner
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
elif isinstance(filler, OWLDataUnionOf):
if property_cache:
for s, literals in dps.items():
count = sum(1 for lit in literals
if any(self._literal_matches_data_range(lit, op)
for op in filler.operands()))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
count = sum(1 for lit in self.data_property_values(s, pe)
if any(self._literal_matches_data_range(lit, op)
for op in filler.operands()))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
elif isinstance(filler, OWLDataIntersectionOf):
if property_cache:
for s, literals in dps.items():
count = sum(1 for lit in literals
if all(self._literal_matches_data_range(lit, op)
for op in filler.operands()))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
for s in subs:
count = sum(1 for lit in self.data_property_values(s, pe)
if all(self._literal_matches_data_range(lit, op)
for op in filler.operands()))
if count >= min_count and (max_count is None or count <= max_count):
ind.add(s)
else:
raise ValueError

return frozenset(ind)

def _literal_matches_data_range(self, lit: OWLLiteral, dr) -> bool:
"""Check if a single literal matches a data range."""
if isinstance(dr, OWLDatatype):
return lit.get_datatype() == dr
elif isinstance(dr, OWLDataOneOf):
return lit in set(dr.values())
elif isinstance(dr, OWLDatatypeRestriction):
if lit.get_datatype() != dr.get_datatype():
return False
for res in dr.get_facet_restrictions():
op = res.get_facet().operator
if not op(lit, res.get_facet_value()):
return False
return True
elif isinstance(dr, OWLDataComplementOf):
return not self._literal_matches_data_range(lit, dr.get_data_range())
elif isinstance(dr, OWLDataUnionOf):
return any(self._literal_matches_data_range(lit, op) for op in dr.operands())
elif isinstance(dr, OWLDataIntersectionOf):
return all(self._literal_matches_data_range(lit, op) for op in dr.operands())
else:
raise ValueError(f"Unsupported data range type: {type(dr)}")

def _count_data_filler_matches(self, literals: Set[OWLLiteral], dr) -> int:
"""Count how many literals in the set match the given data range."""
return sum(1 for lit in literals if self._literal_matches_data_range(lit, dr))

def get_instances_from_owl_class(self, c: OWLClass):
if c.is_owl_thing():
yield from self._ontology.individuals_in_signature()
Expand Down
Loading
Loading