Skip to content

Commit f4747ee

Browse files
committed
feat: Add span filtering feature
Signed-off-by: Cagri Yonca <[email protected]>
1 parent c8e5db7 commit f4747ee

17 files changed

+1184
-603
lines changed

src/instana/agent/host.py

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from instana.options import StandardOptions
2323
from instana.util import to_json
2424
from instana.util.runtime import get_py_source, log_runtime_env_info
25-
from instana.util.span_utils import get_operation_specifiers
25+
from instana.util.span_utils import matches_rule
2626
from instana.version import VERSION
2727

2828
if TYPE_CHECKING:
@@ -357,51 +357,58 @@ def report_spans(self, payload: Dict[str, Any]) -> Optional[Response]:
357357

358358
def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
359359
"""
360-
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
360+
Filters the span list using the hierarchical include/exclude filtering rules and returns the spans that are not ignored.
361361
"""
362362
filtered_spans = []
363-
endpoint = ""
363+
364364
for span in spans:
365-
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
366-
service = span.n
367-
operation_specifier_key, service_specifier_key = (
368-
get_operation_specifiers(service)
369-
)
370-
if service == "kafka":
371-
endpoint = span.data[service][service_specifier_key]
372-
method = span.data[service][operation_specifier_key]
373-
if isinstance(method, str) and self.__is_endpoint_ignored(
374-
service, method, endpoint
375-
):
376-
continue
377-
else:
378-
filtered_spans.append(span)
379-
else:
365+
if not (hasattr(span, "n") or hasattr(span, "name")) or not hasattr(
366+
span, "data"
367+
):
380368
filtered_spans.append(span)
369+
continue
370+
371+
service_name = ""
372+
373+
# Set the service name
374+
for span_value in span.data.keys():
375+
if isinstance(span.data[span_value], dict):
376+
service_name = span_value
377+
378+
# Set span attributes for filtering
379+
attributes_to_check = {
380+
"type": service_name,
381+
"kind": span.k,
382+
}
383+
384+
# Add operation specifiers to the attributes
385+
for key, value in span.data[service_name].items():
386+
attributes_to_check[f"{service_name}.{key}"] = value
387+
388+
# Check if the span needs to be ignored
389+
if self.__is_endpoint_ignored(attributes_to_check):
390+
continue
391+
392+
filtered_spans.append(span)
393+
381394
return filtered_spans
382395

383-
def __is_endpoint_ignored(
384-
self,
385-
service: str,
386-
method: str = "",
387-
endpoint: str = "",
388-
) -> bool:
389-
"""Check if the given service and endpoint combination should be ignored."""
390-
service = service.lower()
391-
method = method.lower()
392-
endpoint = endpoint.lower()
393-
filter_rules = [
394-
f"{service}.{method}", # service.method
395-
f"{service}.*", # service.*
396-
]
397-
398-
if service == "kafka" and endpoint:
399-
filter_rules += [
400-
f"{service}.{method}.{endpoint}", # service.method.endpoint
401-
f"{service}.*.{endpoint}", # service.*.endpoint
402-
f"{service}.{method}.*", # service.method.*
403-
]
404-
return any(rule in self.options.ignore_endpoints for rule in filter_rules)
396+
def __is_endpoint_ignored(self, span_attributes: dict) -> bool:
397+
filters = self.options.span_filters
398+
if not filters:
399+
return False
400+
401+
# Check include rules
402+
for rule in filters.get("include", [{}]):
403+
if matches_rule(rule.get("attributes", []), span_attributes):
404+
return False
405+
406+
# Check exclude rules
407+
for rule in filters.get("exclude", [{}]):
408+
if matches_rule(rule.get("attributes", []), span_attributes):
409+
return True
410+
411+
return False
405412

406413
def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
407414
"""

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,15 @@ def trace_kafka_produce(
7474
# Get the topic from either args or kwargs
7575
topic = args[0] if args else kwargs.get("topic", "")
7676

77+
attributes_to_check = {
78+
"type": "kafka",
79+
"kind": "exit",
80+
"kafka.service": topic,
81+
"kafka.access": "produce",
82+
}
83+
7784
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
78-
"kafka",
79-
"produce",
80-
topic,
85+
attributes_to_check
8186
)
8287

8388
with tracer.start_as_current_span(
@@ -137,10 +142,14 @@ def create_span(
137142
is_suppressed = False
138143

139144
if topic:
145+
attributes_to_check = {
146+
"type": "kafka",
147+
"kind": "entry",
148+
"kafka.service": topic,
149+
"kafka.access": span_type,
150+
}
140151
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
141-
"kafka",
142-
span_type,
143-
topic,
152+
attributes_to_check
144153
)
145154

146155
if not is_suppressed and headers:

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,26 @@ def trace_kafka_send(
3939

4040
# Get the topic from either args or kwargs
4141
topic = args[0] if args else kwargs.get("topic", "")
42+
attributes_to_check = {
43+
"type": "kafka",
44+
"kind": "exit",
45+
"kafka.service": topic,
46+
"kafka.access": "send",
47+
}
4248

4349
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
44-
"kafka",
45-
"send",
46-
topic,
50+
attributes_to_check
4751
)
52+
4853
with tracer.start_as_current_span(
4954
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
5055
) as span:
5156
span.set_attribute("kafka.service", topic)
5257
span.set_attribute("kafka.access", "send")
5358

54-
# context propagation
59+
# Context propagation
5560
headers = kwargs.get("headers", [])
61+
5662
if not is_suppressed and ("x_instana_l_s", b"0") in headers:
5763
is_suppressed = True
5864

@@ -70,6 +76,7 @@ def trace_kafka_send(
7076

7177
if tracer.exporter.options.kafka_trace_correlation:
7278
kwargs["headers"] = headers
79+
7380
try:
7481
res = wrapped(*args, **kwargs)
7582
return res
@@ -94,10 +101,14 @@ def create_span(
94101

95102
is_suppressed = False
96103
if topic:
104+
attributes_to_check = {
105+
"type": "kafka",
106+
"kind": "entry",
107+
"kafka.service": topic,
108+
"kafka.access": span_type,
109+
}
97110
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
98-
"kafka",
99-
span_type,
100-
topic,
111+
attributes_to_check
101112
)
102113

103114
if not is_suppressed and headers:

0 commit comments

Comments
 (0)