diff --git a/README.md b/README.md index ea0c889..3015abe 100644 --- a/README.md +++ b/README.md @@ -64,17 +64,17 @@ LogStream Service ## Configuration -Configuration file location: `~/.config/logwisp.toml` +Default configuration file location: `~/.config/logwisp.toml` ### Basic Multi-Stream Configuration ```toml -# Application logs stream +# Application logs transport [[streams]] name = "app" [streams.monitor] -# Per-stream check interval in milliseconds +# Per-transport check interval in milliseconds check_interval_ms = 100 targets = [ { path = "/var/log/myapp", pattern = "*.log", is_file = false }, @@ -116,7 +116,7 @@ response_code = 429 response_message = "Rate limit exceeded" max_connections_per_ip = 5 -# System logs stream with slower check interval +# System logs transport with slower check interval [[streams]] name = "system" @@ -288,10 +288,10 @@ All HTTP streams share ports with path-based routing: ### HTTP/SSE Stream ```bash -# Connect to a stream +# Connect to a transport curl -N http://localhost:8080/stream -# Check stream status (includes filter and rate limit stats) +# Check transport status (includes filter and rate limit stats) curl http://localhost:8080/status # With authentication (when implemented) @@ -742,6 +742,8 @@ Contributions are welcome! Please read our contributing guidelines and submit pu - [x] Regex-based log filtering - [ ] Log transformation (field extraction, formatting) - [ ] Configurable logging/stdout support +- [ ] Service/non-interactive setup +- [ ] Live config change support - [ ] Authentication (Basic, JWT, mTLS) - [ ] TLS/SSL support - [ ] Prometheus metrics export diff --git a/doc/architecture.md b/doc/architecture.md index 3ca4a1f..6dc9c40 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -30,18 +30,18 @@ logwisp/ ├── filter/ │ ├── filter.go # Regex-based log filtering implementation │ └── chain.go # Sequential filter chain management - ├── logstream/ - │ ├── httprouter.go # HTTP router for path-based routing - │ ├── logstream.go # Stream lifecycle management - │ ├── routerserver.go # Router server implementation - │ └── service.go # Multi-stream service orchestration ├── monitor/ │ ├── file_watcher.go # File watching and rotation detection │ └── monitor.go # Log monitoring interface and implementation ├── ratelimit/ │ ├── ratelimit.go # Token bucket algorithm implementation │ └── limiter.go # Per-stream rate limiter with IP tracking - ├── stream/ + ├── service/ + │ ├── httprouter.go # HTTP router for path-based routing + │ ├── logstream.go # Stream lifecycle management + │ ├── routerserver.go # Router server implementation + │ └── service.go # Multi-stream service orchestration + ├── transport/ │ ├── httpstreamer.go # HTTP/SSE streaming with rate limiting │ ├── noop_logger.go # Silent logger for gnet │ ├── tcpserver.go # TCP server with rate limiting (gnet) @@ -206,7 +206,7 @@ Client Request → Rate Limiter → Token Bucket Check → Allow/Deny name = "stream-name" [streams.monitor] -check_interval_ms = 100 # Per-stream check interval +check_interval_ms = 100 # Per-transport check interval targets = [ { path = "/path/to/logs", pattern = "*.log", is_file = false }, { path = "/path/to/file.log", is_file = true } diff --git a/src/cmd/logwisp/main.go b/src/cmd/logwisp/main.go index 0244563..3d86991 100644 --- a/src/cmd/logwisp/main.go +++ b/src/cmd/logwisp/main.go @@ -11,7 +11,7 @@ import ( "time" "logwisp/src/internal/config" - "logwisp/src/internal/logstream" + "logwisp/src/internal/service" "logwisp/src/internal/version" ) @@ -49,41 +49,41 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - // Create log stream service - service := logstream.New(ctx) + // Create log transport service + svc := service.New(ctx) // Create HTTP router if requested - var router *logstream.HTTPRouter + var router *service.HTTPRouter if *useRouter { - router = logstream.NewHTTPRouter(service) + router = service.NewHTTPRouter(svc) fmt.Println("HTTP router mode enabled") } // Initialize streams successCount := 0 for _, streamCfg := range cfg.Streams { - fmt.Printf("Initializing stream '%s'...\n", streamCfg.Name) + fmt.Printf("Initializing transport '%s'...\n", streamCfg.Name) - // Set router mode BEFORE creating stream + // Set router mode BEFORE creating transport if *useRouter && streamCfg.HTTPServer != nil && streamCfg.HTTPServer.Enabled { // Temporarily disable standalone server startup originalEnabled := streamCfg.HTTPServer.Enabled streamCfg.HTTPServer.Enabled = false - if err := service.CreateStream(streamCfg); err != nil { - fmt.Fprintf(os.Stderr, "Failed to create stream '%s': %v\n", streamCfg.Name, err) + if err := svc.CreateStream(streamCfg); err != nil { + fmt.Fprintf(os.Stderr, "Failed to create transport '%s': %v\n", streamCfg.Name, err) continue } - // Get the created stream and configure for router mode - stream, _ := service.GetStream(streamCfg.Name) + // Get the created transport and configure for router mode + stream, _ := svc.GetStream(streamCfg.Name) if stream.HTTPServer != nil { stream.HTTPServer.SetRouterMode() // Restore enabled state stream.Config.HTTPServer.Enabled = originalEnabled if err := router.RegisterStream(stream); err != nil { - fmt.Fprintf(os.Stderr, "Failed to register stream '%s' with router: %v\n", + fmt.Fprintf(os.Stderr, "Failed to register transport '%s' with router: %v\n", streamCfg.Name, err) } else { fmt.Printf("Stream '%s' registered with router\n", streamCfg.Name) @@ -91,8 +91,8 @@ func main() { } } else { // Standard standalone mode - if err := service.CreateStream(streamCfg); err != nil { - fmt.Fprintf(os.Stderr, "Failed to create stream '%s': %v\n", streamCfg.Name, err) + if err := svc.CreateStream(streamCfg); err != nil { + fmt.Fprintf(os.Stderr, "Failed to create transport '%s': %v\n", streamCfg.Name, err) continue } } @@ -109,10 +109,10 @@ func main() { } fmt.Printf("LogWisp %s\n", version.Short()) - fmt.Printf("\n%d stream(s) running. Press Ctrl+C to stop.\n", successCount) + fmt.Printf("\n%d transport(s) running. Press Ctrl+C to stop.\n", successCount) // Start periodic status display - go statusReporter(service) + go statusReporter(svc) // Wait for shutdown <-sigChan @@ -130,7 +130,7 @@ func main() { done := make(chan struct{}) go func() { - service.Shutdown() + svc.Shutdown() close(done) }() @@ -150,11 +150,11 @@ func displayStreamEndpoints(cfg config.StreamConfig, routerMode bool) { if cfg.HTTPServer != nil && cfg.HTTPServer.Enabled { if routerMode { - fmt.Printf(" HTTP: /%s%s (stream), /%s%s (status)\n", + fmt.Printf(" HTTP: /%s%s (transport), /%s%s (status)\n", cfg.Name, cfg.HTTPServer.StreamPath, cfg.Name, cfg.HTTPServer.StatusPath) } else { - fmt.Printf(" HTTP: http://localhost:%d%s (stream), http://localhost:%d%s (status)\n", + fmt.Printf(" HTTP: http://localhost:%d%s (transport), http://localhost:%d%s (status)\n", cfg.HTTPServer.Port, cfg.HTTPServer.StreamPath, cfg.HTTPServer.Port, cfg.HTTPServer.StatusPath) } @@ -165,7 +165,7 @@ func displayStreamEndpoints(cfg config.StreamConfig, routerMode bool) { } } -func statusReporter(service *logstream.Service) { +func statusReporter(service *service.Service) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() diff --git a/src/internal/config/loader.go b/src/internal/config/loader.go index 38ed974..582439f 100644 --- a/src/internal/config/loader.go +++ b/src/internal/config/loader.go @@ -25,7 +25,7 @@ func defaults() *Config { Enabled: true, Port: 8080, BufferSize: 1000, - StreamPath: "/stream", + StreamPath: "/transport", StatusPath: "/status", Heartbeat: HeartbeatConfig{ Enabled: true, diff --git a/src/internal/config/stream.go b/src/internal/config/stream.go index 39ac4fb..5efa354 100644 --- a/src/internal/config/stream.go +++ b/src/internal/config/stream.go @@ -1,13 +1,15 @@ -// FILE: src/internal/config/stream.go +// FILE: src/internal/config/transport.go package config -import "logwisp/src/internal/filter" +import ( + "logwisp/src/internal/filter" +) type StreamConfig struct { // Stream identifier (used in logs and metrics) Name string `toml:"name"` - // Monitor configuration for this stream + // Monitor configuration for this transport Monitor *StreamMonitorConfig `toml:"monitor"` // Filter configuration diff --git a/src/internal/config/validation.go b/src/internal/config/validation.go index 9f8c78b..91ecc21 100644 --- a/src/internal/config/validation.go +++ b/src/internal/config/validation.go @@ -3,9 +3,10 @@ package config import ( "fmt" - "logwisp/src/internal/filter" "regexp" "strings" + + "logwisp/src/internal/filter" ) func (c *Config) validate() error { @@ -13,38 +14,38 @@ func (c *Config) validate() error { return fmt.Errorf("no streams configured") } - // Validate each stream + // Validate each transport streamNames := make(map[string]bool) streamPorts := make(map[int]string) for i, stream := range c.Streams { if stream.Name == "" { - return fmt.Errorf("stream %d: missing name", i) + return fmt.Errorf("transport %d: missing name", i) } if streamNames[stream.Name] { - return fmt.Errorf("stream %d: duplicate name '%s'", i, stream.Name) + return fmt.Errorf("transport %d: duplicate name '%s'", i, stream.Name) } streamNames[stream.Name] = true // Stream must have monitor config with targets if stream.Monitor == nil || len(stream.Monitor.Targets) == 0 { - return fmt.Errorf("stream '%s': no monitor targets specified", stream.Name) + return fmt.Errorf("transport '%s': no monitor targets specified", stream.Name) } // Validate check interval if stream.Monitor.CheckIntervalMs < 10 { - return fmt.Errorf("stream '%s': check interval too small: %d ms (min: 10ms)", + return fmt.Errorf("transport '%s': check interval too small: %d ms (min: 10ms)", stream.Name, stream.Monitor.CheckIntervalMs) } // Validate targets for j, target := range stream.Monitor.Targets { if target.Path == "" { - return fmt.Errorf("stream '%s' target %d: empty path", stream.Name, j) + return fmt.Errorf("transport '%s' target %d: empty path", stream.Name, j) } if strings.Contains(target.Path, "..") { - return fmt.Errorf("stream '%s' target %d: path contains directory traversal", stream.Name, j) + return fmt.Errorf("transport '%s' target %d: path contains directory traversal", stream.Name, j) } } @@ -58,16 +59,16 @@ func (c *Config) validate() error { // Validate TCP server if stream.TCPServer != nil && stream.TCPServer.Enabled { if stream.TCPServer.Port < 1 || stream.TCPServer.Port > 65535 { - return fmt.Errorf("stream '%s': invalid TCP port: %d", stream.Name, stream.TCPServer.Port) + return fmt.Errorf("transport '%s': invalid TCP port: %d", stream.Name, stream.TCPServer.Port) } if existing, exists := streamPorts[stream.TCPServer.Port]; exists { - return fmt.Errorf("stream '%s': TCP port %d already used by stream '%s'", + return fmt.Errorf("transport '%s': TCP port %d already used by transport '%s'", stream.Name, stream.TCPServer.Port, existing) } streamPorts[stream.TCPServer.Port] = stream.Name + "-tcp" if stream.TCPServer.BufferSize < 1 { - return fmt.Errorf("stream '%s': TCP buffer size must be positive: %d", + return fmt.Errorf("transport '%s': TCP buffer size must be positive: %d", stream.Name, stream.TCPServer.BufferSize) } @@ -87,32 +88,32 @@ func (c *Config) validate() error { // Validate HTTP server if stream.HTTPServer != nil && stream.HTTPServer.Enabled { if stream.HTTPServer.Port < 1 || stream.HTTPServer.Port > 65535 { - return fmt.Errorf("stream '%s': invalid HTTP port: %d", stream.Name, stream.HTTPServer.Port) + return fmt.Errorf("transport '%s': invalid HTTP port: %d", stream.Name, stream.HTTPServer.Port) } if existing, exists := streamPorts[stream.HTTPServer.Port]; exists { - return fmt.Errorf("stream '%s': HTTP port %d already used by stream '%s'", + return fmt.Errorf("transport '%s': HTTP port %d already used by transport '%s'", stream.Name, stream.HTTPServer.Port, existing) } streamPorts[stream.HTTPServer.Port] = stream.Name + "-http" if stream.HTTPServer.BufferSize < 1 { - return fmt.Errorf("stream '%s': HTTP buffer size must be positive: %d", + return fmt.Errorf("transport '%s': HTTP buffer size must be positive: %d", stream.Name, stream.HTTPServer.BufferSize) } // Validate paths if stream.HTTPServer.StreamPath == "" { - stream.HTTPServer.StreamPath = "/stream" + stream.HTTPServer.StreamPath = "/transport" } if stream.HTTPServer.StatusPath == "" { stream.HTTPServer.StatusPath = "/status" } if !strings.HasPrefix(stream.HTTPServer.StreamPath, "/") { - return fmt.Errorf("stream '%s': stream path must start with /: %s", + return fmt.Errorf("transport '%s': transport path must start with /: %s", stream.Name, stream.HTTPServer.StreamPath) } if !strings.HasPrefix(stream.HTTPServer.StatusPath, "/") { - return fmt.Errorf("stream '%s': status path must start with /: %s", + return fmt.Errorf("transport '%s': status path must start with /: %s", stream.Name, stream.HTTPServer.StatusPath) } @@ -133,7 +134,7 @@ func (c *Config) validate() error { tcpEnabled := stream.TCPServer != nil && stream.TCPServer.Enabled httpEnabled := stream.HTTPServer != nil && stream.HTTPServer.Enabled if !tcpEnabled && !httpEnabled { - return fmt.Errorf("stream '%s': no servers enabled", stream.Name) + return fmt.Errorf("transport '%s': no servers enabled", stream.Name) } // Validate auth if present @@ -148,11 +149,11 @@ func (c *Config) validate() error { func validateHeartbeat(serverType, streamName string, hb *HeartbeatConfig) error { if hb.Enabled { if hb.IntervalSeconds < 1 { - return fmt.Errorf("stream '%s' %s: heartbeat interval must be positive: %d", + return fmt.Errorf("transport '%s' %s: heartbeat interval must be positive: %d", streamName, serverType, hb.IntervalSeconds) } if hb.Format != "json" && hb.Format != "comment" { - return fmt.Errorf("stream '%s' %s: heartbeat format must be 'json' or 'comment': %s", + return fmt.Errorf("transport '%s' %s: heartbeat format must be 'json' or 'comment': %s", streamName, serverType, hb.Format) } } @@ -162,23 +163,23 @@ func validateHeartbeat(serverType, streamName string, hb *HeartbeatConfig) error func validateSSL(serverType, streamName string, ssl *SSLConfig) error { if ssl != nil && ssl.Enabled { if ssl.CertFile == "" || ssl.KeyFile == "" { - return fmt.Errorf("stream '%s' %s: SSL enabled but cert/key files not specified", + return fmt.Errorf("transport '%s' %s: SSL enabled but cert/key files not specified", streamName, serverType) } if ssl.ClientAuth && ssl.ClientCAFile == "" { - return fmt.Errorf("stream '%s' %s: client auth enabled but CA file not specified", + return fmt.Errorf("transport '%s' %s: client auth enabled but CA file not specified", streamName, serverType) } // Validate TLS versions validVersions := map[string]bool{"TLS1.0": true, "TLS1.1": true, "TLS1.2": true, "TLS1.3": true} if ssl.MinVersion != "" && !validVersions[ssl.MinVersion] { - return fmt.Errorf("stream '%s' %s: invalid min TLS version: %s", + return fmt.Errorf("transport '%s' %s: invalid min TLS version: %s", streamName, serverType, ssl.MinVersion) } if ssl.MaxVersion != "" && !validVersions[ssl.MaxVersion] { - return fmt.Errorf("stream '%s' %s: invalid max TLS version: %s", + return fmt.Errorf("transport '%s' %s: invalid max TLS version: %s", streamName, serverType, ssl.MaxVersion) } } @@ -192,15 +193,15 @@ func validateAuth(streamName string, auth *AuthConfig) error { validTypes := map[string]bool{"none": true, "basic": true, "bearer": true, "mtls": true} if !validTypes[auth.Type] { - return fmt.Errorf("stream '%s': invalid auth type: %s", streamName, auth.Type) + return fmt.Errorf("transport '%s': invalid auth type: %s", streamName, auth.Type) } if auth.Type == "basic" && auth.BasicAuth == nil { - return fmt.Errorf("stream '%s': basic auth type specified but config missing", streamName) + return fmt.Errorf("transport '%s': basic auth type specified but config missing", streamName) } if auth.Type == "bearer" && auth.BearerAuth == nil { - return fmt.Errorf("stream '%s': bearer auth type specified but config missing", streamName) + return fmt.Errorf("transport '%s': bearer auth type specified but config missing", streamName) } return nil @@ -212,26 +213,33 @@ func validateRateLimit(serverType, streamName string, rl *RateLimitConfig) error } if rl.RequestsPerSecond <= 0 { - return fmt.Errorf("stream '%s' %s: requests_per_second must be positive: %f", + return fmt.Errorf("transport '%s' %s: requests_per_second must be positive: %f", streamName, serverType, rl.RequestsPerSecond) } if rl.BurstSize < 1 { - return fmt.Errorf("stream '%s' %s: burst_size must be at least 1: %d", + return fmt.Errorf("transport '%s' %s: burst_size must be at least 1: %d", streamName, serverType, rl.BurstSize) } validLimitBy := map[string]bool{"ip": true, "global": true, "": true} if !validLimitBy[rl.LimitBy] { - return fmt.Errorf("stream '%s' %s: invalid limit_by value: %s (must be 'ip' or 'global')", + return fmt.Errorf("transport '%s' %s: invalid limit_by value: %s (must be 'ip' or 'global')", streamName, serverType, rl.LimitBy) } if rl.ResponseCode > 0 && (rl.ResponseCode < 400 || rl.ResponseCode >= 600) { - return fmt.Errorf("stream '%s' %s: response_code must be 4xx or 5xx: %d", + return fmt.Errorf("transport '%s' %s: response_code must be 4xx or 5xx: %d", streamName, serverType, rl.ResponseCode) } + if rl.MaxConnectionsPerIP > 0 && rl.MaxTotalConnections > 0 { + if rl.MaxConnectionsPerIP > rl.MaxTotalConnections { + return fmt.Errorf("stream '%s' %s: max_connections_per_ip (%d) cannot exceed max_total_connections (%d)", + streamName, serverType, rl.MaxConnectionsPerIP, rl.MaxTotalConnections) + } + } + return nil } @@ -241,7 +249,7 @@ func validateFilter(streamName string, filterIndex int, cfg *filter.Config) erro case filter.TypeInclude, filter.TypeExclude, "": // Valid types default: - return fmt.Errorf("stream '%s' filter[%d]: invalid type '%s' (must be 'include' or 'exclude')", + return fmt.Errorf("transport '%s' filter[%d]: invalid type '%s' (must be 'include' or 'exclude')", streamName, filterIndex, cfg.Type) } @@ -250,7 +258,7 @@ func validateFilter(streamName string, filterIndex int, cfg *filter.Config) erro case filter.LogicOr, filter.LogicAnd, "": // Valid logic default: - return fmt.Errorf("stream '%s' filter[%d]: invalid logic '%s' (must be 'or' or 'and')", + return fmt.Errorf("transport '%s' filter[%d]: invalid logic '%s' (must be 'or' or 'and')", streamName, filterIndex, cfg.Logic) } @@ -262,7 +270,7 @@ func validateFilter(streamName string, filterIndex int, cfg *filter.Config) erro // Validate regex patterns for i, pattern := range cfg.Patterns { if _, err := regexp.Compile(pattern); err != nil { - return fmt.Errorf("stream '%s' filter[%d] pattern[%d] '%s': invalid regex: %w", + return fmt.Errorf("transport '%s' filter[%d] pattern[%d] '%s': invalid regex: %w", streamName, filterIndex, i, pattern, err) } } diff --git a/src/internal/monitor/file_watcher.go b/src/internal/monitor/file_watcher.go index 9831c70..4ca13b6 100644 --- a/src/internal/monitor/file_watcher.go +++ b/src/internal/monitor/file_watcher.go @@ -217,17 +217,23 @@ func (w *fileWatcher) checkFile() error { } // Update position after successful read - if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil { - w.mu.Lock() - w.position = currentPos - w.size = currentSize - w.modTime = currentModTime - if !rotated && currentInode != 0 { - w.inode = currentInode - } - w.mu.Unlock() + currentPos, err := file.Seek(0, io.SeekCurrent) + if err != nil { + // Log error but don't fail - position tracking is best effort + fmt.Printf("[WARN] Failed to get file position for %s: %v\n", w.path, err) + // Use size as fallback position + currentPos = currentSize } + w.mu.Lock() + w.position = currentPos + w.size = currentSize + w.modTime = currentModTime + if !rotated && currentInode != 0 { + w.inode = currentInode + } + w.mu.Unlock() + return scanner.Err() } diff --git a/src/internal/ratelimit/limiter.go b/src/internal/ratelimit/limiter.go index 168fe5f..0a331ab 100644 --- a/src/internal/ratelimit/limiter.go +++ b/src/internal/ratelimit/limiter.go @@ -11,7 +11,7 @@ import ( "logwisp/src/internal/config" ) -// Manages rate limiting for a stream +// Manages rate limiting for a transport type Limiter struct { config config.RateLimitConfig @@ -19,7 +19,7 @@ type Limiter struct { ipLimiters map[string]*ipLimiter ipMu sync.RWMutex - // Global limiter for the stream + // Global limiter for the transport globalLimiter *TokenBucket // Connection tracking diff --git a/src/internal/logstream/httprouter.go b/src/internal/service/httprouter.go similarity index 91% rename from src/internal/logstream/httprouter.go rename to src/internal/service/httprouter.go index 252e0b3..79bbb0d 100644 --- a/src/internal/logstream/httprouter.go +++ b/src/internal/service/httprouter.go @@ -1,5 +1,5 @@ -// FILE: src/internal/logstream/httprouter.go -package logstream +// FILE: src/internal/service/httprouter.go +package service import ( "fmt" @@ -71,23 +71,23 @@ func (r *HTTPRouter) RegisterStream(stream *LogStream) error { } r.mu.Unlock() - // Register routes for this stream + // Register routes for this transport rs.routeMu.Lock() defer rs.routeMu.Unlock() - // Use stream name as path prefix + // Use transport name as path prefix pathPrefix := "/" + stream.Name // Check for conflicts for existingPath, existingStream := range rs.routes { if strings.HasPrefix(pathPrefix, existingPath) || strings.HasPrefix(existingPath, pathPrefix) { - return fmt.Errorf("path conflict: '%s' conflicts with existing stream '%s' at '%s'", + return fmt.Errorf("path conflict: '%s' conflicts with existing transport '%s' at '%s'", pathPrefix, existingStream.Name, existingPath) } } rs.routes[pathPrefix] = stream - fmt.Printf("[ROUTER] Registered stream '%s' at path '%s' on port %d\n", stream.Name, pathPrefix, port) + fmt.Printf("[ROUTER] Registered transport '%s' at path '%s' on port %d\n", stream.Name, pathPrefix, port) return nil } @@ -100,7 +100,7 @@ func (r *HTTPRouter) UnregisterStream(streamName string) { for path, stream := range rs.routes { if stream.Name == streamName { delete(rs.routes, path) - fmt.Printf("[ROUTER] Unregistered stream '%s' from path '%s' on port %d\n", + fmt.Printf("[ROUTER] Unregistered transport '%s' from path '%s' on port %d\n", streamName, path, port) } } diff --git a/src/internal/logstream/logstream.go b/src/internal/service/logstream.go similarity index 63% rename from src/internal/logstream/logstream.go rename to src/internal/service/logstream.go index c6f7e42..d22f7c0 100644 --- a/src/internal/logstream/logstream.go +++ b/src/internal/service/logstream.go @@ -1,15 +1,42 @@ -// FILE: src/internal/logstream/logstream.go -package logstream +// FILE: src/internal/service/logstream.go +package service import ( "context" "fmt" + "path/filepath" "sync" "time" "logwisp/src/internal/config" + "logwisp/src/internal/filter" + "logwisp/src/internal/monitor" + "logwisp/src/internal/transport" ) +type LogStream struct { + Name string + Config config.StreamConfig + Monitor monitor.Monitor + FilterChain *filter.Chain + TCPServer *transport.TCPStreamer + HTTPServer *transport.HTTPStreamer + Stats *StreamStats + + ctx context.Context + cancel context.CancelFunc +} + +type StreamStats struct { + StartTime time.Time + MonitorStats monitor.Stats + TCPConnections int32 + HTTPConnections int32 + TotalBytesServed uint64 + TotalEntriesServed uint64 + FilterStats map[string]any +} + func (ls *LogStream) Shutdown() { // Stop servers first var wg sync.WaitGroup @@ -79,18 +106,38 @@ func (ls *LogStream) GetStats() map[string]any { } func (ls *LogStream) UpdateTargets(targets []config.MonitorTarget) error { - // Clear existing targets - for _, watcher := range ls.Monitor.GetActiveWatchers() { - ls.Monitor.RemoveTarget(watcher.Path) + // Validate new targets first + validatedTargets := make([]config.MonitorTarget, 0, len(targets)) + for _, target := range targets { + // Basic validation + absPath, err := filepath.Abs(target.Path) + if err != nil { + return fmt.Errorf("invalid target path %s: %w", target.Path, err) + } + target.Path = absPath + validatedTargets = append(validatedTargets, target) } + // Get current watchers + oldWatchers := ls.Monitor.GetActiveWatchers() + // Add new targets - for _, target := range targets { + for _, target := range validatedTargets { if err := ls.Monitor.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil { - return err + // Rollback: restore old watchers + for _, watcher := range oldWatchers { + // Best effort restoration + ls.Monitor.AddTarget(watcher.Path, "", false) + } + return fmt.Errorf("failed to add target %s: %w", target.Path, err) } } + // Only remove old targets after new ones are successfully added + for _, watcher := range oldWatchers { + ls.Monitor.RemoveTarget(watcher.Path) + } + return nil } diff --git a/src/internal/logstream/routerserver.go b/src/internal/service/routerserver.go similarity index 84% rename from src/internal/logstream/routerserver.go rename to src/internal/service/routerserver.go index 1ca36c3..c49d6e2 100644 --- a/src/internal/logstream/routerserver.go +++ b/src/internal/service/routerserver.go @@ -1,5 +1,5 @@ -// FILE: src/internal/config/routerserver.go -package logstream +// FILE: src/internal/service/routerserver.go +package service import ( "encoding/json" @@ -16,7 +16,7 @@ import ( type routerServer struct { port int server *fasthttp.Server - routes map[string]*LogStream // path prefix -> stream + routes map[string]*LogStream // path prefix -> transport routeMu sync.RWMutex router *HTTPRouter startTime time.Time @@ -38,7 +38,7 @@ func (rs *routerServer) requestHandler(ctx *fasthttp.RequestCtx) { return } - // Find matching stream + // Find matching transport rs.routeMu.RLock() var matchedStream *LogStream var matchedPrefix string @@ -68,18 +68,18 @@ func (rs *routerServer) requestHandler(ctx *fasthttp.RequestCtx) { rs.router.routedRequests.Add(1) - // Route to stream's handler + // Route to transport's handler if matchedStream.HTTPServer != nil { // Save original path originalPath := string(ctx.URI().Path()) - // Rewrite path to remove stream prefix + // Rewrite path to remove transport prefix if remainingPath == "" { - // Default to stream path if no remaining path + // Default to transport path if no remaining path remainingPath = matchedStream.Config.HTTPServer.StreamPath } - fmt.Printf("[ROUTER] Routing to stream '%s': %s -> %s\n", + fmt.Printf("[ROUTER] Routing to transport '%s': %s -> %s\n", matchedStream.Name, originalPath, remainingPath) ctx.URI().SetPath(remainingPath) @@ -91,8 +91,8 @@ func (rs *routerServer) requestHandler(ctx *fasthttp.RequestCtx) { ctx.SetStatusCode(fasthttp.StatusServiceUnavailable) ctx.SetContentType("application/json") json.NewEncoder(ctx).Encode(map[string]string{ - "error": "Stream HTTP server not available", - "stream": matchedStream.Name, + "error": "Stream HTTP server not available", + "transport": matchedStream.Name, }) } } @@ -109,8 +109,8 @@ func (rs *routerServer) handleGlobalStatus(ctx *fasthttp.RequestCtx) { streamStats["routing"] = map[string]any{ "path_prefix": prefix, "endpoints": map[string]string{ - "stream": prefix + stream.Config.HTTPServer.StreamPath, - "status": prefix + stream.Config.HTTPServer.StatusPath, + "transport": prefix + stream.Config.HTTPServer.StreamPath, + "status": prefix + stream.Config.HTTPServer.StatusPath, }, } @@ -148,7 +148,7 @@ func (rs *routerServer) handleNotFound(ctx *fasthttp.RequestCtx) { for prefix, stream := range rs.routes { if stream.Config.HTTPServer != nil { availableRoutes = append(availableRoutes, - fmt.Sprintf("%s%s (stream: %s)", prefix, stream.Config.HTTPServer.StreamPath, stream.Name), + fmt.Sprintf("%s%s (transport: %s)", prefix, stream.Config.HTTPServer.StreamPath, stream.Name), fmt.Sprintf("%s%s (status: %s)", prefix, stream.Config.HTTPServer.StatusPath, stream.Name), ) } diff --git a/src/internal/logstream/service.go b/src/internal/service/service.go similarity index 84% rename from src/internal/logstream/service.go rename to src/internal/service/service.go index 69b57af..ac2c3ef 100644 --- a/src/internal/logstream/service.go +++ b/src/internal/service/service.go @@ -1,16 +1,16 @@ -// FILE: src/internal/logstream/service.go -package logstream +// FILE: src/internal/service/service.go +package service import ( "context" "fmt" - "logwisp/src/internal/filter" "sync" "time" "logwisp/src/internal/config" + "logwisp/src/internal/filter" "logwisp/src/internal/monitor" - "logwisp/src/internal/stream" + "logwisp/src/internal/transport" ) type Service struct { @@ -21,29 +21,6 @@ type Service struct { wg sync.WaitGroup } -type LogStream struct { - Name string - Config config.StreamConfig - Monitor monitor.Monitor - FilterChain *filter.Chain - TCPServer *stream.TCPStreamer - HTTPServer *stream.HTTPStreamer - Stats *StreamStats - - ctx context.Context - cancel context.CancelFunc -} - -type StreamStats struct { - StartTime time.Time - MonitorStats monitor.Stats - TCPConnections int32 - HTTPConnections int32 - TotalBytesServed uint64 - TotalEntriesServed uint64 - FilterStats map[string]any -} - func New(ctx context.Context) *Service { serviceCtx, cancel := context.WithCancel(ctx) return &Service{ @@ -58,10 +35,10 @@ func (s *Service) CreateStream(cfg config.StreamConfig) error { defer s.mu.Unlock() if _, exists := s.streams[cfg.Name]; exists { - return fmt.Errorf("stream '%s' already exists", cfg.Name) + return fmt.Errorf("transport '%s' already exists", cfg.Name) } - // Create stream context + // Create transport context streamCtx, streamCancel := context.WithCancel(s.ctx) // Create monitor @@ -93,7 +70,7 @@ func (s *Service) CreateStream(cfg config.StreamConfig) error { filterChain = chain } - // Create log stream + // Create log transport ls := &LogStream{ Name: cfg.Name, Config: cfg, @@ -120,7 +97,7 @@ func (s *Service) CreateStream(cfg config.StreamConfig) error { s.filterLoop(streamCtx, rawChan, tcpChan, filterChain) }() - ls.TCPServer = stream.NewTCPStreamer(tcpChan, *cfg.TCPServer) + ls.TCPServer = transport.NewTCPStreamer(tcpChan, *cfg.TCPServer) if err := s.startTCPServer(ls); err != nil { ls.Shutdown() @@ -142,7 +119,7 @@ func (s *Service) CreateStream(cfg config.StreamConfig) error { s.filterLoop(streamCtx, rawChan, httpChan, filterChain) }() - ls.HTTPServer = stream.NewHTTPStreamer(httpChan, *cfg.HTTPServer) + ls.HTTPServer = transport.NewHTTPStreamer(httpChan, *cfg.HTTPServer) if err := s.startHTTPServer(ls); err != nil { ls.Shutdown() @@ -187,7 +164,7 @@ func (s *Service) GetStream(name string) (*LogStream, error) { stream, exists := s.streams[name] if !exists { - return nil, fmt.Errorf("stream '%s' not found", name) + return nil, fmt.Errorf("transport '%s' not found", name) } return stream, nil } @@ -209,7 +186,7 @@ func (s *Service) RemoveStream(name string) error { stream, exists := s.streams[name] if !exists { - return fmt.Errorf("stream '%s' not found", name) + return fmt.Errorf("transport '%s' not found", name) } stream.Shutdown() diff --git a/src/internal/stream/httpstreamer.go b/src/internal/transport/httpstreamer.go similarity index 95% rename from src/internal/stream/httpstreamer.go rename to src/internal/transport/httpstreamer.go index 9e43852..7e4e47f 100644 --- a/src/internal/stream/httpstreamer.go +++ b/src/internal/transport/httpstreamer.go @@ -1,5 +1,5 @@ -// FILE: src/internal/stream/httpstreamer.go -package stream +// FILE: src/internal/transport/httpstreamer.go +package transport import ( "bufio" @@ -43,7 +43,7 @@ func NewHTTPStreamer(logChan chan monitor.LogEntry, cfg config.HTTPConfig) *HTTP // Set default paths if not configured streamPath := cfg.StreamPath if streamPath == "" { - streamPath = "/stream" + streamPath = "/transport" } statusPath := cfg.StatusPath if statusPath == "" { @@ -151,7 +151,7 @@ func (h *HTTPStreamer) requestHandler(ctx *fasthttp.RequestCtx) { ctx.SetContentType("application/json") json.NewEncoder(ctx).Encode(map[string]any{ "error": "Not Found", - "message": fmt.Sprintf("Available endpoints: %s (SSE stream), %s (status)", + "message": fmt.Sprintf("Available endpoints: %s (SSE transport), %s (status)", h.streamPath, h.statusPath), }) } @@ -166,7 +166,7 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { } // Set SSE headers - ctx.Response.Header.Set("Content-Type", "text/event-stream") + ctx.Response.Header.Set("Content-Type", "text/event-transport") ctx.Response.Header.Set("Cache-Control", "no-cache") ctx.Response.Header.Set("Connection", "keep-alive") ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") @@ -202,7 +202,7 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { } }() - // Define the stream writer function + // Define the transport writer function streamFunc := func(w *bufio.Writer) { newCount := h.activeClients.Add(1) fmt.Printf("[HTTP DEBUG] Client connected on port %d. Count now: %d\n", @@ -336,8 +336,8 @@ func (h *HTTPStreamer) handleStatus(ctx *fasthttp.RequestCtx) { "mode": map[string]bool{"standalone": h.standalone, "router": !h.standalone}, }, "endpoints": map[string]string{ - "stream": h.streamPath, - "status": h.statusPath, + "transport": h.streamPath, + "status": h.statusPath, }, "features": map[string]any{ "heartbeat": map[string]any{ @@ -361,7 +361,7 @@ func (h *HTTPStreamer) GetActiveConnections() int32 { return h.activeClients.Load() } -// Returns the configured stream endpoint path +// Returns the configured transport endpoint path func (h *HTTPStreamer) GetStreamPath() string { return h.streamPath } diff --git a/src/internal/stream/noop_logger.go b/src/internal/transport/noop_logger.go similarity index 85% rename from src/internal/stream/noop_logger.go rename to src/internal/transport/noop_logger.go index 61f89fc..cd935c1 100644 --- a/src/internal/stream/noop_logger.go +++ b/src/internal/transport/noop_logger.go @@ -1,5 +1,5 @@ -// FILE: src/internal/stream/noop_logger.go -package stream +// FILE: src/internal/transport/noop_logger.go +package transport // noopLogger implements gnet's Logger interface but discards everything type noopLogger struct{} diff --git a/src/internal/stream/tcpserver.go b/src/internal/transport/tcpserver.go similarity index 96% rename from src/internal/stream/tcpserver.go rename to src/internal/transport/tcpserver.go index 74c628a..a88ee91 100644 --- a/src/internal/stream/tcpserver.go +++ b/src/internal/transport/tcpserver.go @@ -1,5 +1,5 @@ // FILE: src/internal/monitor/tcpserver.go -package stream +package transport import ( "fmt" @@ -17,7 +17,10 @@ type tcpServer struct { func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action { // Store engine reference for shutdown + s.streamer.engineMu.Lock() s.streamer.engine = &eng + s.streamer.engineMu.Unlock() + fmt.Printf("[TCP DEBUG] Server booted on port %d\n", s.streamer.config.Port) return gnet.None } diff --git a/src/internal/stream/tcpstreamer.go b/src/internal/transport/tcpstreamer.go similarity index 93% rename from src/internal/stream/tcpstreamer.go rename to src/internal/transport/tcpstreamer.go index b3b50ca..c7c9e7a 100644 --- a/src/internal/stream/tcpstreamer.go +++ b/src/internal/transport/tcpstreamer.go @@ -1,5 +1,5 @@ -// FILE: src/internal/stream/tcpstreamer.go -package stream +// FILE: src/internal/transport/tcpstreamer.go +package transport import ( "context" @@ -23,6 +23,7 @@ type TCPStreamer struct { activeConns atomic.Int32 startTime time.Time engine *gnet.Engine + engineMu sync.Mutex wg sync.WaitGroup rateLimiter *ratelimit.Limiter } @@ -84,10 +85,14 @@ func (t *TCPStreamer) Stop() { close(t.done) // Stop gnet engine if running - if t.engine != nil { + t.engineMu.Lock() + engine := t.engine + t.engineMu.Unlock() + + if engine != nil { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - t.engine.Stop(ctx) + (*engine).Stop(ctx) // Dereference the pointer } // Wait for broadcast loop to finish