v0.1.3: streaming switched from net/http to fasthttp for HTTP and to gnet for TCP; heartbeat configuration added

This commit is contained in:
2025-07-01 23:43:51 -04:00
parent a3450a9589
commit a7595061ba
13 changed files with 1134 additions and 1474 deletions

View File

@ -1,230 +1,174 @@
// File: logwisp/src/cmd/logwisp/main.go
// FILE: src/cmd/logwisp/main.go
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"logwisp/src/internal/config"
"logwisp/src/internal/middleware"
"logwisp/src/internal/monitor"
"logwisp/src/internal/stream"
)
func main() {
// Parse flags manually without init()
var colorMode bool
flag.BoolVar(&colorMode, "c", false, "Enable color pass-through for escape codes in logs")
flag.BoolVar(&colorMode, "color", false, "Enable color pass-through for escape codes in logs")
// Additional CLI flags that override config
// Parse CLI flags
var (
port = flag.Int("port", 0, "HTTP port (overrides config)")
bufferSize = flag.Int("buffer-size", 0, "Stream buffer size (overrides config)")
checkInterval = flag.Int("check-interval", 0, "File check interval in ms (overrides config)")
rateLimit = flag.Bool("rate-limit", false, "Enable rate limiting (overrides config)")
rateRequests = flag.Int("rate-requests", 0, "Rate limit requests/sec (overrides config)")
rateBurst = flag.Int("rate-burst", 0, "Rate limit burst size (overrides config)")
configFile = flag.String("config", "", "Config file path (overrides LOGWISP_CONFIG_FILE)")
configFile = flag.String("config", "", "Config file path")
// Legacy compatibility flags
port = flag.Int("port", 0, "HTTP port (legacy, maps to --http-port)")
bufferSize = flag.Int("buffer-size", 0, "Buffer size (legacy, maps to --http-buffer-size)")
// New explicit flags
httpPort = flag.Int("http-port", 0, "HTTP server port")
httpBuffer = flag.Int("http-buffer-size", 0, "HTTP server buffer size")
tcpPort = flag.Int("tcp-port", 0, "TCP server port")
tcpBuffer = flag.Int("tcp-buffer-size", 0, "TCP server buffer size")
enableTCP = flag.Bool("enable-tcp", false, "Enable TCP server")
enableHTTP = flag.Bool("enable-http", false, "Enable HTTP server")
checkInterval = flag.Int("check-interval", 0, "File check interval in ms")
)
flag.Parse()
// Set config file env var if specified via CLI
if *configFile != "" {
os.Setenv("LOGWISP_CONFIG_FILE", *configFile)
}
// Build CLI override args for config package
// Build CLI args for config
var cliArgs []string
// Legacy mapping
if *port > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--port=%d", *port))
cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.port=%d", *port))
}
if *bufferSize > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--stream.buffer_size=%d", *bufferSize))
cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.buffer_size=%d", *bufferSize))
}
// New flags
if *httpPort > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.port=%d", *httpPort))
}
if *httpBuffer > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.buffer_size=%d", *httpBuffer))
}
if *tcpPort > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.port=%d", *tcpPort))
}
if *tcpBuffer > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.buffer_size=%d", *tcpBuffer))
}
if flag.Lookup("enable-tcp").DefValue != flag.Lookup("enable-tcp").Value.String() {
cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.enabled=%v", *enableTCP))
}
if flag.Lookup("enable-http").DefValue != flag.Lookup("enable-http").Value.String() {
cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.enabled=%v", *enableHTTP))
}
if *checkInterval > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--monitor.check_interval_ms=%d", *checkInterval))
}
if flag.Lookup("rate-limit").DefValue != flag.Lookup("rate-limit").Value.String() {
// Rate limit flag was explicitly set
cliArgs = append(cliArgs, fmt.Sprintf("--stream.rate_limit.enabled=%v", *rateLimit))
}
if *rateRequests > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--stream.rate_limit.requests_per_second=%d", *rateRequests))
}
if *rateBurst > 0 {
cliArgs = append(cliArgs, fmt.Sprintf("--stream.rate_limit.burst_size=%d", *rateBurst))
}
// Parse remaining args as monitor targets
// Parse monitor targets from remaining args
for _, arg := range flag.Args() {
if strings.Contains(arg, ":") {
// Format: path:pattern:isfile
cliArgs = append(cliArgs, fmt.Sprintf("--monitor.targets.add=%s", arg))
} else if stat, err := os.Stat(arg); err == nil {
// Auto-detect file vs directory
if stat.IsDir() {
cliArgs = append(cliArgs, fmt.Sprintf("--monitor.targets.add=%s:*.log:false", arg))
} else {
cliArgs = append(cliArgs, fmt.Sprintf("--monitor.targets.add=%s::true", arg))
}
}
cliArgs = append(cliArgs, fmt.Sprintf("--monitor.targets.add=%s", arg))
}
// Load configuration with CLI overrides
// Load configuration
cfg, err := config.LoadWithCLI(cliArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err)
os.Exit(1)
}
// Create context for graceful shutdown
// Create context for shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// WaitGroup for tracking all goroutines
var wg sync.WaitGroup
// Create components
streamer := stream.NewWithOptions(cfg.Stream.BufferSize, colorMode)
mon := monitor.New(streamer.Publish)
// Set monitor check interval from config
// Create monitor
mon := monitor.New()
mon.SetCheckInterval(time.Duration(cfg.Monitor.CheckIntervalMs) * time.Millisecond)
// Add monitor targets from config
// Add targets
for _, target := range cfg.Monitor.Targets {
if err := mon.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil {
fmt.Fprintf(os.Stderr, "Failed to add target %s: %v\n", target.Path, err)
}
}
// Start monitoring
// Start monitor
if err := mon.Start(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Failed to start monitor: %v\n", err)
os.Exit(1)
}
// Setup HTTP server
mux := http.NewServeMux()
var tcpServer *stream.TCPStreamer
var httpServer *stream.HTTPStreamer
// Create handler with optional rate limiting
var handler http.Handler = streamer
var rateLimiter *middleware.RateLimiter
// Start TCP server if enabled
if cfg.TCPServer.Enabled {
tcpChan := mon.Subscribe()
tcpServer = stream.NewTCPStreamer(tcpChan, cfg.TCPServer)
if cfg.Stream.RateLimit.Enabled {
rateLimiter = middleware.NewRateLimiter(
cfg.Stream.RateLimit.RequestsPerSecond,
cfg.Stream.RateLimit.BurstSize,
cfg.Stream.RateLimit.CleanupIntervalS,
)
handler = rateLimiter.Middleware(handler)
fmt.Printf("Rate limiting enabled: %d req/s, burst %d\n",
cfg.Stream.RateLimit.RequestsPerSecond,
cfg.Stream.RateLimit.BurstSize)
wg.Add(1)
go func() {
defer wg.Done()
if err := tcpServer.Start(); err != nil {
fmt.Fprintf(os.Stderr, "TCP server error: %v\n", err)
}
}()
fmt.Printf("TCP streaming on port %d\n", cfg.TCPServer.Port)
}
mux.Handle("/stream", handler)
// Start HTTP server if enabled
if cfg.HTTPServer.Enabled {
httpChan := mon.Subscribe()
httpServer = stream.NewHTTPStreamer(httpChan, cfg.HTTPServer)
// Enhanced status endpoint
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
wg.Add(1)
go func() {
defer wg.Done()
if err := httpServer.Start(); err != nil {
fmt.Fprintf(os.Stderr, "HTTP server error: %v\n", err)
}
}()
status := map[string]interface{}{
"service": "LogWisp",
"version": "2.0.0",
"port": cfg.Port,
"color_mode": colorMode,
"config": map[string]interface{}{
"monitor": map[string]interface{}{
"check_interval_ms": cfg.Monitor.CheckIntervalMs,
"targets_count": len(cfg.Monitor.Targets),
},
"stream": map[string]interface{}{
"buffer_size": cfg.Stream.BufferSize,
"rate_limit": map[string]interface{}{
"enabled": cfg.Stream.RateLimit.Enabled,
"requests_per_second": cfg.Stream.RateLimit.RequestsPerSecond,
"burst_size": cfg.Stream.RateLimit.BurstSize,
},
},
},
}
// Add runtime stats
if rateLimiter != nil {
status["rate_limiter"] = rateLimiter.Stats()
}
status["streamer"] = streamer.Stats()
json.NewEncoder(w).Encode(status)
})
server := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: mux,
// Add timeouts for better shutdown behavior
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
fmt.Printf("HTTP/SSE streaming on http://localhost:%d/stream\n", cfg.HTTPServer.Port)
fmt.Printf("Status available at http://localhost:%d/status\n", cfg.HTTPServer.Port)
}
// Start server in goroutine
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("LogWisp streaming on http://localhost:%d/stream\n", cfg.Port)
fmt.Printf("Status available at http://localhost:%d/status\n", cfg.Port)
if colorMode {
fmt.Println("Color pass-through enabled")
}
fmt.Printf("Config loaded from: %s\n", config.GetConfigPath())
if !cfg.TCPServer.Enabled && !cfg.HTTPServer.Enabled {
fmt.Fprintln(os.Stderr, "No servers enabled. Enable at least one server in config.")
os.Exit(1)
}
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
}
}()
// Wait for shutdown signal
// Wait for shutdown
<-sigChan
fmt.Println("\nShutting down...")
// Cancel context to stop all components
// Stop servers first
if tcpServer != nil {
tcpServer.Stop()
}
if httpServer != nil {
httpServer.Stop()
}
// Cancel context and stop monitor
cancel()
// Create shutdown context with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
// Shutdown server first
if err := server.Shutdown(shutdownCtx); err != nil {
fmt.Fprintf(os.Stderr, "Server shutdown error: %v\n", err)
// Force close if graceful shutdown fails
server.Close()
}
// Stop all components
mon.Stop()
streamer.Stop()
if rateLimiter != nil {
rateLimiter.Stop()
}
// Wait for all goroutines with timeout
// Wait for completion
done := make(chan struct{})
go func() {
wg.Wait()
@ -235,6 +179,6 @@ func main() {
case <-done:
fmt.Println("Shutdown complete")
case <-time.After(2 * time.Second):
fmt.Println("Shutdown timeout, forcing exit")
fmt.Println("Shutdown timeout")
}
}

View File

@ -1,4 +1,4 @@
// File: logwisp/src/internal/config/config.go
// FILE: src/internal/config/config.go
package config
import (
@ -10,114 +10,97 @@ import (
lconfig "github.com/lixenwraith/config"
)
// Config holds the complete configuration
type Config struct {
Port int `toml:"port"`
Monitor MonitorConfig `toml:"monitor"`
Stream StreamConfig `toml:"stream"`
Monitor MonitorConfig `toml:"monitor"`
TCPServer TCPConfig `toml:"tcpserver"`
HTTPServer HTTPConfig `toml:"httpserver"`
}
// MonitorConfig holds monitoring settings
type MonitorConfig struct {
CheckIntervalMs int `toml:"check_interval_ms"`
Targets []MonitorTarget `toml:"targets"`
}
// MonitorTarget represents a path to monitor
type MonitorTarget struct {
Path string `toml:"path"` // File or directory path
Pattern string `toml:"pattern"` // Glob pattern for directories
IsFile bool `toml:"is_file"` // True if monitoring specific file
Path string `toml:"path"`
Pattern string `toml:"pattern"`
IsFile bool `toml:"is_file"`
}
// StreamConfig holds streaming settings
type StreamConfig struct {
BufferSize int `toml:"buffer_size"`
RateLimit RateLimitConfig `toml:"rate_limit"`
type TCPConfig struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
BufferSize int `toml:"buffer_size"`
SSLEnabled bool `toml:"ssl_enabled"`
SSLCertFile string `toml:"ssl_cert_file"`
SSLKeyFile string `toml:"ssl_key_file"`
Heartbeat HeartbeatConfig `toml:"heartbeat"`
}
// RateLimitConfig holds rate limiting settings
type RateLimitConfig struct {
Enabled bool `toml:"enabled"`
RequestsPerSecond int `toml:"requests_per_second"`
BurstSize int `toml:"burst_size"`
CleanupIntervalS int64 `toml:"cleanup_interval_s"`
type HTTPConfig struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
BufferSize int `toml:"buffer_size"`
SSLEnabled bool `toml:"ssl_enabled"`
SSLCertFile string `toml:"ssl_cert_file"`
SSLKeyFile string `toml:"ssl_key_file"`
Heartbeat HeartbeatConfig `toml:"heartbeat"`
}
type HeartbeatConfig struct {
Enabled bool `toml:"enabled"`
IntervalSeconds int `toml:"interval_seconds"`
IncludeTimestamp bool `toml:"include_timestamp"`
IncludeStats bool `toml:"include_stats"`
Format string `toml:"format"` // "comment" or "json"
}
// defaults returns configuration with default values
func defaults() *Config {
return &Config{
Port: 8080,
Monitor: MonitorConfig{
CheckIntervalMs: 100,
Targets: []MonitorTarget{
{Path: "./", Pattern: "*.log", IsFile: false},
},
},
Stream: StreamConfig{
TCPServer: TCPConfig{
Enabled: false,
Port: 9090,
BufferSize: 1000,
RateLimit: RateLimitConfig{
Enabled: false,
RequestsPerSecond: 10,
BurstSize: 20,
CleanupIntervalS: 60,
Heartbeat: HeartbeatConfig{
Enabled: false,
IntervalSeconds: 30,
IncludeTimestamp: true,
IncludeStats: false,
Format: "json",
},
},
HTTPServer: HTTPConfig{
Enabled: true,
Port: 8080,
BufferSize: 1000,
Heartbeat: HeartbeatConfig{
Enabled: true,
IntervalSeconds: 30,
IncludeTimestamp: true,
IncludeStats: false,
Format: "comment",
},
},
}
}
// Load reads configuration using lixenwraith/config Builder pattern
func Load() (*Config, error) {
configPath := GetConfigPath()
cfg, err := lconfig.NewBuilder().
WithDefaults(defaults()).
WithEnvPrefix("LOGWISP_").
WithFile(configPath).
WithEnvTransform(customEnvTransform).
WithSources(
lconfig.SourceEnv,
lconfig.SourceFile,
lconfig.SourceDefault,
).
Build()
if err != nil {
// Only fail on actual errors, not missing config file
if !strings.Contains(err.Error(), "not found") {
return nil, fmt.Errorf("failed to load config: %w", err)
}
}
// Special handling for LOGWISP_MONITOR_TARGETS env var
if err := handleMonitorTargetsEnv(cfg); err != nil {
return nil, err
}
// Scan into final config
finalConfig := &Config{}
if err := cfg.Scan("", finalConfig); err != nil {
return nil, fmt.Errorf("failed to scan config: %w", err)
}
return finalConfig, finalConfig.validate()
}
// LoadWithCLI loads configuration and applies CLI arguments
func LoadWithCLI(cliArgs []string) (*Config, error) {
configPath := GetConfigPath()
// Convert CLI args to config format
convertedArgs := convertCLIArgs(cliArgs)
cfg, err := lconfig.NewBuilder().
WithDefaults(defaults()).
WithEnvPrefix("LOGWISP_").
WithFile(configPath).
WithArgs(convertedArgs).
WithArgs(cliArgs).
WithEnvTransform(customEnvTransform).
WithSources(
lconfig.SourceCLI, // CLI highest priority
lconfig.SourceCLI,
lconfig.SourceEnv,
lconfig.SourceFile,
lconfig.SourceDefault,
@ -130,12 +113,10 @@ func LoadWithCLI(cliArgs []string) (*Config, error) {
}
}
// Handle special env var
if err := handleMonitorTargetsEnv(cfg); err != nil {
return nil, err
}
// Scan into final config
finalConfig := &Config{}
if err := cfg.Scan("", finalConfig); err != nil {
return nil, fmt.Errorf("failed to scan config: %w", err)
@ -144,52 +125,14 @@ func LoadWithCLI(cliArgs []string) (*Config, error) {
return finalConfig, finalConfig.validate()
}
// customEnvTransform handles LOGWISP_ prefix environment variables
func customEnvTransform(path string) string {
// Standard transform
env := strings.ReplaceAll(path, ".", "_")
env = strings.ToUpper(env)
env = "LOGWISP_" + env
// Handle common variations
switch env {
case "LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SECOND":
if _, exists := os.LookupEnv("LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SEC"); exists {
return "LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SEC"
}
case "LOGWISP_STREAM_RATE_LIMIT_CLEANUP_INTERVAL_S":
if _, exists := os.LookupEnv("LOGWISP_STREAM_RATE_LIMIT_CLEANUP_INTERVAL"); exists {
return "LOGWISP_STREAM_RATE_LIMIT_CLEANUP_INTERVAL"
}
}
return env
}
// convertCLIArgs converts CLI args to config package format
func convertCLIArgs(args []string) []string {
var converted []string
for _, arg := range args {
switch {
case arg == "-c" || arg == "--color":
// Color mode is handled separately by main.go
continue
case strings.HasPrefix(arg, "--config="):
// Config file path handled separately
continue
case strings.HasPrefix(arg, "--"):
// Pass through other long flags
converted = append(converted, arg)
}
}
return converted
}
// GetConfigPath returns the configuration file path
func GetConfigPath() string {
// Check explicit config file paths
if configFile := os.Getenv("LOGWISP_CONFIG_FILE"); configFile != "" {
if filepath.IsAbs(configFile) {
return configFile
@ -204,7 +147,6 @@ func GetConfigPath() string {
return filepath.Join(configDir, "logwisp.toml")
}
// Default location
if homeDir, err := os.UserHomeDir(); err == nil {
return filepath.Join(homeDir, ".config", "logwisp.toml")
}
@ -212,13 +154,10 @@ func GetConfigPath() string {
return "logwisp.toml"
}
// handleMonitorTargetsEnv handles comma-separated monitor targets env var
func handleMonitorTargetsEnv(cfg *lconfig.Config) error {
if targetsStr := os.Getenv("LOGWISP_MONITOR_TARGETS"); targetsStr != "" {
// Clear any existing targets from file/defaults
cfg.Set("monitor.targets", []MonitorTarget{})
// Parse comma-separated format: path:pattern:isfile,path2:pattern2:isfile
parts := strings.Split(targetsStr, ",")
for i, part := range parts {
targetParts := strings.Split(part, ":")
@ -248,12 +187,7 @@ func handleMonitorTargetsEnv(cfg *lconfig.Config) error {
return nil
}
// validate ensures configuration is valid
func (c *Config) validate() error {
if c.Port < 1 || c.Port > 65535 {
return fmt.Errorf("invalid port: %d", c.Port)
}
if c.Monitor.CheckIntervalMs < 10 {
return fmt.Errorf("check interval too small: %d ms", c.Monitor.CheckIntervalMs)
}
@ -266,33 +200,44 @@ func (c *Config) validate() error {
if target.Path == "" {
return fmt.Errorf("target %d: empty path", i)
}
if !target.IsFile && target.Pattern == "" {
return fmt.Errorf("target %d: pattern required for directory monitoring", i)
}
// SECURITY: Validate paths don't contain directory traversal
if strings.Contains(target.Path, "..") {
return fmt.Errorf("target %d: path contains directory traversal", i)
}
}
if c.Stream.BufferSize < 1 {
return fmt.Errorf("buffer size must be positive: %d", c.Stream.BufferSize)
if c.TCPServer.Enabled {
if c.TCPServer.Port < 1 || c.TCPServer.Port > 65535 {
return fmt.Errorf("invalid TCP port: %d", c.TCPServer.Port)
}
if c.TCPServer.BufferSize < 1 {
return fmt.Errorf("TCP buffer size must be positive: %d", c.TCPServer.BufferSize)
}
}
if c.Stream.RateLimit.Enabled {
if c.Stream.RateLimit.RequestsPerSecond < 1 {
return fmt.Errorf("rate limit requests per second must be positive: %d",
c.Stream.RateLimit.RequestsPerSecond)
if c.HTTPServer.Enabled {
if c.HTTPServer.Port < 1 || c.HTTPServer.Port > 65535 {
return fmt.Errorf("invalid HTTP port: %d", c.HTTPServer.Port)
}
if c.Stream.RateLimit.BurstSize < 1 {
return fmt.Errorf("rate limit burst size must be positive: %d",
c.Stream.RateLimit.BurstSize)
if c.HTTPServer.BufferSize < 1 {
return fmt.Errorf("HTTP buffer size must be positive: %d", c.HTTPServer.BufferSize)
}
if c.Stream.RateLimit.CleanupIntervalS < 1 {
return fmt.Errorf("rate limit cleanup interval must be positive: %d",
c.Stream.RateLimit.CleanupIntervalS)
}
if c.TCPServer.Enabled && c.TCPServer.Heartbeat.Enabled {
if c.TCPServer.Heartbeat.IntervalSeconds < 1 {
return fmt.Errorf("TCP heartbeat interval must be positive: %d", c.TCPServer.Heartbeat.IntervalSeconds)
}
if c.TCPServer.Heartbeat.Format != "json" && c.TCPServer.Heartbeat.Format != "comment" {
return fmt.Errorf("TCP heartbeat format must be 'json' or 'comment': %s", c.TCPServer.Heartbeat.Format)
}
}
if c.HTTPServer.Enabled && c.HTTPServer.Heartbeat.Enabled {
if c.HTTPServer.Heartbeat.IntervalSeconds < 1 {
return fmt.Errorf("HTTP heartbeat interval must be positive: %d", c.HTTPServer.Heartbeat.IntervalSeconds)
}
if c.HTTPServer.Heartbeat.Format != "json" && c.HTTPServer.Heartbeat.Format != "comment" {
return fmt.Errorf("HTTP heartbeat format must be 'json' or 'comment': %s", c.HTTPServer.Heartbeat.Format)
}
}

View File

@ -1,126 +0,0 @@
// File: logwisp/src/internal/middleware/ratelimit.go
package middleware
import (
	"fmt"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	"golang.org/x/time/rate"
)
// RateLimiter provides per-client rate limiting using one token-bucket
// limiter per client key, with periodic eviction of idle clients.
type RateLimiter struct {
	clients sync.Map // map[string]*clientLimiter
	requestsPerSec int
	burstSize int
	cleanupInterval time.Duration
	done chan struct{} // closed by Stop to terminate the cleanup goroutine
}
// clientLimiter pairs a client's token bucket with the time it was last
// seen, which drives idle eviction in removeOldClients.
type clientLimiter struct {
	limiter *rate.Limiter
	lastSeen time.Time // NOTE(review): written without synchronization by getLimiter
}
// NewRateLimiter builds a limiter enforcing requestsPerSec with the given
// burst size, and starts a background goroutine that evicts idle clients
// every cleanupIntervalSec seconds. Call Stop to terminate that goroutine.
func NewRateLimiter(requestsPerSec, burstSize int, cleanupIntervalSec int64) *RateLimiter {
	limiter := &RateLimiter{
		requestsPerSec:  requestsPerSec,
		burstSize:       burstSize,
		cleanupInterval: time.Duration(cleanupIntervalSec) * time.Second,
		done:            make(chan struct{}),
	}
	go limiter.cleanup()
	return limiter
}
// Middleware returns an HTTP middleware that enforces the per-client rate
// limit before delegating to next. Clients over their limit receive
// 429 Too Many Requests.
func (rl *RateLimiter) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		limiter := rl.getLimiter(clientKey(r))
		if !limiter.Allow() {
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// clientKey derives a rate-limiting key for the request. It prefers the
// first entry of X-Forwarded-For (the original client when behind a proxy)
// and strips the ephemeral port from RemoteAddr, so repeated connections
// from one host share a single limiter instead of one per connection.
// NOTE(review): X-Forwarded-For is client-controlled; only trust it when
// deployed behind a proxy that sanitizes the header.
func clientKey(r *http.Request) string {
	if forwarded := r.Header.Get("X-Forwarded-For"); forwarded != "" {
		// The header may be a comma-separated chain; the first hop is the client.
		first, _, _ := strings.Cut(forwarded, ",")
		if ip := strings.TrimSpace(first); ip != "" {
			return ip
		}
	}
	// RemoteAddr is normally "host:port"; fall back to the raw value if it
	// does not parse.
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return host
	}
	return r.RemoteAddr
}
// getLimiter returns the rate limiter for clientIP, creating one on first
// sight. LoadOrStore ensures concurrent first requests from the same client
// share a single limiter instead of racing to create two, which would
// effectively double the client's allowance.
func (rl *RateLimiter) getLimiter(clientIP string) *rate.Limiter {
	now := time.Now()
	if val, ok := rl.clients.Load(clientIP); ok {
		client := val.(*clientLimiter)
		// NOTE(review): lastSeen is written without synchronization, so
		// concurrent requests race on this field. Benign for the eviction
		// heuristic but flagged by -race; consider an atomic or mutex.
		client.lastSeen = now
		return client.limiter
	}
	fresh := &clientLimiter{
		limiter:  rate.NewLimiter(rate.Limit(rl.requestsPerSec), rl.burstSize),
		lastSeen: now,
	}
	// If another goroutine stored first, use its limiter and discard ours.
	actual, _ := rl.clients.LoadOrStore(clientIP, fresh)
	return actual.(*clientLimiter).limiter
}
// cleanup periodically evicts idle client limiters until Stop closes done.
func (rl *RateLimiter) cleanup() {
	ticker := time.NewTicker(rl.cleanupInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			rl.removeOldClients()
		case <-rl.done:
			return
		}
	}
}
// removeOldClients drops limiters that have been idle for longer than
// twice the cleanup interval.
func (rl *RateLimiter) removeOldClients() {
	cutoff := time.Now().Add(-2 * rl.cleanupInterval)
	rl.clients.Range(func(key, value interface{}) bool {
		if value.(*clientLimiter).lastSeen.Before(cutoff) {
			rl.clients.Delete(key)
		}
		return true
	})
}
// Stop gracefully shuts down the rate limiter's cleanup goroutine.
// NOTE(review): closing done twice panics — callers must invoke Stop at
// most once.
func (rl *RateLimiter) Stop() {
	close(rl.done)
}
// Stats reports the number of per-client limiters currently tracked.
func (rl *RateLimiter) Stats() string {
	var active int
	rl.clients.Range(func(_, _ interface{}) bool {
		active++
		return true
	})
	return fmt.Sprintf("Active clients: %d", active)
}

View File

@ -0,0 +1,261 @@
package monitor
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// fileWatcher tails a single log file, detecting rotation and delivering
// parsed LogEntry values through callback.
type fileWatcher struct {
	path string // path of the watched file
	callback func(LogEntry) // sink for parsed entries and rotation notices
	position int64 // byte offset of the next read
	size int64 // last observed file size
	inode uint64 // last observed inode (Unix); 0 if unavailable
	modTime time.Time // last observed modification time
	mu sync.Mutex // guards position, size, inode, modTime, stopped, rotationSeq
	stopped bool // set by stop(); polled by the watch loop
	rotationSeq int // count of rotations detected, used in log messages
}
// newFileWatcher constructs a watcher for path that delivers entries to
// callback. File-tracking state is populated later by seekToEnd.
func newFileWatcher(path string, callback func(LogEntry)) *fileWatcher {
	w := &fileWatcher{}
	w.path = path
	w.callback = callback
	return w
}
// watch polls the file every 100ms until the context is cancelled or the
// watcher is stopped. If the initial seek fails (e.g. the file is missing),
// the watcher exits immediately.
func (w *fileWatcher) watch(ctx context.Context) {
	if w.seekToEnd() != nil {
		return
	}
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if w.isStopped() {
				return
			}
			// Poll errors are intentionally ignored; a transient open
			// failure is simply retried on the next tick.
			w.checkFile()
		case <-ctx.Done():
			return
		}
	}
}
// seekToEnd positions the watcher at the current end of the file, so only
// lines written after startup are streamed, and records size, modtime and
// inode for later rotation detection.
func (w *fileWatcher) seekToEnd() error {
	f, err := os.Open(w.path)
	if err != nil {
		return err
	}
	defer f.Close()
	info, err := f.Stat()
	if err != nil {
		return err
	}
	end, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	w.position = end
	w.size = info.Size()
	w.modTime = info.ModTime()
	// Inode is only available on Unix-like systems.
	if st, ok := info.Sys().(*syscall.Stat_t); ok {
		w.inode = st.Ino
	}
	return nil
}
// checkFile polls the file once: it detects rotation via several
// independent signals, reads any new lines from the last recorded
// position, and updates the tracked offset/size/modtime.
func (w *fileWatcher) checkFile() error {
	file, err := os.Open(w.path)
	if err != nil {
		return err
	}
	defer file.Close()
	info, err := file.Stat()
	if err != nil {
		return err
	}
	// Snapshot tracked state under the lock; compare outside it.
	w.mu.Lock()
	oldPos := w.position
	oldSize := w.size
	oldInode := w.inode
	oldModTime := w.modTime
	w.mu.Unlock()
	currentSize := info.Size()
	currentModTime := info.ModTime()
	var currentInode uint64
	// Inode is Unix-specific; stays 0 elsewhere, disabling signal 1.
	if stat, ok := info.Sys().(*syscall.Stat_t); ok {
		currentInode = stat.Ino
	}
	rotated := false
	rotationReason := ""
	// Signal 1: inode changed — the path now points at a different file.
	if oldInode != 0 && currentInode != 0 && currentInode != oldInode {
		rotated = true
		rotationReason = "inode change"
	}
	// Signal 2: the file shrank (truncate-style rotation).
	if !rotated && currentSize < oldSize {
		rotated = true
		rotationReason = "size decrease"
	}
	// Signal 3: modtime moved backwards while the size did not grow.
	if !rotated && currentModTime.Before(oldModTime) && currentSize <= oldSize {
		rotated = true
		rotationReason = "modification time reset"
	}
	// Signal 4: our read offset is well past EOF (1KB slack for races).
	if !rotated && oldPos > currentSize+1024 {
		rotated = true
		rotationReason = "position beyond file size"
	}
	newPos := oldPos
	if rotated {
		// Restart from the top of the (new) file and announce the event.
		newPos = 0
		w.mu.Lock()
		w.rotationSeq++
		seq := w.rotationSeq
		w.inode = currentInode
		w.mu.Unlock()
		w.callback(LogEntry{
			Time: time.Now(),
			Source: filepath.Base(w.path),
			Level: "INFO",
			Message: fmt.Sprintf("Log rotation detected (#%d): %s", seq, rotationReason),
		})
	}
	if _, err := file.Seek(newPos, io.SeekStart); err != nil {
		return err
	}
	scanner := bufio.NewScanner(file)
	// Allow log lines up to 1MB (default scanner limit is 64KB).
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		entry := w.parseLine(line)
		w.callback(entry)
	}
	// Persist the new offset only if it can be determined.
	if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil {
		w.mu.Lock()
		w.position = currentPos
		w.size = currentSize
		w.modTime = currentModTime
		w.mu.Unlock()
	}
	return scanner.Err()
}
// parseLine converts a raw log line into a LogEntry. Structured JSON logs
// (with time/level/msg fields) are decoded; anything else passes through
// verbatim with a best-effort level extracted from the text.
func (w *fileWatcher) parseLine(line string) LogEntry {
	var structured struct {
		Time    string          `json:"time"`
		Level   string          `json:"level"`
		Message string          `json:"msg"`
		Fields  json.RawMessage `json:"fields"`
	}
	if json.Unmarshal([]byte(line), &structured) == nil {
		ts, err := time.Parse(time.RFC3339Nano, structured.Time)
		if err != nil {
			// Missing or unparseable timestamp: fall back to receipt time.
			ts = time.Now()
		}
		return LogEntry{
			Time:    ts,
			Source:  filepath.Base(w.path),
			Level:   structured.Level,
			Message: structured.Message,
			Fields:  structured.Fields,
		}
	}
	return LogEntry{
		Time:    time.Now(),
		Source:  filepath.Base(w.path),
		Level:   extractLogLevel(line),
		Message: line,
	}
}
// extractLogLevel scans a plain-text log line for conventional level
// markers (e.g. "[ERROR]", "WARN:", " INFO ") and returns the canonical
// level name, or "" when none is found. Matching is case-insensitive and
// severities are checked from most to least severe.
func extractLogLevel(line string) string {
	upper := strings.ToUpper(line)
	levels := []struct {
		name    string
		markers []string
	}{
		{"ERROR", []string{"[ERROR]", "ERROR:", " ERROR ", "ERR:", "[ERR]", "FATAL:", "[FATAL]"}},
		{"WARN", []string{"[WARN]", "WARN:", " WARN ", "WARNING:", "[WARNING]"}},
		{"INFO", []string{"[INFO]", "INFO:", " INFO ", "[INF]", "INF:"}},
		{"DEBUG", []string{"[DEBUG]", "DEBUG:", " DEBUG ", "[DBG]", "DBG:"}},
		{"TRACE", []string{"[TRACE]", "TRACE:", " TRACE "}},
	}
	for _, lvl := range levels {
		for _, marker := range lvl.markers {
			if strings.Contains(upper, marker) {
				return lvl.name
			}
		}
	}
	return ""
}
// globToRegex translates a shell-style glob (supporting * and ?) into an
// anchored regular-expression string.
func globToRegex(glob string) string {
	escaped := regexp.QuoteMeta(glob)
	escaped = strings.ReplaceAll(escaped, `\*`, `.*`)
	escaped = strings.ReplaceAll(escaped, `\?`, `.`)
	return "^" + escaped + "$"
}
// close shuts the watcher down; currently an alias for stop.
func (w *fileWatcher) close() {
	w.stop()
}
// stop marks the watcher as stopped; the watch loop exits on its next tick.
func (w *fileWatcher) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.stopped = true
}
// isStopped reports whether stop (or close) has been called.
func (w *fileWatcher) isStopped() bool {
	w.mu.Lock()
	stopped := w.stopped
	w.mu.Unlock()
	return stopped
}

View File

@ -1,22 +1,17 @@
// File: logwisp/src/internal/monitor/monitor.go
// FILE: src/internal/monitor/monitor.go
package monitor
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// LogEntry represents a log line to be streamed
type LogEntry struct {
Time time.Time `json:"time"`
Source string `json:"source"`
@ -25,9 +20,8 @@ type LogEntry struct {
Fields json.RawMessage `json:"fields,omitempty"`
}
// Monitor watches files and directories for log entries
type Monitor struct {
callback func(LogEntry)
subscribers []chan LogEntry
targets []target
watchers map[string]*fileWatcher
mu sync.RWMutex
@ -41,26 +35,44 @@ type target struct {
path string
pattern string
isFile bool
regex *regexp.Regexp // FIXED: Compiled pattern for performance
regex *regexp.Regexp
}
// New creates a new monitor instance
func New(callback func(LogEntry)) *Monitor {
func New() *Monitor {
return &Monitor{
callback: callback,
watchers: make(map[string]*fileWatcher),
checkInterval: 100 * time.Millisecond,
}
}
// SetCheckInterval configures the file check frequency
func (m *Monitor) Subscribe() chan LogEntry {
m.mu.Lock()
defer m.mu.Unlock()
ch := make(chan LogEntry, 1000)
m.subscribers = append(m.subscribers, ch)
return ch
}
func (m *Monitor) publish(entry LogEntry) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, ch := range m.subscribers {
select {
case ch <- entry:
default:
// Drop message if channel full
}
}
}
func (m *Monitor) SetCheckInterval(interval time.Duration) {
m.mu.Lock()
m.checkInterval = interval
m.mu.Unlock()
}
// AddTarget adds a path to monitor with enhanced pattern support
func (m *Monitor) AddTarget(path, pattern string, isFile bool) error {
absPath, err := filepath.Abs(path)
if err != nil {
@ -69,7 +81,6 @@ func (m *Monitor) AddTarget(path, pattern string, isFile bool) error {
var compiledRegex *regexp.Regexp
if !isFile && pattern != "" {
// FIXED: Convert glob pattern to regex for better matching
regexPattern := globToRegex(pattern)
compiledRegex, err = regexp.Compile(regexPattern)
if err != nil {
@ -89,13 +100,10 @@ func (m *Monitor) AddTarget(path, pattern string, isFile bool) error {
return nil
}
// Start begins monitoring with configurable interval
func (m *Monitor) Start(ctx context.Context) error {
m.ctx, m.cancel = context.WithCancel(ctx)
m.wg.Add(1)
go m.monitorLoop()
return nil
}
@ -109,14 +117,15 @@ func (m *Monitor) Stop() {
for _, w := range m.watchers {
w.close()
}
for _, ch := range m.subscribers {
close(ch)
}
m.mu.Unlock()
}
// FIXED: Enhanced monitoring loop with configurable interval
func (m *Monitor) monitorLoop() {
defer m.wg.Done()
// Initial scan
m.checkTargets()
m.mu.RLock()
@ -133,7 +142,6 @@ func (m *Monitor) monitorLoop() {
case <-ticker.C:
m.checkTargets()
// Update ticker interval if changed
m.mu.RLock()
newInterval := m.checkInterval
m.mu.RUnlock()
@ -147,7 +155,6 @@ func (m *Monitor) monitorLoop() {
}
}
// FIXED: Enhanced target checking with better file discovery
func (m *Monitor) checkTargets() {
m.mu.RLock()
targets := make([]target, len(m.targets))
@ -158,12 +165,10 @@ func (m *Monitor) checkTargets() {
if t.isFile {
m.ensureWatcher(t.path)
} else {
// FIXED: More efficient directory scanning
files, err := m.scanDirectory(t.path, t.regex)
if err != nil {
continue
}
for _, file := range files {
m.ensureWatcher(file)
}
@ -173,7 +178,6 @@ func (m *Monitor) checkTargets() {
m.cleanupWatchers()
}
// FIXED: Optimized directory scanning
func (m *Monitor) scanDirectory(dir string, pattern *regexp.Regexp) ([]string, error) {
entries, err := os.ReadDir(dir)
if err != nil {
@ -207,7 +211,7 @@ func (m *Monitor) ensureWatcher(path string) {
return
}
w := newFileWatcher(path, m.callback)
w := newFileWatcher(path, m.publish)
m.watchers[path] = w
m.wg.Add(1)
@ -231,268 +235,4 @@ func (m *Monitor) cleanupWatchers() {
delete(m.watchers, path)
}
}
}
// fileWatcher with enhanced rotation detection
type fileWatcher struct {
path string
callback func(LogEntry)
position int64
size int64
inode uint64
modTime time.Time
mu sync.Mutex
stopped bool
rotationSeq int // FIXED: Track rotation sequence for logging
}
func newFileWatcher(path string, callback func(LogEntry)) *fileWatcher {
return &fileWatcher{
path: path,
callback: callback,
}
}
func (w *fileWatcher) watch(ctx context.Context) {
if err := w.seekToEnd(); err != nil {
return
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if w.isStopped() {
return
}
w.checkFile()
}
}
}
// seekToEnd records the file's current end-of-file state (offset, size,
// mtime, inode) so that only data appended afterwards is streamed.
func (w *fileWatcher) seekToEnd() error {
	f, err := os.Open(w.path)
	if err != nil {
		return err
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		return err
	}
	end, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}

	w.mu.Lock()
	defer w.mu.Unlock()
	w.position = end
	w.size = st.Size()
	w.modTime = st.ModTime()
	// Inode is only available on Unix; it is the primary rotation signal.
	if sys, ok := st.Sys().(*syscall.Stat_t); ok {
		w.inode = sys.Ino
	}
	return nil
}
// checkFile reads any newly appended data and forwards each non-empty line
// to the callback. It first runs rotation detection; when a rotation is
// recognized, reading restarts at offset 0 so the new file's content is not
// skipped, and an informational entry is emitted into the stream.
//
// FIXED: removed an unused line counter that was incremented but never read.
// NOTE(review): a line only partially written at poll time is emitted
// immediately and its remainder arrives as a separate entry on the next
// poll — confirm writers always append whole lines at once.
func (w *fileWatcher) checkFile() error {
	file, err := os.Open(w.path)
	if err != nil {
		return err
	}
	defer file.Close()
	info, err := file.Stat()
	if err != nil {
		return err
	}
	w.mu.Lock()
	oldPos := w.position
	oldSize := w.size
	oldInode := w.inode
	oldModTime := w.modTime
	w.mu.Unlock()
	currentSize := info.Size()
	currentModTime := info.ModTime()
	var currentInode uint64
	if stat, ok := info.Sys().(*syscall.Stat_t); ok {
		currentInode = stat.Ino
	}
	// Rotation detection: several independent signals, strongest first.
	rotated := false
	rotationReason := ""
	// Method 1: inode change (most reliable on Unix).
	if oldInode != 0 && currentInode != 0 && currentInode != oldInode {
		rotated = true
		rotationReason = "inode change"
	}
	// Method 2: file shrank (truncation / copytruncate rotation).
	if !rotated && currentSize < oldSize {
		rotated = true
		rotationReason = "size decrease"
	}
	// Method 3: mtime went backwards while the file did not grow.
	if !rotated && currentModTime.Before(oldModTime) && currentSize <= oldSize {
		rotated = true
		rotationReason = "modification time reset"
	}
	// Method 4: our read offset is implausibly far past EOF (1KB slack).
	if !rotated && oldPos > currentSize+1024 {
		rotated = true
		rotationReason = "position beyond file size"
	}
	newPos := oldPos
	if rotated {
		newPos = 0
		w.mu.Lock()
		w.rotationSeq++
		seq := w.rotationSeq
		w.inode = currentInode
		w.mu.Unlock()
		// Surface the rotation event in the stream itself.
		w.callback(LogEntry{
			Time:    time.Now(),
			Source:  filepath.Base(w.path),
			Level:   "INFO",
			Message: fmt.Sprintf("Log rotation detected (#%d): %s", seq, rotationReason),
		})
	}
	// Seek to the resume point and stream any new lines.
	if _, err := file.Seek(newPos, io.SeekStart); err != nil {
		return err
	}
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow lines up to 1MB
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		w.callback(w.parseLine(line))
	}
	// Persist the new read offset and file fingerprint for the next poll.
	if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil {
		w.mu.Lock()
		w.position = currentPos
		w.size = currentSize
		w.modTime = currentModTime
		w.mu.Unlock()
	}
	return scanner.Err()
}
// parseLine converts one raw log line into a LogEntry. Lines that look like
// JSON objects are decoded structurally (time/level/msg/fields keys);
// everything else is treated as plain text with a best-effort level scan.
//
// FIXED: JSON decoding is now only attempted for lines starting with '{'.
// Previously any valid JSON scalar line — a bare "null", "123", or "true" —
// unmarshaled successfully into the zero struct and produced an entry with
// an empty message instead of preserving the raw line.
func (w *fileWatcher) parseLine(line string) LogEntry {
	if strings.HasPrefix(strings.TrimSpace(line), "{") {
		var jsonLog struct {
			Time    string          `json:"time"`
			Level   string          `json:"level"`
			Message string          `json:"msg"`
			Fields  json.RawMessage `json:"fields"`
		}
		if err := json.Unmarshal([]byte(line), &jsonLog); err == nil {
			timestamp, err := time.Parse(time.RFC3339Nano, jsonLog.Time)
			if err != nil {
				// Missing or unparseable timestamp: fall back to arrival time.
				timestamp = time.Now()
			}
			return LogEntry{
				Time:    timestamp,
				Source:  filepath.Base(w.path),
				Level:   jsonLog.Level,
				Message: jsonLog.Message,
				Fields:  jsonLog.Fields,
			}
		}
	}
	// Plain text: keep the entire raw line as the message.
	return LogEntry{
		Time:    time.Now(),
		Source:  filepath.Base(w.path),
		Level:   extractLogLevel(line),
		Message: line,
	}
}
// extractLogLevel scans a plain-text line for a severity marker and returns
// the normalized level name ("ERROR", "WARN", "INFO", "DEBUG", "TRACE"),
// or "" when none is found. Matching is case-insensitive and groups are
// checked in severity order, so a line containing both ERROR and INFO
// markers reports ERROR.
//
// FIXED: the line is padded with spaces before matching so word-style
// patterns such as " ERROR " also match at the very start or end of the
// line; previously "ERROR something" returned "".
func extractLogLevel(line string) string {
	patterns := []struct {
		patterns []string
		level    string
	}{
		{[]string{"[ERROR]", "ERROR:", " ERROR ", "ERR:", "[ERR]", "FATAL:", "[FATAL]"}, "ERROR"},
		{[]string{"[WARN]", "WARN:", " WARN ", "WARNING:", "[WARNING]"}, "WARN"},
		{[]string{"[INFO]", "INFO:", " INFO ", "[INF]", "INF:"}, "INFO"},
		{[]string{"[DEBUG]", "DEBUG:", " DEBUG ", "[DBG]", "DBG:"}, "DEBUG"},
		{[]string{"[TRACE]", "TRACE:", " TRACE "}, "TRACE"},
	}
	// Pad with spaces so " LEVEL " patterns match at line boundaries too.
	upperLine := " " + strings.ToUpper(line) + " "
	for _, group := range patterns {
		for _, pattern := range group.patterns {
			if strings.Contains(upperLine, pattern) {
				return group.level
			}
		}
	}
	return ""
}
// FIXED: Convert glob patterns to regex
func globToRegex(glob string) string {
regex := regexp.QuoteMeta(glob)
regex = strings.ReplaceAll(regex, `\*`, `.*`)
regex = strings.ReplaceAll(regex, `\?`, `.`)
return "^" + regex + "$"
}
// close releases the watcher; it is an alias for stop.
func (w *fileWatcher) close() {
	w.stop()
}

// stop marks the watcher as stopped; the watch loop exits on its next tick.
func (w *fileWatcher) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.stopped = true
}

// isStopped reports whether stop (or close) has been called.
func (w *fileWatcher) isStopped() bool {
	w.mu.Lock()
	stopped := w.stopped
	w.mu.Unlock()
	return stopped
}

192
src/internal/stream/http.go Normal file
View File

@ -0,0 +1,192 @@
// FILE: src/internal/stream/http.go
package stream
import (
"bufio"
"encoding/json"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/valyala/fasthttp"
"logwisp/src/internal/config"
"logwisp/src/internal/monitor"
)
// HTTPStreamer serves log entries over HTTP as Server-Sent Events using
// fasthttp. Endpoints: /stream (SSE feed) and /status (JSON stats).
type HTTPStreamer struct {
	logChan       chan monitor.LogEntry // shared feed of entries from the monitor
	config        config.HTTPConfig
	server        *fasthttp.Server
	activeClients atomic.Int32 // SSE clients currently streaming
	mu            sync.RWMutex // NOTE(review): not used by any method in this file — confirm or remove
	startTime     time.Time    // for uptime reporting in heartbeats/status
}
// NewHTTPStreamer wires a log channel to a not-yet-started HTTP streamer.
// Call Start to begin serving.
func NewHTTPStreamer(logChan chan monitor.LogEntry, cfg config.HTTPConfig) *HTTPStreamer {
	h := &HTTPStreamer{
		logChan: logChan,
		config:  cfg,
	}
	h.startTime = time.Now()
	return h
}
// Start runs the fasthttp server on the configured port. It blocks until
// the server stops and returns the listener error, if any.
func (h *HTTPStreamer) Start() error {
	srv := &fasthttp.Server{
		Handler:           h.requestHandler,
		DisableKeepalive:  false,
		StreamRequestBody: true,
		Logger:            nil, // silence fasthttp's own logging
	}
	h.server = srv
	return srv.ListenAndServe(fmt.Sprintf(":%d", h.config.Port))
}
// Stop shuts the HTTP server down gracefully; safe to call before Start.
func (h *HTTPStreamer) Stop() {
	if h.server == nil {
		return
	}
	// Shutdown's error is deliberately discarded: there is no caller to report to.
	_ = h.server.Shutdown()
}
// requestHandler routes incoming requests: /stream for SSE, /status for
// JSON stats, 404 for anything else.
func (h *HTTPStreamer) requestHandler(ctx *fasthttp.RequestCtx) {
	switch string(ctx.Path()) {
	case "/stream":
		h.handleStream(ctx)
	case "/status":
		h.handleStatus(ctx)
	default:
		ctx.SetStatusCode(fasthttp.StatusNotFound)
	}
}
// handleStream serves one SSE client: it forwards log entries from logChan,
// interleaved with optional heartbeats, until the client disconnects.
//
// FIXED: the forwarding goroutine used to `range` over h.logChan forever and
// leaked after the client went away; it now exits when the stream writer
// returns. FIXED: active-client accounting moved into the stream writer —
// fasthttp runs the SetBodyStreamWriter callback after this handler returns,
// so the old defer decremented the counter immediately on connect.
//
// NOTE(review): every client consumes from the single shared h.logChan, so
// entries are distributed round-robin between concurrent clients rather
// than broadcast to each — confirm the intended fan-out semantics.
func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) {
	// Standard SSE headers; X-Accel-Buffering disables proxy buffering.
	ctx.Response.Header.Set("Content-Type", "text/event-stream")
	ctx.Response.Header.Set("Cache-Control", "no-cache")
	ctx.Response.Header.Set("Connection", "keep-alive")
	ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
	ctx.Response.Header.Set("X-Accel-Buffering", "no")

	clientChan := make(chan monitor.LogEntry, h.config.BufferSize)
	done := make(chan struct{}) // closed when the stream writer exits

	// Forward entries into this client's buffer; drop when it is full so a
	// slow client never blocks the shared feed.
	go func() {
		defer close(clientChan)
		for {
			select {
			case entry, ok := <-h.logChan:
				if !ok {
					return
				}
				select {
				case clientChan <- entry:
				default:
					// Client buffer full: drop this entry for this client.
				}
			case <-done:
				return
			}
		}
	}()

	streamFunc := func(w *bufio.Writer) {
		h.activeClients.Add(1)
		defer h.activeClients.Add(-1)
		defer close(done) // release the forwarding goroutine

		// Tell the client it is connected and give it an ID.
		clientID := fmt.Sprintf("%d", time.Now().UnixNano())
		fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID)
		w.Flush()

		var ticker *time.Ticker
		var tickerChan <-chan time.Time // nil channel blocks forever when heartbeat is off
		if h.config.Heartbeat.Enabled {
			ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second)
			tickerChan = ticker.C
			defer ticker.Stop()
		}
		for {
			select {
			case entry, ok := <-clientChan:
				if !ok {
					return
				}
				data, err := json.Marshal(entry)
				if err != nil {
					continue
				}
				fmt.Fprintf(w, "data: %s\n\n", data)
				if err := w.Flush(); err != nil {
					// Flush failure means the client disconnected.
					return
				}
			case <-tickerChan:
				if heartbeat := h.formatHeartbeat(); heartbeat != "" {
					fmt.Fprint(w, heartbeat)
					if err := w.Flush(); err != nil {
						return
					}
				}
			}
		}
	}
	ctx.SetBodyStreamWriter(streamFunc)
}
// formatHeartbeat renders one heartbeat frame for the SSE stream, or ""
// when heartbeats are disabled. "json" format emits a data event; the
// default format emits an SSE comment line (": ...") that clients ignore.
func (h *HTTPStreamer) formatHeartbeat() string {
	if !h.config.Heartbeat.Enabled {
		return ""
	}
	if h.config.Heartbeat.Format == "json" {
		payload := map[string]interface{}{"type": "heartbeat"}
		if h.config.Heartbeat.IncludeTimestamp {
			payload["timestamp"] = time.Now().UTC().Format(time.RFC3339)
		}
		if h.config.Heartbeat.IncludeStats {
			payload["active_clients"] = h.activeClients.Load()
			payload["uptime_seconds"] = int(time.Since(h.startTime).Seconds())
		}
		encoded, _ := json.Marshal(payload)
		return fmt.Sprintf("data: %s\n\n", encoded)
	}
	// Default: comment-style heartbeat.
	parts := []string{"heartbeat"}
	if h.config.Heartbeat.IncludeTimestamp {
		parts = append(parts, time.Now().UTC().Format(time.RFC3339))
	}
	if h.config.Heartbeat.IncludeStats {
		parts = append(parts,
			fmt.Sprintf("clients=%d", h.activeClients.Load()),
			fmt.Sprintf("uptime=%ds", int(time.Since(h.startTime).Seconds())))
	}
	return fmt.Sprintf(": %s\n\n", strings.Join(parts, " "))
}
// handleStatus reports server identity and live client/buffer stats as JSON.
func (h *HTTPStreamer) handleStatus(ctx *fasthttp.RequestCtx) {
	ctx.SetContentType("application/json")
	// Marshal of string/int maps cannot fail; error intentionally discarded.
	payload, _ := json.Marshal(map[string]interface{}{
		"service": "LogWisp",
		"version": "3.0.0",
		"http_server": map[string]interface{}{
			"port":           h.config.Port,
			"active_clients": h.activeClients.Load(),
			"buffer_size":    h.config.BufferSize,
		},
	})
	ctx.SetBody(payload)
}

View File

@ -0,0 +1,11 @@
// FILE: src/internal/stream/noop_logger.go
package stream
// noopLogger implements gnet's Logger interface but discards everything
// so library chatter never mixes into the streamed log output.
// NOTE(review): Fatalf is also a no-op here — if gnet relies on Fatalf to
// abort on unrecoverable errors, those are silently swallowed; confirm.
type noopLogger struct{}

func (n noopLogger) Debugf(format string, args ...any) {}
func (n noopLogger) Infof(format string, args ...any)  {}
func (n noopLogger) Warnf(format string, args ...any)  {}
func (n noopLogger) Errorf(format string, args ...any) {}
func (n noopLogger) Fatalf(format string, args ...any) {}

View File

@ -1,245 +0,0 @@
// File: logwisp/src/internal/stream/stream.go
package stream
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/monitor"
)
// Streamer handles Server-Sent Events streaming
// It fans every published LogEntry out to all registered clients; slow
// clients have messages dropped (and counted) rather than being
// disconnected. All client-map mutation happens in the run() goroutine.
type Streamer struct {
	clients    map[string]*clientConnection // keyed by client id
	register   chan *clientConnection       // handoff of new clients to run()
	unregister chan string                  // client id to remove
	broadcast  chan monitor.LogEntry        // inbound entries from Publish
	mu         sync.RWMutex                 // guards the clients map
	bufferSize int                          // per-client channel capacity
	done       chan struct{}                // closed by Stop
	colorMode  bool                         // pass ANSI escape codes through
	wg         sync.WaitGroup               // tracks the run() goroutine
	// Metrics
	totalDropped atomic.Int64 // messages dropped across all clients
}
// clientConnection is the server-side state for one connected SSE client.
type clientConnection struct {
	id           string                // unique id (nanosecond timestamp at connect)
	channel      chan monitor.LogEntry // buffered per-client delivery queue
	lastActivity time.Time             // last successful send (written only by run())
	dropped      atomic.Int64          // Track per-client dropped messages; reset to 0 on a successful send
}
// New creates a new SSE streamer with color pass-through disabled.
func New(bufferSize int) *Streamer {
	const colorMode = false
	return NewWithOptions(bufferSize, colorMode)
}
// NewWithOptions creates a new SSE streamer and starts its dispatch
// goroutine. bufferSize sets both the broadcast queue and each client's
// queue capacity; colorMode enables ANSI escape pass-through.
func NewWithOptions(bufferSize int, colorMode bool) *Streamer {
	s := &Streamer{
		clients:    map[string]*clientConnection{},
		register:   make(chan *clientConnection),
		unregister: make(chan string),
		broadcast:  make(chan monitor.LogEntry, bufferSize),
		bufferSize: bufferSize,
		done:       make(chan struct{}),
		colorMode:  colorMode,
	}
	s.wg.Add(1)
	go s.run()
	return s
}
// run manages client connections - SIMPLIFIED: no forced disconnections
// Single goroutine owning the clients map: register/unregister/broadcast/
// done are all serialized through this select loop, and slow clients lose
// messages (best-effort) instead of being disconnected.
func (s *Streamer) run() {
	defer s.wg.Done()
	for {
		select {
		case c := <-s.register:
			s.mu.Lock()
			s.clients[c.id] = c
			s.mu.Unlock()
		case id := <-s.unregister:
			s.mu.Lock()
			if client, ok := s.clients[id]; ok {
				close(client.channel)
				delete(s.clients, id)
			}
			s.mu.Unlock()
		case entry := <-s.broadcast:
			// Fan out to every client with non-blocking sends so one stuck
			// client cannot stall the others.
			s.mu.RLock()
			now := time.Now()
			for _, client := range s.clients {
				select {
				case client.channel <- entry:
					// Successfully sent
					client.lastActivity = now
					client.dropped.Store(0) // Reset dropped counter on success
				default:
					// Buffer full - skip this message for this client
					// Don't disconnect, just track dropped messages
					dropped := client.dropped.Add(1)
					s.totalDropped.Add(1)
					// Log significant drop milestones for monitoring
					if dropped == 100 || dropped == 1000 || dropped%10000 == 0 {
						// Could add logging here if needed
					}
				}
			}
			s.mu.RUnlock()
		case <-s.done:
			// Shutdown: closing each client channel makes its ServeHTTP
			// loop return; then this goroutine exits.
			s.mu.Lock()
			for _, client := range s.clients {
				close(client.channel)
			}
			s.clients = make(map[string]*clientConnection)
			s.mu.Unlock()
			return
		}
	}
}
// Publish sends a log entry to all connected clients. It never blocks:
// when the broadcast queue is full the entry is dropped and counted.
func (s *Streamer) Publish(entry monitor.LogEntry) {
	select {
	case s.broadcast <- entry:
	default:
		// Queue full — drop globally and record the loss.
		s.totalDropped.Add(1)
	}
}
// ServeHTTP implements http.Handler for SSE - SIMPLIFIED
// Registers the caller as a client, streams entries from its per-client
// channel as SSE "data:" frames, and emits a comment heartbeat every 30s so
// proxies keep the connection open. Returns when the client disconnects or
// the streamer shuts down (channel closed by run()).
func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
	// SECURITY: Prevent XSS
	w.Header().Set("X-Content-Type-Options", "nosniff")
	// Create client
	// NOTE(review): nanosecond timestamps are not collision-proof under
	// concurrent connects — confirm acceptable as an id.
	clientID := fmt.Sprintf("%d", time.Now().UnixNano())
	ch := make(chan monitor.LogEntry, s.bufferSize)
	client := &clientConnection{
		id:           clientID,
		channel:      ch,
		lastActivity: time.Now(),
	}
	// Register client
	// NOTE(review): these sends panic if Stop() has already closed the
	// register/unregister channels — confirm shutdown ordering.
	s.register <- client
	defer func() {
		s.unregister <- clientID
	}()
	// Send initial connection event
	fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\",\"buffer_size\":%d}\n\n",
		clientID, s.bufferSize)
	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}
	// Create ticker for heartbeat - keeps connection alive through proxies
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	// Stream events until client disconnects
	for {
		select {
		case <-r.Context().Done():
			// Client disconnected
			return
		case entry, ok := <-ch:
			if !ok {
				// Channel closed
				return
			}
			// Process entry for color if needed
			if s.colorMode {
				entry = s.processColorEntry(entry)
			}
			data, err := json.Marshal(entry)
			if err != nil {
				// Skip unmarshalable entries rather than killing the stream.
				continue
			}
			fmt.Fprintf(w, "data: %s\n\n", data)
			if flusher, ok := w.(http.Flusher); ok {
				flusher.Flush()
			}
		case <-ticker.C:
			// Send heartbeat as SSE comment
			fmt.Fprintf(w, ": heartbeat %s\n\n", time.Now().UTC().Format(time.RFC3339))
			if flusher, ok := w.(http.Flusher); ok {
				flusher.Flush()
			}
		}
	}
}
// Stop gracefully shuts down the streamer: it signals run() to close all
// client channels and waits for that goroutine to exit.
//
// FIXED: no longer closes register/unregister/broadcast. Other goroutines
// send on those channels (Publish callers, in-flight ServeHTTP handlers),
// and closing a channel that still has senders makes any late send panic.
// Leaving them open is safe — they are garbage-collected once unreferenced.
func (s *Streamer) Stop() {
	close(s.done)
	s.wg.Wait()
}
// processColorEntry preserves ANSI codes in JSON
// Currently the identity transform: json.Marshal already escapes ESC bytes
// safely, so entries pass through untouched. Kept as an extension point for
// future color handling.
func (s *Streamer) processColorEntry(entry monitor.LogEntry) monitor.LogEntry {
	return entry
}
// Stats returns a snapshot of streamer metrics: active client count, buffer
// size, color mode, total dropped messages, and per-client drop counts for
// any client that is currently behind.
func (s *Streamer) Stats() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()

	result := map[string]interface{}{
		"active_clients": len(s.clients),
		"buffer_size":    s.bufferSize,
		"color_mode":     s.colorMode,
		"total_dropped":  s.totalDropped.Load(),
	}
	var lagging []map[string]interface{}
	for id, c := range s.clients {
		if n := c.dropped.Load(); n > 0 {
			lagging = append(lagging, map[string]interface{}{
				"id":      id,
				"dropped": n,
			})
		}
	}
	if len(lagging) > 0 {
		result["clients_with_drops"] = lagging
	}
	return result
}

144
src/internal/stream/tcp.go Normal file
View File

@ -0,0 +1,144 @@
// FILE: src/internal/stream/tcp.go
package stream
import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/gnet/v2"
"logwisp/src/internal/config"
"logwisp/src/internal/monitor"
)
// TCPStreamer serves log entries to raw TCP clients as newline-delimited
// JSON, using gnet as the event loop.
type TCPStreamer struct {
	logChan     chan monitor.LogEntry // shared feed of entries from the monitor
	config      config.TCPConfig
	server      *tcpServer
	done        chan struct{} // closed by Stop to end broadcastLoop
	activeConns atomic.Int32  // currently connected clients
	startTime   time.Time     // for uptime reporting in heartbeats
}
// tcpServer adapts gnet's event callbacks to the TCPStreamer and tracks the
// live connection set used for broadcasting.
type tcpServer struct {
	gnet.BuiltinEventEngine
	streamer    *TCPStreamer
	connections sync.Map // gnet.Conn -> struct{} (concurrent set)
}
// NewTCPStreamer wires a log channel to a not-yet-started TCP streamer.
// Call Start to begin serving.
func NewTCPStreamer(logChan chan monitor.LogEntry, cfg config.TCPConfig) *TCPStreamer {
	t := &TCPStreamer{
		logChan: logChan,
		config:  cfg,
		done:    make(chan struct{}),
	}
	t.startTime = time.Now()
	return t
}
// Start launches the broadcast loop and runs the gnet TCP server on the
// configured port. It blocks for the lifetime of the server and returns
// gnet's terminal error.
func (t *TCPStreamer) Start() error {
	t.server = &tcpServer{streamer: t}

	// Fan entries out to connections in the background.
	go t.broadcastLoop()

	return gnet.Run(t.server, fmt.Sprintf("tcp://:%d", t.config.Port),
		gnet.WithLogger(noopLogger{}), // No-op logger: discard everything
		gnet.WithMulticore(true),
		gnet.WithReusePort(true),
	)
}
// Stop signals the broadcast loop to exit.
// NOTE(review): the gnet event loop itself is NOT stopped here — the comment
// below claims v2 has no engine handle, but gnet v2 exposes Engine.Stop via
// the engine passed to OnBoot; confirm whether the listener should also be
// shut down on Stop.
func (t *TCPStreamer) Stop() {
	close(t.done)
	// No engine to stop with gnet v2
}
// broadcastLoop fans incoming log entries (and optional heartbeats) out to
// every connected TCP client as newline-delimited JSON. It runs until Stop
// closes t.done.
func (t *TCPStreamer) broadcastLoop() {
	// A nil channel blocks forever in select, so the heartbeat case is
	// simply inert when heartbeats are disabled.
	var heartbeats <-chan time.Time
	if t.config.Heartbeat.Enabled {
		tick := time.NewTicker(time.Duration(t.config.Heartbeat.IntervalSeconds) * time.Second)
		defer tick.Stop()
		heartbeats = tick.C
	}

	// writeAll queues payload asynchronously on every live connection.
	writeAll := func(payload []byte) {
		t.server.connections.Range(func(key, _ interface{}) bool {
			key.(gnet.Conn).AsyncWrite(payload, nil)
			return true
		})
	}

	for {
		select {
		case entry := <-t.logChan:
			line, err := json.Marshal(entry)
			if err != nil {
				continue
			}
			writeAll(append(line, '\n'))
		case <-heartbeats:
			if hb := t.formatHeartbeat(); hb != nil {
				writeAll(hb)
			}
		case <-t.done:
			return
		}
	}
}
// formatHeartbeat builds one newline-terminated JSON heartbeat payload, or
// nil when heartbeats are disabled.
func (t *TCPStreamer) formatHeartbeat() []byte {
	if !t.config.Heartbeat.Enabled {
		return nil
	}
	payload := map[string]interface{}{"type": "heartbeat"}
	if t.config.Heartbeat.IncludeTimestamp {
		payload["time"] = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if t.config.Heartbeat.IncludeStats {
		payload["active_connections"] = t.activeConns.Load()
		payload["uptime_seconds"] = int(time.Since(t.startTime).Seconds())
	}
	// Marshal of string/int maps cannot fail; error intentionally discarded.
	encoded, _ := json.Marshal(payload)
	return append(encoded, '\n')
}
// OnBoot is invoked once when the gnet engine starts; nothing to set up.
func (s *tcpServer) OnBoot(_ gnet.Engine) gnet.Action {
	return gnet.None
}

// OnOpen registers the new connection and bumps the active counter.
func (s *tcpServer) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
	s.connections.Store(c, struct{}{})
	s.streamer.activeConns.Add(1)
	return nil, gnet.None
}

// OnClose forgets the connection and decrements the active counter.
func (s *tcpServer) OnClose(c gnet.Conn, _ error) gnet.Action {
	s.connections.Delete(c)
	s.streamer.activeConns.Add(-1)
	return gnet.None
}

// OnTraffic drains anything clients send; the stream is write-only.
func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action {
	c.Discard(-1)
	return gnet.None
}