Skip to content

RFC: Extended Interface Definitions #96

@freol35241

Description

@freol35241

RFC: Extended Interface Definitions

  • Status: Draft
  • Created: 2025-01-08
  • Authors: RISE Maritime

Summary

This RFC proposes extending Keelson's interface definitions to support mixed communication patterns that combine RPC (queryables) with streaming (pubsub). This enables documenting complex workflows like remote control sessions, route execution, and sensor calibration in a single, self-describing protobuf service definition.

Motivation

Current Keelson interfaces only support pure RPC patterns:

service Configurable {
  rpc get_config(Empty) returns (JSON);
  rpc set_config(JSON) returns (SuccessResponse);
}

However, many real maritime workflows require mixed patterns:

  • Request → Stream of responses: Start route execution, receive continuous progress updates
  • Command → Status updates → Completion: Sensor calibration with feedback
  • Bidirectional session: Remote control with continuous command/feedback loops

Currently, these patterns are implemented ad-hoc without formal specification, making them difficult to document, validate, and generate tooling for.

Proposal

Use Protobuf Stream Syntax with Custom Options

Leverage protobuf's native stream keyword as a semantic marker, combined with custom options that bind streams to existing Keelson subjects.

Custom Options Definition

// keelson/options.proto
syntax = "proto3";

import "google/protobuf/descriptor.proto";

package keelson;

extend google.protobuf.MethodOptions {
  StreamBinding stream_binding = 51000;
}

extend google.protobuf.ServiceOptions {
  repeated string states = 51001;
}

message StreamBinding {
  // Reference to well-known subject from subjects.yaml
  // Payload type is defined there, validated against proto stream type
  string request_subject = 1;
  string response_subject = 2;
  
  // Field name in messages that carries session correlation ID
  string session_field = 3;
}

Example Interface

// interfaces/RouteExecution.proto
syntax = "proto3";

import "keelson/options.proto";
import "payloads/RouteExecution.proto";

package keelson.interfaces;

service RouteExecution {
  // State machine declaration
  option (keelson.states) = "IDLE";
  option (keelson.states) = "EXECUTING";
  option (keelson.states) = "PAUSED";
  option (keelson.states) = "COMPLETED";
  option (keelson.states) = "CANCELLED";

  // RPC returns ack, then streams progress via pubsub
  rpc Start(RouteExecutionRequest) returns (stream keelson.RouteProgress) {
    option (keelson.stream_binding) = {
      response_subject: "route_execution_progress"
      session_field: "session_id"
    };
  }

  // Bidirectional: commands via pubsub, status via pubsub
  rpc Execute(stream keelson.RouteCommand) returns (stream keelson.RouteStatus) {
    option (keelson.stream_binding) = {
      request_subject: "route_execution_command"
      response_subject: "route_execution_status"
      session_field: "session_id"
    };
  }

  // Pure RPC (unchanged from current behavior)
  rpc Cancel(CancelRequest) returns (CancelAck);
}

Reuse Existing Subject Definitions

Stream bindings reference subjects defined in subjects.yaml. The payload type is defined there, not duplicated:

# subjects.yaml
route_execution_progress:   keelson.RouteProgress
route_execution_command:    keelson.RouteCommand
route_execution_status:     keelson.RouteStatus
route_execution_result:     keelson.RouteResult

Payload messages live in messages/payloads/ as usual:

// messages/payloads/RouteExecution.proto
syntax = "proto3";

import "google/protobuf/timestamp.proto";

package keelson;

message RouteProgress {
  google.protobuf.Timestamp timestamp = 1;
  string session_id = 2;
  int32 current_waypoint_index = 3;
  float progress_pct = 4;
  float cross_track_error_m = 5;
}

message RouteCommand {
  google.protobuf.Timestamp timestamp = 1;
  string session_id = 2;
  oneof command {
    float adjust_speed_knots = 3;
    int32 skip_to_waypoint = 4;
    bool pause = 5;
  }
}

message RouteStatus {
  google.protobuf.Timestamp timestamp = 1;
  string session_id = 2;
  
  enum State {
    IDLE = 0;
    EXECUTING = 1;
    PAUSED = 2;
    COMPLETED = 3;
    CANCELLED = 4;
  }
  State state = 3;
}

Mapping to Zenoh

Proto Declaration Zenoh Implementation
rpc X(A) returns (B) Queryable at @rpc/x/{source_id}
rpc X(A) returns (stream B) Queryable returns initial response; subsequent messages published to pubsub/{response_subject}/{source_id}
rpc X(stream A) returns (B) Client publishes to pubsub/{request_subject}/{source_id}; final result via queryable response or pubsub
rpc X(stream A) returns (stream B) Both directions via pubsub, correlated by session_field

Wire Format

Stream messages use standard Keelson pubsub wire format. The interface definition documents what flows, not how it's wrapped. Transport concerns (Envelope, attachments, etc.) remain orthogonal SDK/runtime decisions.

Key:     rise/v0/vessel/pubsub/route_execution_progress/autopilot/0
Payload: <transport-wrapped RouteProgress>

Backwards Compatibility

Pure RPC interfaces remain unchanged:

service Configurable {
  rpc get_config(Empty) returns (JSON);
  rpc set_config(JSON) returns (SuccessResponse);
}

No custom options required. Existing interfaces work without modification.

Validation

Tooling can validate consistency between proto definitions and subjects.yaml:

def validate_interface(proto_service, subjects_yaml):
    for method in proto_service.methods:
        binding = get_stream_binding(method)
        if not binding:
            continue
            
        if binding.response_subject:
            expected = subjects_yaml[binding.response_subject]
            actual = method.output_type
            assert types_match(expected, actual), \
                f"{binding.response_subject}: expected {expected}, got {actual}"
        
        if binding.request_subject:
            expected = subjects_yaml[binding.request_subject]
            actual = method.input_type
            assert types_match(expected, actual), \
                f"{binding.request_subject}: expected {expected}, got {actual}"

Documentation Generation

From annotated proto files, generate:

  1. Sequence diagrams showing message flow
  2. State machine diagrams from states option
  3. Key-space documentation from subject bindings
  4. SDK stubs with correct pubsub/RPC wiring

Example generated documentation:

┌────────────────────────────────────────────────────────────────────────┐
│  INTERFACE: RouteExecution                                             │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  STATES: IDLE → EXECUTING ⇄ PAUSED → {COMPLETED, CANCELLED}           │
│                                                                        │
│  Client                              Service                           │
│    │                                    │                              │
│    │─── @rpc/start ────────────────────>│                              │
│    │<── RouteExecutionAck ──────────────│                              │
│    │                                    │                              │
│    │<··· pubsub/route_execution_progress│  (continuous)                │
│    │<··· pubsub/route_execution_progress│                              │
│    │                                    │                              │
│    │···> pubsub/route_execution_command │  (on demand)                 │
│    │                                    │                              │
│    │<··· pubsub/route_execution_status ·│                              │
│    │                                    │                              │
│    │─── @rpc/cancel ───────────────────>│                              │
│    │<── CancelAck ──────────────────────│                              │
│                                                                        │
│  Legend: ─── RPC    ··· pubsub                                         │
└────────────────────────────────────────────────────────────────────────┘

File Structure

keelson/
├── messages/
│   ├── Envelope.proto
│   ├── subjects.yaml
│   └── payloads/
│       ├── Primitives.proto
│       ├── RouteExecution.proto      # Payload messages
│       └── ...
├── interfaces/
│   ├── README.md
│   ├── Configurable.proto            # Pure RPC (unchanged)
│   ├── NetworkPingPong.proto         # Pure RPC (unchanged)
│   ├── RouteExecution.proto          # Mixed pattern (new)
│   └── RemoteControl.proto           # Mixed pattern (new)
└── keelson/
    └── options.proto                 # Custom options definition

Design Principles

  1. Single source of truth: Proto file defines the interface completely
  2. Reuse existing semantics: Subjects and payloads defined once, referenced by interfaces
  3. Explicit types: Stream payload types visible in proto, not hidden behind Envelope
  4. Transport-agnostic: Wire format decisions orthogonal to interface definition
  5. Incremental adoption: Existing pure-RPC interfaces unchanged
  6. Tooling-friendly: Standard protoc compiles it; custom options enable generation

Open Questions

  1. Session ID in key-space vs payload?

    • Payload: More flexible, works with existing infrastructure
    • Key-space: Enables Zenoh-level filtering (.../{session_id}/...)
  2. Formal state machine syntax?

    • Current: Simple string list of states
    • Alternative: Explicit transitions (IDLE->EXECUTING, etc.)
  3. Timeout/lifetime semantics?

    • Should interfaces declare expected stream frequencies or timeouts?

References

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions