diff --git a/go.mod b/go.mod index 11ff277..de1def1 100644 --- a/go.mod +++ b/go.mod @@ -27,10 +27,12 @@ require ( filippo.io/edwards25519 v1.2.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/fasthttp/websocket v1.5.3 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-sql-driver/mysql v1.9.3 // indirect + github.com/gofiber/websocket/v2 v2.2.1 // indirect github.com/gosimple/unidecode v1.0.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect @@ -49,6 +51,7 @@ require ( github.com/mfridman/interpolate v0.0.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.69.0 // indirect diff --git a/go.sum b/go.sum index b376099..3a942bd 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.12 h1:e9hWvmLYvtp846tLHam2o++qitpguFiYCKbn0w9jyqw= github.com/gabriel-vasile/mimetype v1.4.12/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= @@ -48,6 +50,8 @@ github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI6 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofiber/fiber/v2 v2.52.12 h1:0LdToKclcPOj8PktUdIKo9BUohjjwfnQl42Dhw8/WUw= github.com/gofiber/fiber/v2 v2.52.12/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= @@ -144,6 +148,8 @@ github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE= github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/infrastructure/collaboration/handler.go b/infrastructure/collaboration/handler.go new file mode 100644 index 0000000..df2d292 --- /dev/null +++ b/infrastructure/collaboration/handler.go @@ -0,0 +1,101 @@ +package collaboration + +import ( + "strings" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" + "github.com/labbs/nexo/application/session" + "github.com/labbs/nexo/application/session/dto" + "github.com/rs/zerolog" +) + +const wsCollabPrefix = "/ws/collab/" + +// Handler manages WebSocket connections for Y.js collaboration. +type Handler struct { + hub *Hub + sessionApp *session.SessionApplication + logger zerolog.Logger +} + +// NewHandler creates a new collaboration WebSocket handler. +func NewHandler(hub *Hub, sessionApp *session.SessionApplication, logger zerolog.Logger) *Handler { + return &Handler{ + hub: hub, + sessionApp: sessionApp, + logger: logger.With().Str("component", "collaboration.handler").Logger(), + } +} + +// UpgradeMiddleware checks for WebSocket upgrade and validates the JWT token +// before upgrading the connection. +func (h *Handler) UpgradeMiddleware() fiber.Handler { + return func(c *fiber.Ctx) error { + if !websocket.IsWebSocketUpgrade(c) { + return fiber.ErrUpgradeRequired + } + + h.logger.Debug().Str("event", "ws_upgrade").Str("path", c.Path()).Msg("upgrading to WebSocket") + + token := c.Query("token") + if token == "" { + return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "missing token"}) + } + + result, err := h.sessionApp.ValidateToken(dto.ValidateTokenInput{Token: token}) + if err != nil { + h.logger.Warn().Err(err).Msg("invalid token on websocket upgrade") + return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "invalid token"}) + } + + // Store auth context and path in locals for the WebSocket handler + c.Locals("user_id", result.AuthContext.UserID) + c.Locals("path", c.Path()) + + return c.Next() + } +} + +// WebSocketHandler returns the Fiber WebSocket handler for collaboration. +func (h *Handler) WebSocketHandler() fiber.Handler { + return websocket.New(func(c *websocket.Conn) { + // Extract room ID from path: /ws/collab/ + roomID := strings.TrimPrefix(c.Locals("path").(string), wsCollabPrefix) + userID, _ := c.Locals("user_id").(string) + + h.logger.Debug().Str("event", "ws_connection").Str("room_id", roomID).Str("user_id", userID).Msg("new WebSocket connection") + + if roomID == "" { + h.logger.Warn().Msg("empty room id") + return + } + + room := h.hub.GetOrCreateRoom(roomID) + client := &Client{ + UserID: userID, + } + + room.AddClient(c, client) + defer func() { + room.RemoveClient(c) + h.hub.RemoveRoomIfEmpty(roomID) + }() + + // Read loop: relay all binary messages to other clients in the room + for { + messageType, msg, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + h.logger.Warn().Err(err).Str("room_id", roomID).Str("user_id", userID).Msg("unexpected close") + } + break + } + + // Only relay binary messages (Y.js protocol) + if messageType == websocket.BinaryMessage { + room.Broadcast(c, msg) + } + } + }) +} diff --git a/infrastructure/collaboration/hub.go b/infrastructure/collaboration/hub.go new file mode 100644 index 0000000..1d76172 --- /dev/null +++ b/infrastructure/collaboration/hub.go @@ -0,0 +1,73 @@ +package collaboration + +import ( + "sync" + + "github.com/rs/zerolog" +) + +// Hub manages all collaboration rooms. +type Hub struct { + mu sync.RWMutex + rooms map[string]*Room + logger zerolog.Logger +} + +// NewHub creates a new collaboration hub. +func NewHub(logger zerolog.Logger) *Hub { + return &Hub{ + rooms: make(map[string]*Room), + logger: logger.With().Str("component", "collaboration.hub").Logger(), + } +} + +// GetOrCreateRoom returns an existing room or creates a new one. +func (h *Hub) GetOrCreateRoom(roomID string) *Room { + h.mu.RLock() + room, ok := h.rooms[roomID] + h.mu.RUnlock() + if ok { + return room + } + + h.mu.Lock() + defer h.mu.Unlock() + + // Double-check after acquiring write lock + if room, ok = h.rooms[roomID]; ok { + return room + } + + room = newRoom(roomID, h.logger) + h.rooms[roomID] = room + h.logger.Info().Str("room_id", roomID).Msg("room created") + return room +} + +// RemoveRoomIfEmpty removes a room if it has no more clients. +func (h *Hub) RemoveRoomIfEmpty(roomID string) { + h.mu.Lock() + defer h.mu.Unlock() + + room, ok := h.rooms[roomID] + if !ok { + return + } + + if room.ClientCount() == 0 { + delete(h.rooms, roomID) + h.logger.Info().Str("room_id", roomID).Msg("room removed (empty)") + } +} + +// Stats returns the number of active rooms and total clients. +func (h *Hub) Stats() (rooms int, clients int) { + h.mu.RLock() + defer h.mu.RUnlock() + + rooms = len(h.rooms) + for _, r := range h.rooms { + clients += r.ClientCount() + } + return +} diff --git a/infrastructure/collaboration/room.go b/infrastructure/collaboration/room.go new file mode 100644 index 0000000..786f30c --- /dev/null +++ b/infrastructure/collaboration/room.go @@ -0,0 +1,88 @@ +package collaboration + +import ( + "sync" + + "github.com/gofiber/websocket/v2" + "github.com/rs/zerolog" +) + +// Room represents a Y.js collaboration room. +// It acts as a pure relay: binary messages from one client are broadcast to all others. +type Room struct { + id string + mu sync.RWMutex + clients map[*websocket.Conn]*Client + logger zerolog.Logger +} + +// Client holds metadata about a connected user. +type Client struct { + UserID string + Username string + writeMu sync.Mutex +} + +func newRoom(id string, logger zerolog.Logger) *Room { + return &Room{ + id: id, + clients: make(map[*websocket.Conn]*Client), + logger: logger.With().Str("component", "collaboration.room").Str("room_id", id).Logger(), + } +} + +// AddClient registers a new WebSocket connection in the room. +func (r *Room) AddClient(conn *websocket.Conn, client *Client) { + r.mu.Lock() + defer r.mu.Unlock() + r.clients[conn] = client + r.logger.Info().Str("user_id", client.UserID).Int("clients", len(r.clients)).Msg("client joined") +} + +// RemoveClient unregisters a WebSocket connection from the room. +func (r *Room) RemoveClient(conn *websocket.Conn) { + r.mu.Lock() + client, ok := r.clients[conn] + if ok { + delete(r.clients, conn) + } + count := len(r.clients) + r.mu.Unlock() + + if ok { + r.logger.Info().Str("user_id", client.UserID).Int("clients", count).Msg("client left") + } +} + +// Broadcast sends a binary message to all clients except the sender. +func (r *Room) Broadcast(sender *websocket.Conn, msg []byte) { + // Snapshot targets under read lock to avoid holding the lock during IO. + r.mu.RLock() + type target struct { + conn *websocket.Conn + client *Client + } + targets := make([]target, 0, len(r.clients)) + for conn, client := range r.clients { + if conn != sender { + targets = append(targets, target{conn, client}) + } + } + r.mu.RUnlock() + + for _, t := range targets { + t.client.writeMu.Lock() + err := t.conn.WriteMessage(websocket.BinaryMessage, msg) + t.client.writeMu.Unlock() + if err != nil { + r.logger.Warn().Err(err).Msg("failed to write to client") + } + } +} + +// ClientCount returns the number of connected clients. +func (r *Room) ClientCount() int { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.clients) +} diff --git a/infrastructure/deps.go b/infrastructure/deps.go index fd32ef0..905fa00 100644 --- a/infrastructure/deps.go +++ b/infrastructure/deps.go @@ -15,6 +15,7 @@ import ( "github.com/labbs/nexo/application/user" "github.com/labbs/nexo/application/webhook" "github.com/labbs/nexo/domain" + "github.com/labbs/nexo/infrastructure/collaboration" "github.com/labbs/nexo/infrastructure/config" "github.com/labbs/nexo/infrastructure/cronscheduler" "github.com/labbs/nexo/infrastructure/database" @@ -43,4 +44,6 @@ type Deps struct { FavoriteApplication *favorite.FavoriteApplication PermissionApplication *permission.PermissionApplication PermissionPers domain.PermissionPers + + CollaborationHub *collaboration.Hub } diff --git a/infrastructure/static/static.go b/infrastructure/static/static.go index 743eed9..15753a3 100644 --- a/infrastructure/static/static.go +++ b/infrastructure/static/static.go @@ -36,6 +36,11 @@ func NewStatic(f *fiber.App) { return c.Next() } + // Skip WebSocket routes + if strings.HasPrefix(path, "/ws") { + return c.Next() + } + // Serve index.html from the embedded FS for all other routes (SPA routes) indexFile, err := embedDirStatic.ReadFile("files/index.html") if err != nil { diff --git a/interfaces/cli/server/server.go b/interfaces/cli/server/server.go index b05a288..d9e38da 100644 --- a/interfaces/cli/server/server.go +++ b/interfaces/cli/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/labbs/nexo/application/user" "github.com/labbs/nexo/application/webhook" "github.com/labbs/nexo/infrastructure" + "github.com/labbs/nexo/infrastructure/collaboration" "github.com/labbs/nexo/infrastructure/config" "github.com/labbs/nexo/infrastructure/cronscheduler" "github.com/labbs/nexo/infrastructure/database" @@ -145,6 +146,9 @@ func runServer(cfg config.Config) error { deps.SessionApplication.DatabaseApplication = deps.DatabaseApplication deps.SessionApplication.DrawingApplication = deps.DrawingApplication + // Initialize collaboration hub + deps.CollaborationHub = collaboration.NewHub(deps.Logger) + // Initialize HTTP server (fiber + fiberoapi) deps.Http, err = http.Configure(deps.Config, deps.Logger, deps.SessionApplication, true) if err != nil { diff --git a/interfaces/http/router.go b/interfaces/http/router.go index 695d25d..591cd83 100644 --- a/interfaces/http/router.go +++ b/interfaces/http/router.go @@ -2,6 +2,7 @@ package http import ( "github.com/labbs/nexo/infrastructure" + "github.com/labbs/nexo/infrastructure/collaboration" v1 "github.com/labbs/nexo/interfaces/http/v1" ) @@ -14,4 +15,21 @@ func SetupRoutes(deps infrastructure.Deps) { // Setup v1 routes v1.SetupRouterV1(deps) + + // Setup WebSocket collaboration route + setupCollaborationRoutes(deps) +} + +func setupCollaborationRoutes(deps infrastructure.Deps) { + logger := deps.Logger.With().Str("component", "http.router.collaboration").Logger() + logger.Info().Str("event", "setup_collaboration_routes").Msg("Setting up collaboration WebSocket routes") + handler := collaboration.NewHandler(deps.CollaborationHub, deps.SessionApplication, deps.Logger) + + // The frontend connects to ws:///?token= + // Room formats: "document:" or "row::" + // Use("/ws/collab") is a prefix match, Get uses wildcard for the room ID (contains colons) + deps.Http.Fiber.Use("/ws/collab", handler.UpgradeMiddleware()) + deps.Http.Fiber.Get("/ws/collab/+", handler.WebSocketHandler()) + + logger.Debug().Interface("paths", deps.Http.Fiber.GetRoutes()).Msg("Registered HTTP routes") }