Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# rtui

# rtui2
[![PyPI - Version](https://img.shields.io/pypi/v/rtui2)](https://pypi.org/project/rtui2/)

TUI tool for ROS 2 Topic/Node debugging

![output](https://github.com/user-attachments/assets/576b9ecd-81cd-4b26-948c-65cea6ce49af)

## Support

- Python
Expand Down
52 changes: 52 additions & 0 deletions rtui2/ros/dependency_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from dataclasses import dataclass, field

from .client import RosClient
from .entity import RosEntity, RosEntityType


@dataclass
class RosDependencyNode:
entity: RosEntity
children: list[RosDependencyNode] = field(default_factory=list)


class RosDependencyGraph:
def __init__(
self, root_entity: RosEntity, ros_client: RosClient, max_depth: int = 1
) -> None:
self._ros = ros_client
self._max_depth = max_depth
self.root = self._build_graph(root_entity, depth=0)

def _build_graph(self, entity: RosEntity, depth: int) -> RosDependencyNode:
if depth > self._max_depth:
return RosDependencyNode(entity)

children: list[RosDependencyNode] = []

try:
info = self._ros.get_entity_info(entity)
except Exception:
return RosDependencyNode(entity)

if entity.type == RosEntityType.Node:
# Node → Subscribed Topics
for topic_name, _ in info.subscribers:
if topic_name == "/parameter_events":
continue

topic_entity = RosEntity.new_topic(topic_name)
child_node = self._build_graph(topic_entity, depth + 1)
children.append(child_node)

elif entity.type == RosEntityType.Topic:
# Topic → Publisher Nodes
for pub_info in info.publishers:
pub_node_name = pub_info[0]
node_entity = RosEntity.new_node(pub_node_name)
child_node = self._build_graph(node_entity, depth)
children.append(child_node)

return RosDependencyNode(entity, children)
4 changes: 2 additions & 2 deletions rtui2/ros/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ class NodeInfo(RosEntityInfo):
def to_textual(self) -> str:
text = f"""[b]Node:[/b] {self.name}

[b]Publishers:[/b]{_common_entities_with_type(self.publishers, "topic_link", "msg_type_link")}
[b]Publishes:[/b]{_common_entities_with_type(self.publishers, "topic_link", "msg_type_link")}

[b]Subscribers:[/b]{_common_entities_with_type(self.subscribers, "topic_link", "msg_type_link")}
[b]Subscribes:[/b]{_common_entities_with_type(self.subscribers, "topic_link", "msg_type_link")}

[b]Service Servers:[/b]{_common_entities_with_type(self.service_servers, "service_link", "srv_type_link")}
"""
Expand Down
20 changes: 16 additions & 4 deletions rtui2/screens.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@
from textual.widgets import Footer

from .ros import RosClient, RosEntity, RosEntityType
from .widgets import RosEntityInfoPanel, RosEntityListPanel, RosTypeDefinitionPanel
from .widgets import (
RosEntityGraphPanel,
RosEntityInfoPanel,
RosEntityListPanel,
RosTypeDefinitionPanel,
)


class RosEntityInspection(Screen):
_entity_type: RosEntityType
_entity_name: str | None
_list_panel: RosEntityListPanel
_info_panel: RosEntityInfoPanel
_graph_panel: RosEntityGraphPanel
_definition_panel: RosTypeDefinitionPanel | None = None

DEFAULT_CSS = """
Expand Down Expand Up @@ -50,13 +56,17 @@ def __init__(self, ros: RosClient, entity_type: RosEntityType) -> None:
None,
update_interval=5.0,
)
self._graph_panel = RosEntityGraphPanel(
ros, None, on_highlighted_changed=self._info_panel.set_entity
)
if entity_type.has_definition():
self._definition_panel = RosTypeDefinitionPanel(ros)

def set_entity_name(self, name: str) -> None:
self._entity_name = name
entity = RosEntity(type=self._entity_type, name=self._entity_name)
self._info_panel.set_entity(entity)
self._graph_panel.set_entity(entity)
if self._definition_panel is not None:
self._definition_panel.set_entity(entity)

Expand All @@ -70,10 +80,12 @@ def compose(self) -> ComposeResult:

with Vertical(id="main"):
if self._definition_panel is None:
with ScrollableContainer():
with ScrollableContainer(id="main-upper", classes="main-half"):
yield self._graph_panel
with ScrollableContainer(classes="main-half"):
yield self._info_panel
else:
with ScrollableContainer(id="main-upper", classes="main-half"):
yield self._info_panel
with ScrollableContainer(classes="main-half"):
yield self._definition_panel
with ScrollableContainer(classes="main-half"):
yield self._info_panel
2 changes: 2 additions & 0 deletions rtui2/widgets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .graph_panel import RosEntityGraphPanel
from .info_panel import RosEntityInfoPanel
from .list_panel import RosEntityListPanel
from .type_definition import RosTypeDefinitionPanel

__all__ = [
"RosEntityInfoPanel",
"RosEntityListPanel",
"RosEntityGraphPanel",
"RosTypeDefinitionPanel",
]
186 changes: 186 additions & 0 deletions rtui2/widgets/graph_panel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
from __future__ import annotations

from typing import Callable

from rich.text import Text
from textual.app import ComposeResult
from textual.events import Key
from textual.widgets import Static, Tree
from textual.widgets.tree import TreeNode

from ..ros import RosClient, RosEntity, RosEntityType
from ..ros.dependency_graph import RosDependencyGraph, RosDependencyNode

EXPAND_UP_TO_NODES = 1


class TreeLabel:
@staticmethod
def label(entity: RosEntity) -> Text:
style = ""
if entity.type == RosEntityType.Node:
style = "green"
elif entity.type == RosEntityType.Topic:
style = "white"
return Text(entity.name, style=style)

@staticmethod
def leaf_label(entity: RosEntity) -> Text:
text = TreeLabel.label(entity)
text.stylize("bold underline")
return text

@staticmethod
def error(msg: str) -> Text:
return Text(f"ERROR: {msg}", style="red")

NO_PUBLISHER = Text("[No publisher]", style="yellow")

@staticmethod
def is_no_publisher(label: str | Text) -> bool:
plain = label.plain if isinstance(label, Text) else str(label)
return plain.strip() == "No publisher"


class RosEntityGraphPanel(Static):
def __init__(
self,
ros: RosClient,
entity: RosEntity | None = None,
on_highlighted_changed: Callable[[RosEntity], None] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._ros = ros
self._entity = entity
self._on_highlighted_changed = on_highlighted_changed
self._tree: Tree[RosEntity] | None = None

def compose(self) -> ComposeResult:
if self._tree:
yield self._tree

def set_entity(self, entity: RosEntity) -> None:
if entity != self._entity:
self._entity = entity
self.update_graph()

def update_graph(self) -> None:
if self._tree:
self._tree.remove()

if self._entity is None:
return

self._tree = Tree(TreeLabel.label(self._entity), data=self._entity)
self.mount(self._tree)

graph = RosDependencyGraph(
self._entity, self._ros, max_depth=EXPAND_UP_TO_NODES
)
self._populate_tree(self._tree.root, graph.root)

def _populate_tree(
self,
parent: TreeNode,
dep_node: RosDependencyNode,
depth: int = 0,
) -> None:
for child in dep_node.children:
entity = child.entity

if child.children:
label = TreeLabel.label(entity)
child_node = parent.add(label, data=entity)

if entity.type == RosEntityType.Node:
self._populate_tree(child_node, child, depth + 1)
else:
self._populate_tree(child_node, child, depth)
else:
if entity.type == RosEntityType.Topic:
topic_node = parent.add(TreeLabel.label(entity), data=entity)
topic_node.add_leaf(TreeLabel.NO_PUBLISHER)
if depth < EXPAND_UP_TO_NODES:
topic_node.expand()
else:
parent.add_leaf(TreeLabel.leaf_label(entity), data=entity)

if depth < EXPAND_UP_TO_NODES:
parent.expand()

def on_tree_node_selected(self, event: Tree.NodeSelected) -> None:
node = event.node
if self._should_update_subtree(node):
self._update_subtree(node)

def _should_update_subtree(self, node: TreeNode) -> bool:
entity = node.data
if not isinstance(entity, RosEntity):
return False
if not node.is_expanded:
return False
if node.children and all(child.is_expanded for child in node.children):
return False
if self._subtree_depth(node) > EXPAND_UP_TO_NODES:
return False

return True

def _update_subtree(self, node: TreeNode) -> None:
entity = node.data
if not isinstance(entity, RosEntity):
return

try:
graph = RosDependencyGraph(entity, self._ros, max_depth=EXPAND_UP_TO_NODES)
if graph.root.children:
node._children.clear()
node._expanded = False
self._populate_tree(node, graph.root)
except Exception as e:
node.add_leaf(TreeLabel.error(str(e)))

def _subtree_depth(self, node: TreeNode) -> int:
if not node.children:
return 0

max_depth = 0
for child in node.children:
child_entity = child.data
child_depth = self._subtree_depth(child)

if (
isinstance(child_entity, RosEntity)
and child_entity.type == RosEntityType.Node
):
child_depth += 1

max_depth = max(max_depth, child_depth)

return max_depth

async def on_key(self, event: Key) -> None:
if event.key == "space":
selected_node = self._tree.cursor_node
if selected_node is None or selected_node.parent is None:
return

siblings = selected_node.parent.children
should_expand = not selected_node.is_expanded

for sibling in siblings:
if should_expand:
sibling.expand()
else:
sibling.collapse()

self.on_tree_node_selected(Tree.NodeSelected(selected_node))

event.stop()

def on_tree_node_highlighted(self, event: Tree.NodeHighlighted) -> None:
node = event.node
entity = node.data
if isinstance(entity, RosEntity) and self._on_highlighted_changed:
self._on_highlighted_changed(entity)