v0.6.0 multi-user game support with longpoll, tests and doc updated
This commit is contained in:
@ -3,6 +3,7 @@ package http
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -295,16 +296,73 @@ func (h *HTTPHandler) GetGame(c *fiber.Ctx) error {
|
||||
})
|
||||
}
|
||||
|
||||
// Create command and execute
|
||||
cmd := processor.NewGetGameCommand(gameID)
|
||||
resp := h.proc.Execute(cmd)
|
||||
// Check for long-polling parameters
|
||||
waitStr := c.Query("wait", "false")
|
||||
moveCountStr := c.Query("moveCount", "-1")
|
||||
|
||||
// Return appropriate HTTP response
|
||||
if !resp.Success {
|
||||
return c.Status(fiber.StatusNotFound).JSON(resp.Error)
|
||||
// Non-wait path - existing behavior
|
||||
if waitStr != "true" {
|
||||
// Create command and execute
|
||||
cmd := processor.NewGetGameCommand(gameID)
|
||||
resp := h.proc.Execute(cmd)
|
||||
|
||||
// Return appropriate HTTP response
|
||||
if !resp.Success {
|
||||
return c.Status(fiber.StatusNotFound).JSON(resp.Error)
|
||||
}
|
||||
|
||||
return c.JSON(resp.Data)
|
||||
}
|
||||
|
||||
return c.JSON(resp.Data)
|
||||
// Long-polling path
|
||||
moveCount, err := strconv.Atoi(moveCountStr)
|
||||
if err != nil {
|
||||
moveCount = -1
|
||||
}
|
||||
|
||||
// First check if game exists and get current state
|
||||
g, err := h.svc.GetGame(gameID)
|
||||
if err != nil {
|
||||
return c.Status(fiber.StatusNotFound).JSON(core.ErrorResponse{
|
||||
Error: "game not found",
|
||||
Code: core.ErrGameNotFound,
|
||||
})
|
||||
}
|
||||
|
||||
currentMoveCount := len(g.Moves())
|
||||
|
||||
// If move count already different, return immediately
|
||||
if moveCount != currentMoveCount {
|
||||
cmd := processor.NewGetGameCommand(gameID)
|
||||
resp := h.proc.Execute(cmd)
|
||||
if !resp.Success {
|
||||
return c.Status(fiber.StatusNotFound).JSON(resp.Error)
|
||||
}
|
||||
return c.JSON(resp.Data)
|
||||
}
|
||||
|
||||
// Register wait with service
|
||||
ctx := c.Context()
|
||||
notify := h.svc.RegisterWait(gameID, moveCount, ctx)
|
||||
|
||||
// Wait for notification, timeout, or client disconnect
|
||||
select {
|
||||
case <-notify:
|
||||
// State changed or timeout, get fresh game state
|
||||
cmd := processor.NewGetGameCommand(gameID)
|
||||
resp := h.proc.Execute(cmd)
|
||||
|
||||
// Game might have been deleted
|
||||
if !resp.Success {
|
||||
return c.Status(fiber.StatusNotFound).JSON(resp.Error)
|
||||
}
|
||||
|
||||
return c.JSON(resp.Data)
|
||||
|
||||
case <-ctx.Done():
|
||||
// Client disconnected
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// MakeMove submits a move
|
||||
|
||||
@ -104,6 +104,9 @@ func (s *Service) ApplyMove(gameID, moveUCI, newFEN string) error {
|
||||
// Add the new position to game history
|
||||
g.AddSnapshot(newFEN, moveUCI, nextTurn)
|
||||
|
||||
// Notify waiting clients about the state change
|
||||
s.waiter.NotifyGame(gameID, len(g.Moves()))
|
||||
|
||||
// Persist if storage enabled
|
||||
if s.store != nil {
|
||||
moveNumber := len(g.Moves())
|
||||
@ -132,6 +135,12 @@ func (s *Service) UpdateGameState(gameID string, state core.State) error {
|
||||
}
|
||||
|
||||
g.SetState(state)
|
||||
|
||||
// Notify if game ended
|
||||
if state != core.StateOngoing && state != core.StatePending {
|
||||
s.waiter.NotifyGame(gameID, len(g.Moves()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -165,6 +174,9 @@ func (s *Service) UndoMoves(gameID string, count int) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Notify waiting clients about the undo
|
||||
s.waiter.NotifyGame(gameID, len(g.Moves()))
|
||||
|
||||
// Delete undone moves from storage if enabled
|
||||
if s.store != nil {
|
||||
remainingMoves := originalMoveCount - count
|
||||
@ -183,6 +195,9 @@ func (s *Service) DeleteGame(gameID string) error {
|
||||
return fmt.Errorf("game not found: %s", gameID)
|
||||
}
|
||||
|
||||
// Notify and remove all waiters before deletion
|
||||
s.waiter.RemoveGame(gameID)
|
||||
|
||||
delete(s.games, gameID)
|
||||
return nil
|
||||
}
|
||||
@ -2,7 +2,11 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"chess/internal/game"
|
||||
"chess/internal/storage"
|
||||
@ -14,15 +18,17 @@ type Service struct {
|
||||
mu sync.RWMutex
|
||||
store *storage.Store // nil if persistence disabled
|
||||
jwtSecret []byte
|
||||
waiter *WaitRegistry // Long-polling notification registry
|
||||
}
|
||||
|
||||
// New creates a new service instance with optional storage
|
||||
func New(store *storage.Store, jwtSecret []byte) (*Service, error) {
|
||||
func New(store *storage.Store, jwtSecret []byte) *Service {
|
||||
return &Service{
|
||||
games: make(map[string]*game.Game),
|
||||
store: store,
|
||||
jwtSecret: jwtSecret,
|
||||
}, nil
|
||||
waiter: NewWaitRegistry(),
|
||||
}
|
||||
}
|
||||
|
||||
// GetStorageHealth returns the storage component status
|
||||
@ -36,8 +42,21 @@ func (s *Service) GetStorageHealth() string {
|
||||
return "degraded"
|
||||
}
|
||||
|
||||
// Close cleans up resources
|
||||
func (s *Service) Close() error {
|
||||
// RegisterWait registers a client to wait for game state changes
|
||||
func (s *Service) RegisterWait(gameID string, moveCount int, ctx context.Context) <-chan struct{} {
|
||||
return s.waiter.RegisterWait(gameID, moveCount, ctx)
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the service
|
||||
func (s *Service) Shutdown(timeout time.Duration) error {
|
||||
// Collect all errors
|
||||
var errs []error
|
||||
|
||||
// Shutdown wait registry
|
||||
if err := s.waiter.Shutdown(timeout); err != nil {
|
||||
errs = append(errs, fmt.Errorf("wait registry: %w", err))
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -46,8 +65,10 @@ func (s *Service) Close() error {
|
||||
|
||||
// Close storage if enabled
|
||||
if s.store != nil {
|
||||
return s.store.Close()
|
||||
if err := s.store.Close(); err != nil {
|
||||
errs = append(errs, fmt.Errorf("storage: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
178
internal/service/waiter.go
Normal file
178
internal/service/waiter.go
Normal file
@ -0,0 +1,178 @@
|
||||
// FILE: internal/service/waiter.go
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// WaitTimeout is the maximum time a client can wait for notifications
|
||||
WaitTimeout = 25 * time.Second
|
||||
|
||||
// WaitChannelBuffer size for notification channels
|
||||
WaitChannelBuffer = 1
|
||||
)
|
||||
|
||||
// WaitRegistry manages long-polling clients waiting for game state changes
|
||||
type WaitRegistry struct {
|
||||
mu sync.RWMutex
|
||||
waiters map[string][]*WaitRequest // gameID → waiting clients
|
||||
shutdown chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// WaitRequest represents a single client waiting for game updates
|
||||
type WaitRequest struct {
|
||||
MoveCount int // Last known move count
|
||||
Notify chan struct{} // Buffered channel for notifications
|
||||
Timer *time.Timer // Timeout timer
|
||||
Context context.Context // Client connection context
|
||||
GameID string // Game being watched
|
||||
}
|
||||
|
||||
// NewWaitRegistry creates a new wait registry
|
||||
func NewWaitRegistry() *WaitRegistry {
|
||||
return &WaitRegistry{
|
||||
waiters: make(map[string][]*WaitRequest),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterWait registers a client to wait for game state changes
|
||||
func (w *WaitRegistry) RegisterWait(gameID string, moveCount int, ctx context.Context) <-chan struct{} {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// Create wait request
|
||||
req := &WaitRequest{
|
||||
MoveCount: moveCount,
|
||||
Notify: make(chan struct{}, WaitChannelBuffer),
|
||||
Context: ctx,
|
||||
GameID: gameID,
|
||||
}
|
||||
|
||||
// Setup timeout timer
|
||||
req.Timer = time.AfterFunc(WaitTimeout, func() {
|
||||
w.handleTimeout(req)
|
||||
})
|
||||
|
||||
// Add to waiters map
|
||||
w.waiters[gameID] = append(w.waiters[gameID], req)
|
||||
|
||||
// Setup cleanup on context cancellation
|
||||
w.wg.Add(1)
|
||||
go func() {
|
||||
defer w.wg.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Client disconnected
|
||||
w.removeWaiter(gameID, req)
|
||||
case <-req.Notify:
|
||||
// Notification received
|
||||
req.Timer.Stop()
|
||||
w.removeWaiter(gameID, req)
|
||||
case <-w.shutdown:
|
||||
// Server shutting down
|
||||
req.Timer.Stop()
|
||||
close(req.Notify)
|
||||
}
|
||||
}()
|
||||
|
||||
return req.Notify
|
||||
}
|
||||
|
||||
// NotifyGame notifies all clients waiting on a game about state change
|
||||
func (w *WaitRegistry) NotifyGame(gameID string, currentMoveCount int) {
|
||||
w.mu.RLock()
|
||||
waitList := w.waiters[gameID]
|
||||
w.mu.RUnlock()
|
||||
|
||||
if len(waitList) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Non-blocking notification to all waiters
|
||||
for _, req := range waitList {
|
||||
// Only notify if move count changed
|
||||
if req.MoveCount != currentMoveCount {
|
||||
select {
|
||||
case req.Notify <- struct{}{}:
|
||||
// Notification sent
|
||||
default:
|
||||
// Channel full or closed, skip slow client
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveGame removes all waiters for a game (called before game deletion)
|
||||
func (w *WaitRegistry) RemoveGame(gameID string) {
|
||||
w.mu.Lock()
|
||||
waitList := w.waiters[gameID]
|
||||
delete(w.waiters, gameID)
|
||||
w.mu.Unlock()
|
||||
|
||||
// Notify all waiters that game is gone
|
||||
for _, req := range waitList {
|
||||
select {
|
||||
case req.Notify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the wait registry
|
||||
func (w *WaitRegistry) Shutdown(timeout time.Duration) error {
|
||||
close(w.shutdown)
|
||||
|
||||
// Wait for all goroutines with timeout
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
w.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-time.After(timeout):
|
||||
return fmt.Errorf("http wait registry shutdown failed")
|
||||
}
|
||||
}
|
||||
|
||||
// handleTimeout handles wait request timeout
|
||||
func (w *WaitRegistry) handleTimeout(req *WaitRequest) {
|
||||
// Send timeout notification
|
||||
select {
|
||||
case req.Notify <- struct{}{}:
|
||||
// Timeout notification sent
|
||||
default:
|
||||
// Channel full or closed
|
||||
}
|
||||
}
|
||||
|
||||
// removeWaiter removes a specific waiter from the registry
|
||||
func (w *WaitRegistry) removeWaiter(gameID string, req *WaitRequest) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
waitList := w.waiters[gameID]
|
||||
for i, waiter := range waitList {
|
||||
if waiter == req {
|
||||
// Remove from slice
|
||||
w.waiters[gameID] = append(waitList[:i], waitList[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up empty entries
|
||||
if len(w.waiters[gameID]) == 0 {
|
||||
delete(w.waiters, gameID)
|
||||
}
|
||||
|
||||
// Stop timer if still running
|
||||
req.Timer.Stop()
|
||||
}
|
||||
Reference in New Issue
Block a user