diff --git a/.serena/memories/brainstorm_the-controlplane-and-clawkerd.md b/.serena/memories/brainstorm_the-controlplane-and-clawkerd.md new file mode 100644 index 00000000..cf1b01d6 --- /dev/null +++ b/.serena/memories/brainstorm_the-controlplane-and-clawkerd.md @@ -0,0 +1,77 @@ +# Brainstorm: The Control Plane and clawkerd + +> **Status:** Active +> **Created:** 2026-02-16 +> **Last Updated:** 2026-02-16 12:00 + +## Problem / Topic +The POC (test/controlplane/) validates the two-gRPC-server pattern. The plan is to iteratively evolve this POC toward production — each iteration adds one real concern and validates it via the integration test. The master document is `clawkerd-container-control-plane` memory; this brainstorm is the working scratchpad for the current session. + + +## POC Results (from test/controlplane/) + +### What was built +- **Proto schema** (`internal/clawkerd/protocol/v1/agent.proto`): AgentReportingService (Register), AgentCommandService (RunInit) +- **clawkerd binary** (`clawkerd/main.go`): Container-side agent — starts gRPC server, registers with CP, handles RunInit (executes bash commands, streams progress, writes ready file) +- **Control plane server** (`internal/controlplane/`): server.go + registry.go — accepts Register, resolves container IP via Docker inspect, connects back to clawkerd's gRPC server, calls RunInit, consumes progress stream +- **Test Dockerfile** (`test/controlplane/testdata/Dockerfile`): Two-stage build (Go builder → Alpine), installs su-exec + tini, root entrypoint with gosu/su-exec drop +- **Test entrypoint** (`test/controlplane/testdata/entrypoint.sh`): Starts clawkerd in background, drops to claude user via su-exec +- **Integration test** (`test/controlplane/controlplane_test.go`): Full end-to-end — builds image, starts CP in-process, runs container, verifies registration, init progress, privilege separation +- **Harness extensions**: WithNetwork(), WithPortBinding(), network join on start +- **Makefile**: `make test-controlplane`, `make proto` (buf generate), excluded from unit tests + +### What was validated +1. Two-gRPC-server pattern works across Docker network (host → container via port mapping) +2. Address discovery works (clawkerd registers with listen port, CP resolves via Docker inspect + port binding) +3. Server-streaming RunInit progress flows correctly (STARTED → COMPLETED per step → READY) +4. Root entrypoint + su-exec privilege drop works (clawkerd runs as root UID 0, main process as claude UID 1001) +5. tini as PID 1 (via Dockerfile ENTRYPOINT, not HostConfig.Init in POC) manages both processes +6. Ready file signal mechanism works (/var/run/clawker/ready) +7. Init step command execution with stdout/stderr capture works + +### What was NOT validated / deferred +- HostConfig.Init (POC uses explicit tini in Dockerfile ENTRYPOINT instead) +- Graceful degradation (clawkerd falling back to baked-in defaults when CP unreachable) +- Reconnection logic (gRPC stream drops) +- Docker Events integration +- Watermill message queue +- SchedulerService (CLI → CP resource management) +- Entrypoint waiting on clawkerd ready signal (POC entrypoint is fire-and-forget) + +## Open Items / Questions +- How to handle the entrypoint wait? Current POC starts clawkerd & then immediately drops privileges. Should it wait for ready file before exec su-exec? +- Should we move to HostConfig.Init=true (Docker injects tini) or keep explicit tini in entrypoint? POC uses explicit. +- What's the plan for the `internal/controlplane/` vs `internal/clawkerd/` package split? Currently CP is in `controlplane/`, agent protocol in `clawkerd/protocol/`. Is this the right layout long-term? +- The test uses `host.docker.internal:host-gateway` for container→host communication. In production, the CP listens on clawker-net. How does address resolution change? +- Container ID mismatch: Docker hostname is 12-char truncated ID, but Docker API uses full ID. The test handles both — should clawkerd send full ID (read from /proc or cgroup)? + +## Decisions Made +- Two-gRPC-server pattern: VALIDATED by POC. CP and clawkerd each run their own gRPC server. +- su-exec over gosu: POC chose su-exec (Alpine native, ~10KB). Works. +- Root entrypoint + privilege drop: VALIDATED. Clean separation. +- Ready file at /var/run/clawker/ready: Works as signal mechanism. +- buf for protobuf generation: Configured (buf.yaml + buf.gen.yaml), `make proto` target added. +- Test harness extended with WithNetwork() and WithPortBinding() for control plane tests. + +## Conclusions / Insights +- The two-server gRPC pattern is clean and works well across Docker networking boundaries. +- Port binding (host port mapping) is needed on macOS/Docker Desktop where container IPs aren't routable from host. The CP's resolveAgentAddress() handles both port mapping and direct IP fallback. +- The POC entrypoint is minimal (3 lines) — the complexity lives in Go, not bash. This validates the "init logic in Go, not bash" principle. +- clawkerd's RunInit handles step failures gracefully (logs, sends FAILED event, continues to next step). + +## Gotchas / Risks +- Container ID truncation: Docker sets hostname to 12-char prefix. Need consistent ID handling between clawkerd and CP. +- The CP currently does Docker inspect in the Register RPC handler — this is synchronous and could slow registration if Docker is slow. +- The `go s.runInitOnAgent()` goroutine in Register has no structured lifecycle management yet (no errgroup, no cancellation tracking). +- No auth on the clawkerd→CP gRPC connection beyond the shared secret in Register. The callback connection (CP→clawkerd) has no auth at all. + +## Unknowns +- Production entrypoint behavior: should it block on clawkerd ready signal or proceed immediately? +- How will the existing hostproxy retirement timeline work? The memory says "replaced" but current codebase still has full hostproxy. +- What's the migration path from current `CreateContainer()` orchestration to CP-mediated creation? +- How does the init spec get populated in production? Currently hardcoded in test. + +## Next Steps +- Decide which production concern to tackle next in the POC iteration +- Candidates: entrypoint wait, HostConfig.Init, init spec from clawker.yaml, Docker Events, graceful degradation +- Each iteration: add the concern to test/controlplane/, validate, update master memory diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 00000000..cb3094ad --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,10 @@ +version: v2 +plugins: + - local: protoc-gen-go + out: . + opt: + - module=github.com/schmitthub/clawker + - local: protoc-gen-go-grpc + out: . + opt: + - module=github.com/schmitthub/clawker diff --git a/buf.yaml b/buf.yaml new file mode 100644 index 00000000..4307254b --- /dev/null +++ b/buf.yaml @@ -0,0 +1,11 @@ +version: v2 +modules: + - path: internal/clawkerd/protocol +lint: + use: + - STANDARD + except: + - PACKAGE_DIRECTORY_MATCH +breaking: + use: + - FILE diff --git a/clawkerd/main.go b/clawkerd/main.go new file mode 100644 index 00000000..d5afa7cc --- /dev/null +++ b/clawkerd/main.go @@ -0,0 +1,304 @@ +// clawkerd is the container-side agent for the clawker control plane. +// +// It starts a gRPC server implementing AgentCommandService (RunInit), +// then registers with the host-side control plane. After registration, +// the control plane connects back and calls RunInit with an init spec. +// clawkerd executes the steps, streams progress, and writes a ready file +// when init is complete. +// +// Logger initialization happens AFTER registration — the control plane delivers +// ClawkerdConfiguration (identity, OTEL config, file logging config) in the +// RegisterResponse. Pre-registration failures use fmt.Fprintf to stderr. +package main + +import ( + "bytes" + "context" + "fmt" + "net" + "os" + "os/exec" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + v1 "github.com/schmitthub/clawker/internal/clawkerd/protocol/v1" + "github.com/schmitthub/clawker/internal/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + readyFilePath = "/var/run/clawker/ready" + defaultGRPCPort = "50051" + + // logsDir is the container-side log directory (Linux FHS convention for daemon logs). + // Hardcoded — clawkerd does NOT import internal/config. + logsDir = "/var/log/clawkerd" +) + +// agentServer implements AgentCommandService. +type agentServer struct { + v1.UnimplementedAgentCommandServiceServer +} + +// RunInit executes the init steps and streams progress back. +func (s *agentServer) RunInit(req *v1.RunInitRequest, stream v1.AgentCommandService_RunInitServer) error { + logger.Info().Str("component", "clawkerd").Int("step_count", len(req.Steps)).Msg("RunInit received") + + for _, step := range req.Steps { + // Send STARTED event. + if err := stream.Send(&v1.RunInitResponse{ + StepName: step.Name, + Status: v1.InitEventStatus_INIT_EVENT_STATUS_STARTED, + }); err != nil { + return fmt.Errorf("send started event for %q: %w", step.Name, err) + } + + // Execute the command. + stdout, stderr, execErr := executeCommand(stream.Context(), step.Command) + + if execErr != nil { + logger.Error(). + Str("component", "clawkerd"). + Str("step", step.Name). + Err(execErr). + Str("stderr", stderr). + Msg("init step failed") + // Send FAILED event. + if err := stream.Send(&v1.RunInitResponse{ + StepName: step.Name, + Status: v1.InitEventStatus_INIT_EVENT_STATUS_FAILED, + Output: stdout, + Error: fmt.Sprintf("%v: %s", execErr, stderr), + }); err != nil { + return fmt.Errorf("send failed event for %q: %w", step.Name, err) + } + // Continue to next step — don't abort the whole init on one failure. + continue + } + + logger.Info(). + Str("component", "clawkerd"). + Str("step", step.Name). + Str("output", strings.TrimSpace(stdout)). + Msg("init step completed") + + // Send COMPLETED event. + if err := stream.Send(&v1.RunInitResponse{ + StepName: step.Name, + Status: v1.InitEventStatus_INIT_EVENT_STATUS_COMPLETED, + Output: stdout, + }); err != nil { + return fmt.Errorf("send completed event for %q: %w", step.Name, err) + } + } + + // Write ready file before sending the READY event. + if err := writeReadyFile(); err != nil { + logger.Warn().Str("component", "clawkerd").Err(err).Msg("failed to write ready file") + } + + // Send final READY event. + if err := stream.Send(&v1.RunInitResponse{ + Status: v1.InitEventStatus_INIT_EVENT_STATUS_READY, + }); err != nil { + return fmt.Errorf("send ready event: %w", err) + } + + logger.Info().Str("component", "clawkerd").Msg("all init steps complete, ready") + return nil +} + +// executeCommand runs a bash command and captures stdout/stderr. +func executeCommand(ctx context.Context, command string) (stdout, stderr string, err error) { + cmd := exec.CommandContext(ctx, "bash", "-c", command) + var outBuf, errBuf bytes.Buffer + cmd.Stdout = &outBuf + cmd.Stderr = &errBuf + err = cmd.Run() + return outBuf.String(), errBuf.String(), err +} + +// writeReadyFile creates the ready signal file. +func writeReadyFile() error { + return os.WriteFile(readyFilePath, []byte("ready\n"), 0644) +} + +// fatalf writes a fatal error to stderr and exits. +// Used for pre-registration failures where the logger is not yet initialized. +func fatalf(format string, args ...any) { + fmt.Fprintf(os.Stderr, "[clawkerd] fatal: "+format+"\n", args...) + os.Exit(1) +} + +func main() { + // Pre-logger phase: parse env, start gRPC server, register with CP. + // Fatal errors in this phase go to stderr — logger is not initialized yet. + cpAddr := os.Getenv("CLAWKER_CONTROL_PLANE") + secret := os.Getenv("CLAWKER_CONTROL_PLANE_SECRET") + port := os.Getenv("CLAWKER_AGENT_PORT") + if port == "" { + port = defaultGRPCPort + } + + if cpAddr == "" { + fatalf("CLAWKER_CONTROL_PLANE not set") + } + if secret == "" { + fatalf("CLAWKER_CONTROL_PLANE_SECRET not set") + } + + // Start gRPC server for AgentCommandService. + listenAddr := "0.0.0.0:" + port + lis, err := net.Listen("tcp", listenAddr) + if err != nil { + fatalf("failed to listen on %s: %v", listenAddr, err) + } + + grpcServer := grpc.NewServer() + v1.RegisterAgentCommandServiceServer(grpcServer, &agentServer{}) + + // Serve in background. + go func() { + // Pre-logger — this goroutine starts before registration completes. + fmt.Fprintf(os.Stderr, "[clawkerd] gRPC server listening on %s\n", lis.Addr().String()) + if err := grpcServer.Serve(lis); err != nil { + // Post-logger may be available by now, but Serve error is fatal either way. + logger.Error().Str("component", "clawkerd").Err(err).Msg("gRPC server error") + } + }() + + // Get container ID (hostname). + containerID, err := os.Hostname() + if err != nil { + fatalf("failed to get hostname: %v", err) + } + + // Resolve the listen port from the bound address. + _, portStr, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + fatalf("failed to parse listen address: %v", err) + } + listenPort, err := strconv.ParseUint(portStr, 10, 32) + if err != nil { + fatalf("failed to parse port: %v", err) + } + + // Register with control plane — returns ClawkerdConfiguration for logger init. + resp, err := registerWithCP(cpAddr, secret, containerID, uint32(listenPort)) + if err != nil { + fatalf("failed to register with control plane at %s — is the control plane running? (clawker monitor status): %v", cpAddr, err) + } + + // --- Post-registration: initialize structured logger from CP-delivered config --- + initLogger(resp.Config) + defer logger.Close() + + logger.Info(). + Str("component", "clawkerd"). + Str("container_id", containerID). + Str("cp_addr", cpAddr). + Msg("registered with control plane, logger initialized") + + // Wait for SIGTERM/SIGINT. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT) + sig := <-sigCh + logger.Info().Str("component", "clawkerd").Str("signal", sig.String()).Msg("shutting down") + grpcServer.GracefulStop() +} + +// registerWithCP connects to the control plane and calls Register. +// Returns the full RegisterResponse so the caller can extract ClawkerdConfiguration. +func registerWithCP(cpAddr, secret, containerID string, listenPort uint32) (*v1.RegisterResponse, error) { + conn, err := grpc.NewClient( + cpAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("connect to control plane at %s: %w", cpAddr, err) + } + defer conn.Close() + + client := v1.NewAgentReportingServiceClient(conn) + resp, err := client.Register(context.Background(), &v1.RegisterRequest{ + ContainerId: containerID, + Secret: secret, + Version: "poc", + ListenPort: listenPort, + }) + if err != nil { + return nil, fmt.Errorf("register RPC: %w", err) + } + if !resp.Accepted { + return nil, fmt.Errorf("registration rejected: %s", resp.Reason) + } + + return resp, nil +} + +// initLogger initializes the structured logger from CP-delivered ClawkerdConfiguration. +// If config is nil or missing, falls back to file-only logging with defaults. +func initLogger(cfg *v1.ClawkerdConfiguration) { + if cfg == nil { + // No config from CP — initialize with defaults (file-only). + logger.NewLogger(&logger.Options{ + LogsDir: logsDir, + ServiceName: "clawker", + ScopeName: "clawkerd", + FileConfig: &logger.LoggingConfig{ + // defaults: file enabled, 50MB, 7 days, 3 backups, compress + }, + }) + return + } + + // Build file config from CP-delivered settings. + var fileConfig *logger.LoggingConfig + if fl := cfg.FileLogging; fl != nil { + enabled := fl.Enabled + compress := fl.Compress + fileConfig = &logger.LoggingConfig{ + FileEnabled: &enabled, + MaxSizeMB: int(fl.MaxSizeMb), + MaxAgeDays: int(fl.MaxAgeDays), + MaxBackups: int(fl.MaxBackups), + Compress: &compress, + } + } else { + // No file logging config — use defaults. + fileConfig = &logger.LoggingConfig{} + } + + // Build OTEL config from CP-delivered settings. + var otelConfig *logger.OtelLogConfig + if otel := cfg.Otel; otel != nil { + otelConfig = &logger.OtelLogConfig{ + Endpoint: otel.Endpoint, + Insecure: otel.Insecure, + Timeout: time.Duration(otel.TimeoutSeconds) * time.Second, + MaxQueueSize: int(otel.MaxQueueSize), + ExportInterval: time.Duration(otel.ExportIntervalSeconds) * time.Second, + } + } + + if err := logger.NewLogger(&logger.Options{ + LogsDir: logsDir, + ServiceName: "clawker", + ScopeName: "clawkerd", + FileConfig: fileConfig, + OtelConfig: otelConfig, + }); err != nil { + // Logger init failure is non-fatal — fall back to nop. + fmt.Fprintf(os.Stderr, "[clawkerd] warning: logger init failed: %v\n", err) + logger.Init() + } + + // Set project/agent context for all subsequent log entries. + if cfg.Project != "" || cfg.Agent != "" { + logger.SetContext(cfg.Project, cfg.Agent) + } +} diff --git a/docs/control-plane.md b/docs/control-plane.md new file mode 100644 index 00000000..fa8f1363 --- /dev/null +++ b/docs/control-plane.md @@ -0,0 +1,2 @@ +# Control Plane and Clawkerd + diff --git a/internal/clawkerd/protocol/v1/agent.pb.go b/internal/clawkerd/protocol/v1/agent.pb.go new file mode 100644 index 00000000..163e3193 --- /dev/null +++ b/internal/clawkerd/protocol/v1/agent.pb.go @@ -0,0 +1,761 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc (unknown) +// source: v1/agent.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// InitEventStatus tracks the lifecycle of an init step. +type InitEventStatus int32 + +const ( + InitEventStatus_INIT_EVENT_STATUS_UNSPECIFIED InitEventStatus = 0 + // STARTED indicates a step has begun execution. + InitEventStatus_INIT_EVENT_STATUS_STARTED InitEventStatus = 1 + // COMPLETED indicates a step finished successfully. + InitEventStatus_INIT_EVENT_STATUS_COMPLETED InitEventStatus = 2 + // FAILED indicates a step encountered an error. + InitEventStatus_INIT_EVENT_STATUS_FAILED InitEventStatus = 3 + // READY is the final event — all init steps are complete. + InitEventStatus_INIT_EVENT_STATUS_READY InitEventStatus = 4 +) + +// Enum value maps for InitEventStatus. +var ( + InitEventStatus_name = map[int32]string{ + 0: "INIT_EVENT_STATUS_UNSPECIFIED", + 1: "INIT_EVENT_STATUS_STARTED", + 2: "INIT_EVENT_STATUS_COMPLETED", + 3: "INIT_EVENT_STATUS_FAILED", + 4: "INIT_EVENT_STATUS_READY", + } + InitEventStatus_value = map[string]int32{ + "INIT_EVENT_STATUS_UNSPECIFIED": 0, + "INIT_EVENT_STATUS_STARTED": 1, + "INIT_EVENT_STATUS_COMPLETED": 2, + "INIT_EVENT_STATUS_FAILED": 3, + "INIT_EVENT_STATUS_READY": 4, + } +) + +func (x InitEventStatus) Enum() *InitEventStatus { + p := new(InitEventStatus) + *p = x + return p +} + +func (x InitEventStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (InitEventStatus) Descriptor() protoreflect.EnumDescriptor { + return file_v1_agent_proto_enumTypes[0].Descriptor() +} + +func (InitEventStatus) Type() protoreflect.EnumType { + return &file_v1_agent_proto_enumTypes[0] +} + +func (x InitEventStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use InitEventStatus.Descriptor instead. +func (InitEventStatus) EnumDescriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{0} +} + +// RegisterRequest is sent by clawkerd to the control plane on startup. +type RegisterRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // container_id is the Docker container ID (hostname inside container). + ContainerId string `protobuf:"bytes,1,opt,name=container_id,json=containerId,proto3" json:"container_id,omitempty"` + // secret is the shared secret for authentication. + Secret string `protobuf:"bytes,2,opt,name=secret,proto3" json:"secret,omitempty"` + // version is the clawkerd binary version. + Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` + // listen_port is the gRPC port clawkerd is listening on inside the container. + // The control plane resolves the container's IP via Docker inspect and connects + // back to : for AgentCommandService RPCs. + ListenPort uint32 `protobuf:"varint,4,opt,name=listen_port,json=listenPort,proto3" json:"listen_port,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + mi := &file_v1_agent_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{0} +} + +func (x *RegisterRequest) GetContainerId() string { + if x != nil { + return x.ContainerId + } + return "" +} + +func (x *RegisterRequest) GetSecret() string { + if x != nil { + return x.Secret + } + return "" +} + +func (x *RegisterRequest) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +func (x *RegisterRequest) GetListenPort() uint32 { + if x != nil { + return x.ListenPort + } + return 0 +} + +// RegisterResponse indicates whether registration was accepted. +// On success, includes ClawkerdConfiguration for logger initialization. +type RegisterResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // accepted is true if the agent was registered successfully. + Accepted bool `protobuf:"varint,1,opt,name=accepted,proto3" json:"accepted,omitempty"` + // reason provides context when registration is rejected. + Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"` + // config is the operational configuration for clawkerd (identity, OTEL, file logging). + // Only populated when accepted is true. + Config *ClawkerdConfiguration `protobuf:"bytes,3,opt,name=config,proto3" json:"config,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterResponse) Reset() { + *x = RegisterResponse{} + mi := &file_v1_agent_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterResponse) ProtoMessage() {} + +func (x *RegisterResponse) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. +func (*RegisterResponse) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{1} +} + +func (x *RegisterResponse) GetAccepted() bool { + if x != nil { + return x.Accepted + } + return false +} + +func (x *RegisterResponse) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +func (x *RegisterResponse) GetConfig() *ClawkerdConfiguration { + if x != nil { + return x.Config + } + return nil +} + +// ClawkerdConfiguration is the operational config delivered by the control plane +// during registration. clawkerd uses this to initialize its logger with structured +// logging, OTEL bridge, and project/agent context fields. +// Named "Clawkerd" (not "Agent") because "agent" means Claude Code instance in our domain. +type ClawkerdConfiguration struct { + state protoimpl.MessageState `protogen:"open.v1"` + // project is the project name for logger context fields. + Project string `protobuf:"bytes,1,opt,name=project,proto3" json:"project,omitempty"` + // agent is the agent name for logger context fields. + Agent string `protobuf:"bytes,2,opt,name=agent,proto3" json:"agent,omitempty"` + // otel configures the OTEL logging bridge (endpoint, timeouts, batching). + Otel *OtelLogConfig `protobuf:"bytes,3,opt,name=otel,proto3" json:"otel,omitempty"` + // file_logging configures file-based log rotation. + FileLogging *FileLogConfig `protobuf:"bytes,4,opt,name=file_logging,json=fileLogging,proto3" json:"file_logging,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ClawkerdConfiguration) Reset() { + *x = ClawkerdConfiguration{} + mi := &file_v1_agent_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ClawkerdConfiguration) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClawkerdConfiguration) ProtoMessage() {} + +func (x *ClawkerdConfiguration) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClawkerdConfiguration.ProtoReflect.Descriptor instead. +func (*ClawkerdConfiguration) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{2} +} + +func (x *ClawkerdConfiguration) GetProject() string { + if x != nil { + return x.Project + } + return "" +} + +func (x *ClawkerdConfiguration) GetAgent() string { + if x != nil { + return x.Agent + } + return "" +} + +func (x *ClawkerdConfiguration) GetOtel() *OtelLogConfig { + if x != nil { + return x.Otel + } + return nil +} + +func (x *ClawkerdConfiguration) GetFileLogging() *FileLogConfig { + if x != nil { + return x.FileLogging + } + return nil +} + +// OtelLogConfig configures the OTEL zerolog bridge for streaming logs to the collector. +type OtelLogConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // endpoint is the OTLP HTTP endpoint (host:port, no scheme). e.g. "otel-collector:4318" + Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` + // insecure disables TLS (true for local collector). + Insecure bool `protobuf:"varint,2,opt,name=insecure,proto3" json:"insecure,omitempty"` + // timeout_seconds is the export timeout (default: 5). + TimeoutSeconds int32 `protobuf:"varint,3,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` + // max_queue_size is the batch processor queue size (default: 2048). + MaxQueueSize int32 `protobuf:"varint,4,opt,name=max_queue_size,json=maxQueueSize,proto3" json:"max_queue_size,omitempty"` + // export_interval_seconds is the batch export interval (default: 5). + ExportIntervalSeconds int32 `protobuf:"varint,5,opt,name=export_interval_seconds,json=exportIntervalSeconds,proto3" json:"export_interval_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *OtelLogConfig) Reset() { + *x = OtelLogConfig{} + mi := &file_v1_agent_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *OtelLogConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OtelLogConfig) ProtoMessage() {} + +func (x *OtelLogConfig) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OtelLogConfig.ProtoReflect.Descriptor instead. +func (*OtelLogConfig) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{3} +} + +func (x *OtelLogConfig) GetEndpoint() string { + if x != nil { + return x.Endpoint + } + return "" +} + +func (x *OtelLogConfig) GetInsecure() bool { + if x != nil { + return x.Insecure + } + return false +} + +func (x *OtelLogConfig) GetTimeoutSeconds() int32 { + if x != nil { + return x.TimeoutSeconds + } + return 0 +} + +func (x *OtelLogConfig) GetMaxQueueSize() int32 { + if x != nil { + return x.MaxQueueSize + } + return 0 +} + +func (x *OtelLogConfig) GetExportIntervalSeconds() int32 { + if x != nil { + return x.ExportIntervalSeconds + } + return 0 +} + +// FileLogConfig configures file-based logging with rotation. +type FileLogConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // enabled enables file logging. + Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` + // max_size_mb is the max log file size in MB before rotation (default: 50). + MaxSizeMb int32 `protobuf:"varint,2,opt,name=max_size_mb,json=maxSizeMb,proto3" json:"max_size_mb,omitempty"` + // max_age_days is the max days to retain old logs (default: 7). + MaxAgeDays int32 `protobuf:"varint,3,opt,name=max_age_days,json=maxAgeDays,proto3" json:"max_age_days,omitempty"` + // max_backups is the max number of old log files to keep (default: 3). + MaxBackups int32 `protobuf:"varint,4,opt,name=max_backups,json=maxBackups,proto3" json:"max_backups,omitempty"` + // compress enables gzip compression of rotated log files (default: true). + Compress bool `protobuf:"varint,5,opt,name=compress,proto3" json:"compress,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FileLogConfig) Reset() { + *x = FileLogConfig{} + mi := &file_v1_agent_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FileLogConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileLogConfig) ProtoMessage() {} + +func (x *FileLogConfig) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileLogConfig.ProtoReflect.Descriptor instead. +func (*FileLogConfig) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{4} +} + +func (x *FileLogConfig) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + +func (x *FileLogConfig) GetMaxSizeMb() int32 { + if x != nil { + return x.MaxSizeMb + } + return 0 +} + +func (x *FileLogConfig) GetMaxAgeDays() int32 { + if x != nil { + return x.MaxAgeDays + } + return 0 +} + +func (x *FileLogConfig) GetMaxBackups() int32 { + if x != nil { + return x.MaxBackups + } + return 0 +} + +func (x *FileLogConfig) GetCompress() bool { + if x != nil { + return x.Compress + } + return false +} + +// RunInitRequest is the declarative init specification sent by the control plane. +// clawkerd owns execution — it runs the steps and reports progress. +type RunInitRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // steps is the ordered list of init steps to execute. + Steps []*InitStep `protobuf:"bytes,1,rep,name=steps,proto3" json:"steps,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RunInitRequest) Reset() { + *x = RunInitRequest{} + mi := &file_v1_agent_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RunInitRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunInitRequest) ProtoMessage() {} + +func (x *RunInitRequest) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RunInitRequest.ProtoReflect.Descriptor instead. +func (*RunInitRequest) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{5} +} + +func (x *RunInitRequest) GetSteps() []*InitStep { + if x != nil { + return x.Steps + } + return nil +} + +// InitStep is a single init command to execute. +type InitStep struct { + state protoimpl.MessageState `protogen:"open.v1"` + // name identifies this step (e.g. "firewall", "git-config"). + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // command is the bash command to execute. + Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *InitStep) Reset() { + *x = InitStep{} + mi := &file_v1_agent_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *InitStep) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InitStep) ProtoMessage() {} + +func (x *InitStep) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InitStep.ProtoReflect.Descriptor instead. +func (*InitStep) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{6} +} + +func (x *InitStep) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *InitStep) GetCommand() string { + if x != nil { + return x.Command + } + return "" +} + +// RunInitResponse is streamed from clawkerd back to the control plane +// as each init step progresses. +type RunInitResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // step_name identifies which step this event is for. + StepName string `protobuf:"bytes,1,opt,name=step_name,json=stepName,proto3" json:"step_name,omitempty"` + // status is the current status of this step. + Status InitEventStatus `protobuf:"varint,2,opt,name=status,proto3,enum=clawker.agent.v1.InitEventStatus" json:"status,omitempty"` + // output contains captured stdout from the command. + Output string `protobuf:"bytes,3,opt,name=output,proto3" json:"output,omitempty"` + // error contains stderr or error message if the step failed. + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RunInitResponse) Reset() { + *x = RunInitResponse{} + mi := &file_v1_agent_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RunInitResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunInitResponse) ProtoMessage() {} + +func (x *RunInitResponse) ProtoReflect() protoreflect.Message { + mi := &file_v1_agent_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RunInitResponse.ProtoReflect.Descriptor instead. +func (*RunInitResponse) Descriptor() ([]byte, []int) { + return file_v1_agent_proto_rawDescGZIP(), []int{7} +} + +func (x *RunInitResponse) GetStepName() string { + if x != nil { + return x.StepName + } + return "" +} + +func (x *RunInitResponse) GetStatus() InitEventStatus { + if x != nil { + return x.Status + } + return InitEventStatus_INIT_EVENT_STATUS_UNSPECIFIED +} + +func (x *RunInitResponse) GetOutput() string { + if x != nil { + return x.Output + } + return "" +} + +func (x *RunInitResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +var File_v1_agent_proto protoreflect.FileDescriptor + +const file_v1_agent_proto_rawDesc = "" + + "\n" + + "\x0ev1/agent.proto\x12\x10clawker.agent.v1\"\x87\x01\n" + + "\x0fRegisterRequest\x12!\n" + + "\fcontainer_id\x18\x01 \x01(\tR\vcontainerId\x12\x16\n" + + "\x06secret\x18\x02 \x01(\tR\x06secret\x12\x18\n" + + "\aversion\x18\x03 \x01(\tR\aversion\x12\x1f\n" + + "\vlisten_port\x18\x04 \x01(\rR\n" + + "listenPort\"\x87\x01\n" + + "\x10RegisterResponse\x12\x1a\n" + + "\baccepted\x18\x01 \x01(\bR\baccepted\x12\x16\n" + + "\x06reason\x18\x02 \x01(\tR\x06reason\x12?\n" + + "\x06config\x18\x03 \x01(\v2'.clawker.agent.v1.ClawkerdConfigurationR\x06config\"\xc0\x01\n" + + "\x15ClawkerdConfiguration\x12\x18\n" + + "\aproject\x18\x01 \x01(\tR\aproject\x12\x14\n" + + "\x05agent\x18\x02 \x01(\tR\x05agent\x123\n" + + "\x04otel\x18\x03 \x01(\v2\x1f.clawker.agent.v1.OtelLogConfigR\x04otel\x12B\n" + + "\ffile_logging\x18\x04 \x01(\v2\x1f.clawker.agent.v1.FileLogConfigR\vfileLogging\"\xce\x01\n" + + "\rOtelLogConfig\x12\x1a\n" + + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x1a\n" + + "\binsecure\x18\x02 \x01(\bR\binsecure\x12'\n" + + "\x0ftimeout_seconds\x18\x03 \x01(\x05R\x0etimeoutSeconds\x12$\n" + + "\x0emax_queue_size\x18\x04 \x01(\x05R\fmaxQueueSize\x126\n" + + "\x17export_interval_seconds\x18\x05 \x01(\x05R\x15exportIntervalSeconds\"\xa8\x01\n" + + "\rFileLogConfig\x12\x18\n" + + "\aenabled\x18\x01 \x01(\bR\aenabled\x12\x1e\n" + + "\vmax_size_mb\x18\x02 \x01(\x05R\tmaxSizeMb\x12 \n" + + "\fmax_age_days\x18\x03 \x01(\x05R\n" + + "maxAgeDays\x12\x1f\n" + + "\vmax_backups\x18\x04 \x01(\x05R\n" + + "maxBackups\x12\x1a\n" + + "\bcompress\x18\x05 \x01(\bR\bcompress\"B\n" + + "\x0eRunInitRequest\x120\n" + + "\x05steps\x18\x01 \x03(\v2\x1a.clawker.agent.v1.InitStepR\x05steps\"8\n" + + "\bInitStep\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x18\n" + + "\acommand\x18\x02 \x01(\tR\acommand\"\x97\x01\n" + + "\x0fRunInitResponse\x12\x1b\n" + + "\tstep_name\x18\x01 \x01(\tR\bstepName\x129\n" + + "\x06status\x18\x02 \x01(\x0e2!.clawker.agent.v1.InitEventStatusR\x06status\x12\x16\n" + + "\x06output\x18\x03 \x01(\tR\x06output\x12\x14\n" + + "\x05error\x18\x04 \x01(\tR\x05error*\xaf\x01\n" + + "\x0fInitEventStatus\x12!\n" + + "\x1dINIT_EVENT_STATUS_UNSPECIFIED\x10\x00\x12\x1d\n" + + "\x19INIT_EVENT_STATUS_STARTED\x10\x01\x12\x1f\n" + + "\x1bINIT_EVENT_STATUS_COMPLETED\x10\x02\x12\x1c\n" + + "\x18INIT_EVENT_STATUS_FAILED\x10\x03\x12\x1b\n" + + "\x17INIT_EVENT_STATUS_READY\x10\x042j\n" + + "\x15AgentReportingService\x12Q\n" + + "\bRegister\x12!.clawker.agent.v1.RegisterRequest\x1a\".clawker.agent.v1.RegisterResponse2g\n" + + "\x13AgentCommandService\x12P\n" + + "\aRunInit\x12 .clawker.agent.v1.RunInitRequest\x1a!.clawker.agent.v1.RunInitResponse0\x01B=Z;github.com/schmitthub/clawker/internal/clawkerd/protocol/v1b\x06proto3" + +var ( + file_v1_agent_proto_rawDescOnce sync.Once + file_v1_agent_proto_rawDescData []byte +) + +func file_v1_agent_proto_rawDescGZIP() []byte { + file_v1_agent_proto_rawDescOnce.Do(func() { + file_v1_agent_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_v1_agent_proto_rawDesc), len(file_v1_agent_proto_rawDesc))) + }) + return file_v1_agent_proto_rawDescData +} + +var file_v1_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_v1_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_v1_agent_proto_goTypes = []any{ + (InitEventStatus)(0), // 0: clawker.agent.v1.InitEventStatus + (*RegisterRequest)(nil), // 1: clawker.agent.v1.RegisterRequest + (*RegisterResponse)(nil), // 2: clawker.agent.v1.RegisterResponse + (*ClawkerdConfiguration)(nil), // 3: clawker.agent.v1.ClawkerdConfiguration + (*OtelLogConfig)(nil), // 4: clawker.agent.v1.OtelLogConfig + (*FileLogConfig)(nil), // 5: clawker.agent.v1.FileLogConfig + (*RunInitRequest)(nil), // 6: clawker.agent.v1.RunInitRequest + (*InitStep)(nil), // 7: clawker.agent.v1.InitStep + (*RunInitResponse)(nil), // 8: clawker.agent.v1.RunInitResponse +} +var file_v1_agent_proto_depIdxs = []int32{ + 3, // 0: clawker.agent.v1.RegisterResponse.config:type_name -> clawker.agent.v1.ClawkerdConfiguration + 4, // 1: clawker.agent.v1.ClawkerdConfiguration.otel:type_name -> clawker.agent.v1.OtelLogConfig + 5, // 2: clawker.agent.v1.ClawkerdConfiguration.file_logging:type_name -> clawker.agent.v1.FileLogConfig + 7, // 3: clawker.agent.v1.RunInitRequest.steps:type_name -> clawker.agent.v1.InitStep + 0, // 4: clawker.agent.v1.RunInitResponse.status:type_name -> clawker.agent.v1.InitEventStatus + 1, // 5: clawker.agent.v1.AgentReportingService.Register:input_type -> clawker.agent.v1.RegisterRequest + 6, // 6: clawker.agent.v1.AgentCommandService.RunInit:input_type -> clawker.agent.v1.RunInitRequest + 2, // 7: clawker.agent.v1.AgentReportingService.Register:output_type -> clawker.agent.v1.RegisterResponse + 8, // 8: clawker.agent.v1.AgentCommandService.RunInit:output_type -> clawker.agent.v1.RunInitResponse + 7, // [7:9] is the sub-list for method output_type + 5, // [5:7] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_v1_agent_proto_init() } +func file_v1_agent_proto_init() { + if File_v1_agent_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_v1_agent_proto_rawDesc), len(file_v1_agent_proto_rawDesc)), + NumEnums: 1, + NumMessages: 8, + NumExtensions: 0, + NumServices: 2, + }, + GoTypes: file_v1_agent_proto_goTypes, + DependencyIndexes: file_v1_agent_proto_depIdxs, + EnumInfos: file_v1_agent_proto_enumTypes, + MessageInfos: file_v1_agent_proto_msgTypes, + }.Build() + File_v1_agent_proto = out.File + file_v1_agent_proto_goTypes = nil + file_v1_agent_proto_depIdxs = nil +} diff --git a/internal/clawkerd/protocol/v1/agent.proto b/internal/clawkerd/protocol/v1/agent.proto new file mode 100644 index 00000000..cb367600 --- /dev/null +++ b/internal/clawkerd/protocol/v1/agent.proto @@ -0,0 +1,131 @@ +syntax = "proto3"; + +package clawker.agent.v1; + +option go_package = "github.com/schmitthub/clawker/internal/clawkerd/protocol/v1"; + +// AgentReportingService — clawkerd calls these on the control plane. +// Used for registration and event reporting. +service AgentReportingService { + // Register authenticates and registers an agent with the control plane. + // clawkerd calls this on startup, providing its listen address for callbacks. + rpc Register(RegisterRequest) returns (RegisterResponse); +} + +// AgentCommandService — the control plane calls these on clawkerd. +// Each operation is its own RPC with own types (containerd shim v2 pattern). +service AgentCommandService { + // RunInit sends an init spec to the agent. The agent executes the steps + // and streams progress events back to the control plane. + rpc RunInit(RunInitRequest) returns (stream RunInitResponse); +} + +// RegisterRequest is sent by clawkerd to the control plane on startup. +message RegisterRequest { + // container_id is the Docker container ID (hostname inside container). + string container_id = 1; + // secret is the shared secret for authentication. + string secret = 2; + // version is the clawkerd binary version. + string version = 3; + // listen_port is the gRPC port clawkerd is listening on inside the container. + // The control plane resolves the container's IP via Docker inspect and connects + // back to : for AgentCommandService RPCs. + uint32 listen_port = 4; +} + +// RegisterResponse indicates whether registration was accepted. +// On success, includes ClawkerdConfiguration for logger initialization. +message RegisterResponse { + // accepted is true if the agent was registered successfully. + bool accepted = 1; + // reason provides context when registration is rejected. + string reason = 2; + // config is the operational configuration for clawkerd (identity, OTEL, file logging). + // Only populated when accepted is true. + ClawkerdConfiguration config = 3; +} + +// ClawkerdConfiguration is the operational config delivered by the control plane +// during registration. clawkerd uses this to initialize its logger with structured +// logging, OTEL bridge, and project/agent context fields. +// Named "Clawkerd" (not "Agent") because "agent" means Claude Code instance in our domain. +message ClawkerdConfiguration { + // project is the project name for logger context fields. + string project = 1; + // agent is the agent name for logger context fields. + string agent = 2; + // otel configures the OTEL logging bridge (endpoint, timeouts, batching). + OtelLogConfig otel = 3; + // file_logging configures file-based log rotation. + FileLogConfig file_logging = 4; +} + +// OtelLogConfig configures the OTEL zerolog bridge for streaming logs to the collector. +message OtelLogConfig { + // endpoint is the OTLP HTTP endpoint (host:port, no scheme). e.g. "otel-collector:4318" + string endpoint = 1; + // insecure disables TLS (true for local collector). + bool insecure = 2; + // timeout_seconds is the export timeout (default: 5). + int32 timeout_seconds = 3; + // max_queue_size is the batch processor queue size (default: 2048). + int32 max_queue_size = 4; + // export_interval_seconds is the batch export interval (default: 5). + int32 export_interval_seconds = 5; +} + +// FileLogConfig configures file-based logging with rotation. +message FileLogConfig { + // enabled enables file logging. + bool enabled = 1; + // max_size_mb is the max log file size in MB before rotation (default: 50). + int32 max_size_mb = 2; + // max_age_days is the max days to retain old logs (default: 7). + int32 max_age_days = 3; + // max_backups is the max number of old log files to keep (default: 3). + int32 max_backups = 4; + // compress enables gzip compression of rotated log files (default: true). + bool compress = 5; +} + +// RunInitRequest is the declarative init specification sent by the control plane. +// clawkerd owns execution — it runs the steps and reports progress. +message RunInitRequest { + // steps is the ordered list of init steps to execute. + repeated InitStep steps = 1; +} + +// InitStep is a single init command to execute. +message InitStep { + // name identifies this step (e.g. "firewall", "git-config"). + string name = 1; + // command is the bash command to execute. + string command = 2; +} + +// RunInitResponse is streamed from clawkerd back to the control plane +// as each init step progresses. +message RunInitResponse { + // step_name identifies which step this event is for. + string step_name = 1; + // status is the current status of this step. + InitEventStatus status = 2; + // output contains captured stdout from the command. + string output = 3; + // error contains stderr or error message if the step failed. + string error = 4; +} + +// InitEventStatus tracks the lifecycle of an init step. +enum InitEventStatus { + INIT_EVENT_STATUS_UNSPECIFIED = 0; + // STARTED indicates a step has begun execution. + INIT_EVENT_STATUS_STARTED = 1; + // COMPLETED indicates a step finished successfully. + INIT_EVENT_STATUS_COMPLETED = 2; + // FAILED indicates a step encountered an error. + INIT_EVENT_STATUS_FAILED = 3; + // READY is the final event — all init steps are complete. + INIT_EVENT_STATUS_READY = 4; +} diff --git a/internal/clawkerd/protocol/v1/agent_grpc.pb.go b/internal/clawkerd/protocol/v1/agent_grpc.pb.go new file mode 100644 index 00000000..8398eca5 --- /dev/null +++ b/internal/clawkerd/protocol/v1/agent_grpc.pb.go @@ -0,0 +1,246 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.1 +// - protoc (unknown) +// source: v1/agent.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + AgentReportingService_Register_FullMethodName = "/clawker.agent.v1.AgentReportingService/Register" +) + +// AgentReportingServiceClient is the client API for AgentReportingService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// AgentReportingService — clawkerd calls these on the control plane. +// Used for registration and event reporting. +type AgentReportingServiceClient interface { + // Register authenticates and registers an agent with the control plane. + // clawkerd calls this on startup, providing its listen address for callbacks. + Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) +} + +type agentReportingServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentReportingServiceClient(cc grpc.ClientConnInterface) AgentReportingServiceClient { + return &agentReportingServiceClient{cc} +} + +func (c *agentReportingServiceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RegisterResponse) + err := c.cc.Invoke(ctx, AgentReportingService_Register_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AgentReportingServiceServer is the server API for AgentReportingService service. +// All implementations must embed UnimplementedAgentReportingServiceServer +// for forward compatibility. +// +// AgentReportingService — clawkerd calls these on the control plane. +// Used for registration and event reporting. +type AgentReportingServiceServer interface { + // Register authenticates and registers an agent with the control plane. + // clawkerd calls this on startup, providing its listen address for callbacks. + Register(context.Context, *RegisterRequest) (*RegisterResponse, error) + mustEmbedUnimplementedAgentReportingServiceServer() +} + +// UnimplementedAgentReportingServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedAgentReportingServiceServer struct{} + +func (UnimplementedAgentReportingServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Register not implemented") +} +func (UnimplementedAgentReportingServiceServer) mustEmbedUnimplementedAgentReportingServiceServer() {} +func (UnimplementedAgentReportingServiceServer) testEmbeddedByValue() {} + +// UnsafeAgentReportingServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AgentReportingServiceServer will +// result in compilation errors. +type UnsafeAgentReportingServiceServer interface { + mustEmbedUnimplementedAgentReportingServiceServer() +} + +func RegisterAgentReportingServiceServer(s grpc.ServiceRegistrar, srv AgentReportingServiceServer) { + // If the following call panics, it indicates UnimplementedAgentReportingServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&AgentReportingService_ServiceDesc, srv) +} + +func _AgentReportingService_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentReportingServiceServer).Register(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AgentReportingService_Register_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentReportingServiceServer).Register(ctx, req.(*RegisterRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// AgentReportingService_ServiceDesc is the grpc.ServiceDesc for AgentReportingService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AgentReportingService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "clawker.agent.v1.AgentReportingService", + HandlerType: (*AgentReportingServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Register", + Handler: _AgentReportingService_Register_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "v1/agent.proto", +} + +const ( + AgentCommandService_RunInit_FullMethodName = "/clawker.agent.v1.AgentCommandService/RunInit" +) + +// AgentCommandServiceClient is the client API for AgentCommandService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// AgentCommandService — the control plane calls these on clawkerd. +// Each operation is its own RPC with own types (containerd shim v2 pattern). +type AgentCommandServiceClient interface { + // RunInit sends an init spec to the agent. The agent executes the steps + // and streams progress events back to the control plane. + RunInit(ctx context.Context, in *RunInitRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[RunInitResponse], error) +} + +type agentCommandServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentCommandServiceClient(cc grpc.ClientConnInterface) AgentCommandServiceClient { + return &agentCommandServiceClient{cc} +} + +func (c *agentCommandServiceClient) RunInit(ctx context.Context, in *RunInitRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[RunInitResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &AgentCommandService_ServiceDesc.Streams[0], AgentCommandService_RunInit_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[RunInitRequest, RunInitResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type AgentCommandService_RunInitClient = grpc.ServerStreamingClient[RunInitResponse] + +// AgentCommandServiceServer is the server API for AgentCommandService service. +// All implementations must embed UnimplementedAgentCommandServiceServer +// for forward compatibility. +// +// AgentCommandService — the control plane calls these on clawkerd. +// Each operation is its own RPC with own types (containerd shim v2 pattern). +type AgentCommandServiceServer interface { + // RunInit sends an init spec to the agent. The agent executes the steps + // and streams progress events back to the control plane. + RunInit(*RunInitRequest, grpc.ServerStreamingServer[RunInitResponse]) error + mustEmbedUnimplementedAgentCommandServiceServer() +} + +// UnimplementedAgentCommandServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedAgentCommandServiceServer struct{} + +func (UnimplementedAgentCommandServiceServer) RunInit(*RunInitRequest, grpc.ServerStreamingServer[RunInitResponse]) error { + return status.Error(codes.Unimplemented, "method RunInit not implemented") +} +func (UnimplementedAgentCommandServiceServer) mustEmbedUnimplementedAgentCommandServiceServer() {} +func (UnimplementedAgentCommandServiceServer) testEmbeddedByValue() {} + +// UnsafeAgentCommandServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AgentCommandServiceServer will +// result in compilation errors. +type UnsafeAgentCommandServiceServer interface { + mustEmbedUnimplementedAgentCommandServiceServer() +} + +func RegisterAgentCommandServiceServer(s grpc.ServiceRegistrar, srv AgentCommandServiceServer) { + // If the following call panics, it indicates UnimplementedAgentCommandServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&AgentCommandService_ServiceDesc, srv) +} + +func _AgentCommandService_RunInit_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(RunInitRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentCommandServiceServer).RunInit(m, &grpc.GenericServerStream[RunInitRequest, RunInitResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type AgentCommandService_RunInitServer = grpc.ServerStreamingServer[RunInitResponse] + +// AgentCommandService_ServiceDesc is the grpc.ServiceDesc for AgentCommandService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AgentCommandService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "clawker.agent.v1.AgentCommandService", + HandlerType: (*AgentCommandServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "RunInit", + Handler: _AgentCommandService_RunInit_Handler, + ServerStreams: true, + }, + }, + Metadata: "v1/agent.proto", +} diff --git a/internal/cmd/monitor/init/init.go b/internal/cmd/monitor/init/init.go index 266b829b..663b55c1 100644 --- a/internal/cmd/monitor/init/init.go +++ b/internal/cmd/monitor/init/init.go @@ -105,6 +105,7 @@ func initRun(_ context.Context, opts *InitOptions) error { {monitor.GrafanaDatasourcesFileName, monitor.GrafanaDatasourcesTemplate, true}, {monitor.GrafanaDashboardsFileName, monitor.GrafanaDashboardsTemplate, false}, {monitor.GrafanaDashboardFileName, monitor.GrafanaDashboardTemplate, false}, + {monitor.GrafanaCPDashboardFileName, monitor.GrafanaCPDashboardTemplate, false}, } // Write each file diff --git a/internal/controlplane/controlplanetest/mock_server.go b/internal/controlplane/controlplanetest/mock_server.go new file mode 100644 index 00000000..6e16bd0c --- /dev/null +++ b/internal/controlplane/controlplanetest/mock_server.go @@ -0,0 +1,94 @@ +package controlplanetest + +import ( + "net" + "sync" + + "github.com/schmitthub/clawker/internal/controlplane" +) + +// Compile-time interface check. +var _ controlplane.ControlPlaneService = (*MockServer)(nil) + +// MockServer is a test double for controlplane.ControlPlaneService. +// It tracks agent state in memory without gRPC, making it safe for unit +// and integration tests that need a control plane dependency. +type MockServer struct { + mu sync.RWMutex + ServeErr error // Error returned by Serve + agents map[string]*controlplane.AgentConnection +} + +// NewMockServer returns a MockServer with no registered agents. +func NewMockServer() *MockServer { + return &MockServer{ + agents: make(map[string]*controlplane.AgentConnection), + } +} + +// NewMockServerWithAgent returns a MockServer with a single pre-registered agent. +func NewMockServerWithAgent(containerID string, initCompleted bool) *MockServer { + m := NewMockServer() + m.agents[containerID] = &controlplane.AgentConnection{ + ContainerID: containerID, + InitCompleted: initCompleted, + } + return m +} + +// NewFailingMockServer returns a MockServer whose Serve returns the given error. +func NewFailingMockServer(err error) *MockServer { + m := NewMockServer() + m.ServeErr = err + return m +} + +func (m *MockServer) Serve(_ net.Listener) error { return m.ServeErr } +func (m *MockServer) Stop() {} + +func (m *MockServer) IsRegistered(containerID string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, ok := m.agents[containerID] + return ok +} + +func (m *MockServer) GetAgent(containerID string) *controlplane.AgentConnection { + m.mu.RLock() + defer m.mu.RUnlock() + agent, ok := m.agents[containerID] + if !ok { + return nil + } + snapshot := *agent + return &snapshot +} + +// --- Test helpers --- + +// RegisterAgent adds an agent entry (not yet initialized). +func (m *MockServer) RegisterAgent(containerID string) { + m.mu.Lock() + defer m.mu.Unlock() + m.agents[containerID] = &controlplane.AgentConnection{ + ContainerID: containerID, + } +} + +// CompleteInit marks the agent's init as completed. +func (m *MockServer) CompleteInit(containerID string) { + m.mu.Lock() + defer m.mu.Unlock() + if agent, ok := m.agents[containerID]; ok { + agent.InitCompleted = true + } +} + +// FailInit marks the agent's init as failed. +func (m *MockServer) FailInit(containerID string) { + m.mu.Lock() + defer m.mu.Unlock() + if agent, ok := m.agents[containerID]; ok { + agent.InitFailed = true + } +} diff --git a/internal/controlplane/registry.go b/internal/controlplane/registry.go new file mode 100644 index 00000000..68ced91a --- /dev/null +++ b/internal/controlplane/registry.go @@ -0,0 +1,114 @@ +package controlplane + +import ( + "sync" + + v1 "github.com/schmitthub/clawker/internal/clawkerd/protocol/v1" + "google.golang.org/grpc" +) + +// AgentConnection tracks a registered agent and its init state. +type AgentConnection struct { + ContainerID string + ListenPort uint32 + Version string + ClientConn *grpc.ClientConn + InitCompleted bool + InitFailed bool + InitEvents []*v1.RunInitResponse +} + +// Registry tracks connected agents by container ID. +// Thread-safe via RWMutex. +type Registry struct { + mu sync.RWMutex + agents map[string]*AgentConnection +} + +// NewRegistry creates an empty agent registry. +func NewRegistry() *Registry { + return &Registry{ + agents: make(map[string]*AgentConnection), + } +} + +// Register adds or updates an agent entry. +func (r *Registry) Register(containerID string, listenPort uint32, version string) { + r.mu.Lock() + defer r.mu.Unlock() + r.agents[containerID] = &AgentConnection{ + ContainerID: containerID, + ListenPort: listenPort, + Version: version, + } +} + +// Get returns the agent for a container ID, or nil if not found. +func (r *Registry) Get(containerID string) *AgentConnection { + r.mu.RLock() + defer r.mu.RUnlock() + agent := r.agents[containerID] + if agent == nil { + return nil + } + // Return a snapshot to avoid races on the caller's side. + snapshot := *agent + snapshot.InitEvents = make([]*v1.RunInitResponse, len(agent.InitEvents)) + copy(snapshot.InitEvents, agent.InitEvents) + return &snapshot +} + +// IsRegistered returns true if the container ID has a registered agent. +func (r *Registry) IsRegistered(containerID string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + _, ok := r.agents[containerID] + return ok +} + +// SetClientConn stores the gRPC client connection for an agent. +func (r *Registry) SetClientConn(containerID string, conn *grpc.ClientConn) { + r.mu.Lock() + defer r.mu.Unlock() + if agent, ok := r.agents[containerID]; ok { + agent.ClientConn = conn + } +} + +// AppendInitEvent records an init event for the agent. +func (r *Registry) AppendInitEvent(containerID string, event *v1.RunInitResponse) { + r.mu.Lock() + defer r.mu.Unlock() + if agent, ok := r.agents[containerID]; ok { + agent.InitEvents = append(agent.InitEvents, event) + } +} + +// SetInitCompleted marks the agent's init as completed. +func (r *Registry) SetInitCompleted(containerID string) { + r.mu.Lock() + defer r.mu.Unlock() + if agent, ok := r.agents[containerID]; ok { + agent.InitCompleted = true + } +} + +// SetInitFailed marks the agent's init as failed. +func (r *Registry) SetInitFailed(containerID string) { + r.mu.Lock() + defer r.mu.Unlock() + if agent, ok := r.agents[containerID]; ok { + agent.InitFailed = true + } +} + +// Close closes all gRPC client connections. +func (r *Registry) Close() { + r.mu.Lock() + defer r.mu.Unlock() + for _, agent := range r.agents { + if agent.ClientConn != nil { + agent.ClientConn.Close() + } + } +} diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go new file mode 100644 index 00000000..d07263f1 --- /dev/null +++ b/internal/controlplane/server.go @@ -0,0 +1,321 @@ +package controlplane + +import ( + "context" + "fmt" + "io" + "net" + "time" + + mobyclient "github.com/moby/moby/client" + v1 "github.com/schmitthub/clawker/internal/clawkerd/protocol/v1" + "github.com/schmitthub/clawker/internal/config" + "github.com/schmitthub/clawker/internal/docker" + "github.com/schmitthub/clawker/internal/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// ControlPlaneService is the consumer-facing contract for the control plane. +// CLI commands and other packages depend on this interface; tests use +// controlplanetest.MockServer. +// +//go:generate moq -rm -pkg controlplanetest -out controlplanetest/controlplane_mock.go . ControlPlaneService +type ControlPlaneService interface { + // Serve starts serving gRPC on the given listener. Blocks until Stop. + Serve(lis net.Listener) error + // Stop gracefully stops the control plane. + Stop() + // IsRegistered returns true if an agent with the given container ID has registered. + IsRegistered(containerID string) bool + // GetAgent returns agent connection info, or nil if not registered. + GetAgent(containerID string) *AgentConnection +} + +// Config holds control plane server configuration. +type Config struct { + // Secret is the shared secret agents must present to register. + Secret string + // InitSpec is the init specification to send to agents after registration. + InitSpec *v1.RunInitRequest + // DockerClient is used to inspect containers for IP resolution. + DockerClient *docker.Client + // Cfg provides access to the config store for settings resolution. + // Used to populate ClawkerdConfiguration in RegisterResponse. + Cfg config.Config + // Project is the project name for clawkerd logger context. + Project string + // Agent is the agent name for clawkerd logger context. + Agent string +} + +// Server is the control plane gRPC server. +// It implements AgentReportingService (agents register here) and on +// successful registration, connects back to the agent's AgentCommandService +// to call RunInit. +type Server struct { + v1.UnimplementedAgentReportingServiceServer + + config Config + registry *Registry + grpc *grpc.Server +} + +// NewServer creates a new control plane server. +func NewServer(cfg Config) *Server { + s := &Server{ + config: cfg, + registry: NewRegistry(), + grpc: grpc.NewServer(), + } + v1.RegisterAgentReportingServiceServer(s.grpc, s) + return s +} + +// Serve starts serving gRPC on the given listener. +// Blocks until Stop is called or the listener is closed. +func (s *Server) Serve(lis net.Listener) error { + logger.Info().Str("component", "controlplane").Str("addr", lis.Addr().String()).Msg("control plane serving") + return s.grpc.Serve(lis) +} + +// Stop gracefully stops the gRPC server and closes agent connections. +func (s *Server) Stop() { + s.grpc.GracefulStop() + s.registry.Close() +} + +// IsRegistered returns true if an agent with the given container ID has registered. +func (s *Server) IsRegistered(containerID string) bool { + return s.registry.IsRegistered(containerID) +} + +// GetAgent returns agent connection info, or nil if not registered. +func (s *Server) GetAgent(containerID string) *AgentConnection { + return s.registry.Get(containerID) +} + +// Registry returns the agent registry for direct inspection. +func (s *Server) Registry() *Registry { + return s.registry +} + +// Register implements AgentReportingService.Register. +// Validates the secret, registers the agent, resolves the container's IP +// via Docker inspect, then connects back to the agent's AgentCommandService +// to call RunInit asynchronously. +func (s *Server) Register(ctx context.Context, req *v1.RegisterRequest) (*v1.RegisterResponse, error) { + logger.Info(). + Str("component", "controlplane"). + Str("container_id", req.ContainerId). + Uint32("listen_port", req.ListenPort). + Str("version", req.Version). + Msg("agent registering") + + // Validate secret. + if req.Secret != s.config.Secret { + logger.Warn(). + Str("component", "controlplane"). + Str("container_id", req.ContainerId). + Msg("agent registration rejected: invalid secret") + return &v1.RegisterResponse{ + Accepted: false, + Reason: "invalid secret", + }, nil + } + + // Register the agent. + s.registry.Register(req.ContainerId, req.ListenPort, req.Version) + + // Resolve container IP via Docker inspect. + agentAddr, err := s.resolveAgentAddress(ctx, req.ContainerId, req.ListenPort) + if err != nil { + logger.Error().Err(err). + Str("component", "controlplane"). + Str("container_id", req.ContainerId). + Msg("failed to resolve container IP") + return &v1.RegisterResponse{ + Accepted: false, + Reason: fmt.Sprintf("failed to resolve container IP: %v", err), + }, nil + } + + logger.Info(). + Str("component", "controlplane"). + Str("container_id", req.ContainerId). + Str("agent_addr", agentAddr). + Msg("agent registered, connecting back for RunInit") + + // Connect back to the agent's gRPC server and call RunInit asynchronously. + go s.runInitOnAgent(req.ContainerId, agentAddr) + + return &v1.RegisterResponse{ + Accepted: true, + Config: s.buildClawkerdConfig(), + }, nil +} + +// buildClawkerdConfig constructs the ClawkerdConfiguration proto from resolved settings. +// Returns nil if settings are not available (graceful degradation — clawkerd falls back to defaults). +func (s *Server) buildClawkerdConfig() *v1.ClawkerdConfiguration { + cfg := &v1.ClawkerdConfiguration{ + Project: s.config.Project, + Agent: s.config.Agent, + } + + settings := s.config.Cfg.SettingsStore().Get() + + // OTEL config — uses InternalEndpoint (host:port, no scheme) for container-side collector. + if settings.Logging.Otel.Enabled != nil && *settings.Logging.Otel.Enabled { + cfg.Otel = &v1.OtelLogConfig{ + Endpoint: fmt.Sprintf("%s:%d", settings.Monitoring.OtelCollectorInternal, settings.Monitoring.OtelGRPCPort), + Insecure: true, + TimeoutSeconds: int32(settings.Logging.Otel.TimeoutSeconds), + MaxQueueSize: int32(settings.Logging.Otel.MaxQueueSize), + ExportIntervalSeconds: int32(settings.Logging.Otel.ExportIntervalSeconds), + } + } + + // File logging config. + cfg.FileLogging = &v1.FileLogConfig{ + Enabled: settings.Logging.FileEnabled != nil && *settings.Logging.FileEnabled, + MaxSizeMb: int32(settings.Logging.MaxSizeMB), + MaxAgeDays: int32(settings.Logging.MaxAgeDays), + MaxBackups: int32(settings.Logging.MaxBackups), + Compress: settings.Logging.Compress != nil && *settings.Logging.Compress, + } + + return cfg +} + +// resolveAgentAddress inspects the container to determine how to reach the +// agent's gRPC server. It first checks for a host port mapping (required on +// macOS/Docker Desktop where container IPs aren't routable from the host), +// then falls back to the container's network IP. +func (s *Server) resolveAgentAddress(ctx context.Context, containerID string, port uint32) (string, error) { + result, err := s.config.DockerClient.ContainerInspect(ctx, containerID, mobyclient.ContainerInspectOptions{}) + if err != nil { + return "", fmt.Errorf("inspect container %s: %w", containerID, err) + } + + // Check for host port mapping first (works on all platforms). + portKey := fmt.Sprintf("%d/tcp", port) + for p, bindings := range result.Container.NetworkSettings.Ports { + if p.String() == portKey && len(bindings) > 0 { + hostPort := bindings[0].HostPort + if hostPort != "" { + addr := fmt.Sprintf("127.0.0.1:%s", hostPort) + logger.Debug(). + Str("component", "controlplane"). + Str("container_id", containerID). + Str("addr", addr). + Msg("resolved agent via port mapping") + return addr, nil + } + } + } + + // Fallback: use container IP directly (works on Linux where host can reach container IPs). + for networkName, endpoint := range result.Container.NetworkSettings.Networks { + if endpoint.IPAddress.IsValid() { + logger.Debug(). + Str("component", "controlplane"). + Str("container_id", containerID). + Str("network", networkName). + Str("ip", endpoint.IPAddress.String()). + Msg("resolved container IP (direct)") + return fmt.Sprintf("%s:%d", endpoint.IPAddress, port), nil + } + } + + return "", fmt.Errorf("no valid address found for container %s", containerID) +} + +// runInitOnAgent connects to the agent's gRPC server and calls RunInit. +func (s *Server) runInitOnAgent(containerID, agentAddr string) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + // Connect to the agent's gRPC server. + conn, err := grpc.NewClient( + agentAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + logger.Error().Err(err). + Str("component", "controlplane"). + Str("container_id", containerID). + Str("agent_addr", agentAddr). + Msg("failed to connect to agent") + s.registry.SetInitFailed(containerID) + return + } + s.registry.SetClientConn(containerID, conn) + + client := v1.NewAgentCommandServiceClient(conn) + + // Send the init spec. + initSpec := s.config.InitSpec + if initSpec == nil { + initSpec = &v1.RunInitRequest{} + } + + stream, err := client.RunInit(ctx, initSpec) + if err != nil { + logger.Error().Err(err). + Str("component", "controlplane"). + Str("container_id", containerID). + Msg("RunInit RPC failed") + s.registry.SetInitFailed(containerID) + return + } + + // Consume the progress stream. + for { + event, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + logger.Error().Err(err). + Str("component", "controlplane"). + Str("container_id", containerID). + Msg("RunInit stream error") + s.registry.SetInitFailed(containerID) + return + } + + logger.Info(). + Str("component", "controlplane"). + Str("container_id", containerID). + Str("step", event.StepName). + Str("status", event.Status.String()). + Str("output", event.Output). + Msg("init event") + + s.registry.AppendInitEvent(containerID, event) + + if event.Status == v1.InitEventStatus_INIT_EVENT_STATUS_READY { + s.registry.SetInitCompleted(containerID) + logger.Info(). + Str("component", "controlplane"). + Str("container_id", containerID). + Msg("agent init completed") + return + } + + if event.Status == v1.InitEventStatus_INIT_EVENT_STATUS_FAILED { + s.registry.SetInitFailed(containerID) + logger.Error(). + Str("component", "controlplane"). + Str("container_id", containerID). + Str("step", event.StepName). + Str("error", event.Error). + Msg("agent init step failed") + return + } + } + + // Stream ended without READY event — mark as completed if we got here cleanly. + s.registry.SetInitCompleted(containerID) +} diff --git a/internal/monitor/CLAUDE.md b/internal/monitor/CLAUDE.md index 203d096e..198b78c5 100644 --- a/internal/monitor/CLAUDE.md +++ b/internal/monitor/CLAUDE.md @@ -14,6 +14,7 @@ Template source files live in `templates/`. Files with `.tmpl` extension are Go | `PrometheusTemplate` | `templates/prometheus.yaml.tmpl` | Yes | | `GrafanaDashboardsTemplate` | `templates/grafana-dashboards.yaml` | No | | `GrafanaDashboardTemplate` | `templates/grafana-dashboard.json` | No | +| `GrafanaCPDashboardTemplate` | `templates/grafana-cp-dashboard.json` | No | ## Filename Constants @@ -27,6 +28,7 @@ Output filenames for writing rendered content to disk: | `PrometheusFileName` | `prometheus.yaml` | | `GrafanaDashboardsFileName` | `grafana-dashboards.yaml` | | `GrafanaDashboardFileName` | `grafana-dashboard.json` | +| `GrafanaCPDashboardFileName` | `grafana-cp-dashboard.json` | ## Template Rendering @@ -55,6 +57,6 @@ Renders a Go `text/template` string with the given `MonitorTemplateData`. Return ## Usage -The `monitor init` command constructs `MonitorTemplateData` via `NewMonitorTemplateData`, then calls `RenderTemplate` for each `.tmpl` template before writing the rendered output to disk. Static templates (`GrafanaDashboardsTemplate`, `GrafanaDashboardTemplate`) are written directly without rendering. +The `monitor init` command constructs `MonitorTemplateData` via `NewMonitorTemplateData`, then calls `RenderTemplate` for each `.tmpl` template before writing the rendered output to disk. Static templates (`GrafanaDashboardsTemplate`, `GrafanaDashboardTemplate`, `GrafanaCPDashboardTemplate`) are written directly without rendering. All symbols are in `templates.go`. diff --git a/internal/monitor/templates.go b/internal/monitor/templates.go index 2dd79b10..a89968f6 100644 --- a/internal/monitor/templates.go +++ b/internal/monitor/templates.go @@ -31,6 +31,9 @@ var GrafanaDashboardsTemplate string //go:embed templates/grafana-dashboard.json var GrafanaDashboardTemplate string +//go:embed templates/grafana-cp-dashboard.json +var GrafanaCPDashboardTemplate string + // Template file names for writing to disk const ( ComposeFileName = "compose.yaml" @@ -39,6 +42,7 @@ const ( PrometheusFileName = "prometheus.yaml" GrafanaDashboardsFileName = "grafana-dashboards.yaml" GrafanaDashboardFileName = "grafana-dashboard.json" + GrafanaCPDashboardFileName = "grafana-cp-dashboard.json" ) // MonitorTemplateData provides values for rendering monitoring stack templates. diff --git a/internal/monitor/templates/compose.yaml.tmpl b/internal/monitor/templates/compose.yaml.tmpl index bec28b6f..0a33f1f9 100644 --- a/internal/monitor/templates/compose.yaml.tmpl +++ b/internal/monitor/templates/compose.yaml.tmpl @@ -74,6 +74,7 @@ services: - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml - ./grafana-dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml - ./grafana-dashboard.json:/etc/grafana/provisioning/dashboards/claude-code.json + - ./grafana-cp-dashboard.json:/etc/grafana/provisioning/dashboards/clawker-controlplane.json extra_hosts: - "host.docker.internal:host-gateway" labels: diff --git a/internal/monitor/templates/grafana-cp-dashboard.json b/internal/monitor/templates/grafana-cp-dashboard.json new file mode 100644 index 00000000..0e331994 --- /dev/null +++ b/internal/monitor/templates/grafana-cp-dashboard.json @@ -0,0 +1,199 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 1, + "panels": [], + "title": "Control Plane Logs", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${lokidatasource}" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 2, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": true, + "sortOrder": "Descending", + "wrapLogMessage": false + }, + "targets": [ + { + "expr": "{service_name=\"clawker\"} | scope_name = `clawker` | project=~`^(${project:regex})$` | agent=~`^(${agent:regex})$`", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Control Plane Logs", + "type": "logs" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 13 + }, + "id": 3, + "panels": [], + "title": "clawkerd Logs", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${lokidatasource}" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 14 + }, + "id": 4, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": true, + "sortOrder": "Descending", + "wrapLogMessage": false + }, + "targets": [ + { + "expr": "{service_name=\"clawker\"} | scope_name = `clawkerd` | project=~`^(${project:regex})$` | agent=~`^(${agent:regex})$`", + "legendFormat": "", + "refId": "A" + } + ], + "title": "clawkerd Logs", + "type": "logs" + } + ], + "refresh": "10s", + "schemaVersion": 38, + "style": "dark", + "tags": [ + "clawker", + "controlplane", + "monitoring" + ], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Loki", + "value": "Loki" + }, + "hide": 0, + "includeAll": false, + "label": "Loki Datasource", + "multi": false, + "name": "lokidatasource", + "options": [], + "query": "loki", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "loki", + "uid": "${lokidatasource}" + }, + "definition": "label_values({service_name=\"clawker\"}, project)", + "hide": 0, + "includeAll": true, + "label": "Project", + "multi": true, + "name": "project", + "options": [], + "query": { + "query": "label_values({service_name=\"clawker\"}, project)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "loki", + "uid": "${lokidatasource}" + }, + "definition": "label_values({service_name=\"clawker\"} | project=~`^(${project:regex})$`, agent)", + "hide": 0, + "includeAll": true, + "label": "Agent", + "multi": true, + "name": "agent", + "options": [], + "query": { + "query": "label_values({service_name=\"clawker\"} | project=~`^(${project:regex})$`, agent)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Clawker Control Plane", + "uid": "clawker-controlplane", + "version": 1, + "weekStart": "" +}