v0.1.1: improved config, added rate limiter (known buggy); README not fully updated

This commit is contained in:
2025-07-01 14:12:20 -04:00
parent 294771653c
commit bd13103a81
11 changed files with 1329 additions and 194 deletions

View File

@ -5,8 +5,9 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/BurntSushi/toml"
lconfig "github.com/lixenwraith/config"
)
// Config holds the complete configuration
@ -24,72 +25,237 @@ type MonitorConfig struct {
// MonitorTarget represents a path to monitor
type MonitorTarget struct {
Path string `toml:"path"`
Pattern string `toml:"pattern"`
Path string `toml:"path"` // File or directory path
Pattern string `toml:"pattern"` // Glob pattern for directories
IsFile bool `toml:"is_file"` // True if monitoring specific file
}
// StreamConfig holds streaming settings
type StreamConfig struct {
BufferSize int `toml:"buffer_size"`
BufferSize int `toml:"buffer_size"`
RateLimit RateLimitConfig `toml:"rate_limit"`
}
// DefaultConfig returns configuration with sensible defaults
func DefaultConfig() *Config {
// RateLimitConfig holds rate limiting settings
type RateLimitConfig struct {
	Enabled           bool  `toml:"enabled"`             // master switch; limits apply only when true
	RequestsPerSecond int   `toml:"requests_per_second"` // sustained per-client request rate
	BurstSize         int   `toml:"burst_size"`          // token-bucket burst capacity per client
	CleanupIntervalS  int64 `toml:"cleanup_interval_s"`  // seconds between idle-client evictions
}
// defaults returns configuration with default values
func defaults() *Config {
	// One default target: watch *.log files in the working directory.
	monitor := MonitorConfig{
		CheckIntervalMs: 100,
		Targets: []MonitorTarget{
			{Path: "./", Pattern: "*.log", IsFile: false},
		},
	}
	stream := StreamConfig{
		BufferSize: 1000,
		RateLimit: RateLimitConfig{
			Enabled:           false,
			RequestsPerSecond: 10,
			BurstSize:         20,
			CleanupIntervalS:  60,
		},
	}
	return &Config{
		Port:    8080,
		Monitor: monitor,
		Stream:  stream,
	}
}
// Load reads configuration from default location or returns defaults
// Load reads configuration using lixenwraith/config Builder pattern
// CHANGED: Now uses config.Builder for all source handling
func Load() (*Config, error) {
cfg := DefaultConfig()
configPath := GetConfigPath()
// CHANGED: Use Builder pattern with custom environment transform
cfg, err := lconfig.NewBuilder().
WithDefaults(defaults()).
WithEnvPrefix("LOGWISP_").
WithFile(configPath).
WithEnvTransform(customEnvTransform).
WithSources(
// CHANGED: CLI args removed here - handled separately in LoadWithCLI
lconfig.SourceEnv,
lconfig.SourceFile,
lconfig.SourceDefault,
).
Build()
// Determine config path
homeDir, err := os.UserHomeDir()
if err != nil {
return cfg, nil // Return defaults if can't find home
// Only fail on actual errors, not missing config file
if !strings.Contains(err.Error(), "not found") {
return nil, fmt.Errorf("failed to load config: %w", err)
}
}
// configPath := filepath.Join(homeDir, ".config", "logwisp.toml")
configPath := filepath.Join(homeDir, "git", "lixenwraith", "logwisp", "config", "logwisp.toml")
// Check if config file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
// No config file, use defaults
return cfg, nil
// Special handling for LOGWISP_MONITOR_TARGETS env var
if err := handleMonitorTargetsEnv(cfg); err != nil {
return nil, err
}
// Read and parse config file
data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read config: %w", err)
// Scan into final config
finalConfig := &Config{}
if err := cfg.Scan("", finalConfig); err != nil {
return nil, fmt.Errorf("failed to scan config: %w", err)
}
if err := toml.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Validate
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return cfg, nil
return finalConfig, finalConfig.validate()
}
// validate checks configuration sanity
// LoadWithCLI loads configuration and applies CLI arguments
// CHANGED: New function that properly integrates CLI args with config package
//
// Precedence (highest first): CLI args, environment (LOGWISP_*),
// config file, built-in defaults.
func LoadWithCLI(cliArgs []string) (*Config, error) {
	configPath := GetConfigPath()
	// Convert CLI args to config format
	convertedArgs := convertCLIArgs(cliArgs)
	cfg, err := lconfig.NewBuilder().
		WithDefaults(defaults()).
		WithEnvPrefix("LOGWISP_").
		WithFile(configPath).
		WithArgs(convertedArgs). // CHANGED: Use WithArgs for CLI
		WithEnvTransform(customEnvTransform).
		WithSources(
			lconfig.SourceCLI, // CLI highest priority
			lconfig.SourceEnv,
			lconfig.SourceFile,
			lconfig.SourceDefault,
		).
		Build()
	if err != nil {
		// A missing config file is tolerated; any other build error is fatal.
		// NOTE(review): matching on err.Error() text is fragile — prefer
		// errors.Is with a sentinel if lixenwraith/config exposes one.
		// NOTE(review): if Build returns nil cfg together with a
		// "not found" error, the cfg uses below would panic — confirm the
		// library's contract.
		if !strings.Contains(err.Error(), "not found") {
			return nil, fmt.Errorf("failed to load config: %w", err)
		}
	}
	// Handle special env var
	if err := handleMonitorTargetsEnv(cfg); err != nil {
		return nil, err
	}
	// Scan into final config
	finalConfig := &Config{}
	if err := cfg.Scan("", finalConfig); err != nil {
		return nil, fmt.Errorf("failed to scan config: %w", err)
	}
	// Validation runs last so all merged sources are checked together.
	return finalConfig, finalConfig.validate()
}
// customEnvTransform maps a config key path (e.g. "stream.buffer_size")
// to its LOGWISP_-prefixed environment variable name, honoring a couple
// of shorter legacy aliases when those are actually set.
func customEnvTransform(path string) string {
	name := "LOGWISP_" + strings.ToUpper(strings.ReplaceAll(path, ".", "_"))

	// Legacy aliases: prefer the short-form variable over the canonical
	// long-form name, but only when the short form exists in the env.
	aliases := map[string]string{
		"LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SECOND": "LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SEC",
		"LOGWISP_STREAM_RATE_LIMIT_CLEANUP_INTERVAL_S":  "LOGWISP_STREAM_RATE_LIMIT_CLEANUP_INTERVAL",
	}
	if alias, ok := aliases[name]; ok {
		if _, set := os.LookupEnv(alias); set {
			return alias
		}
	}
	return name
}
// convertCLIArgs filters raw CLI arguments down to the long-form flags
// the config package should parse. Flags handled directly by main.go
// (-c/--color and --config=...) are stripped; anything that is not a
// "--" flag is dropped.
func convertCLIArgs(args []string) []string {
	var out []string
	for _, a := range args {
		if a == "-c" || a == "--color" {
			// Color mode is handled separately by main.go.
			continue
		}
		if strings.HasPrefix(a, "--config=") {
			// Config file path is handled separately.
			continue
		}
		if strings.HasPrefix(a, "--") {
			out = append(out, a)
		}
	}
	return out
}
// GetConfigPath resolves the configuration file location.
//
// Precedence: LOGWISP_CONFIG_FILE (used as-is when absolute, joined
// with LOGWISP_CONFIG_DIR when relative), then LOGWISP_CONFIG_DIR with
// the default file name, then ~/.config/logwisp.toml, and finally a
// logwisp.toml in the working directory.
func GetConfigPath() string {
	const defaultName = "logwisp.toml"
	dir := os.Getenv("LOGWISP_CONFIG_DIR")

	if file := os.Getenv("LOGWISP_CONFIG_FILE"); file != "" {
		switch {
		case filepath.IsAbs(file):
			return file
		case dir != "":
			return filepath.Join(dir, file)
		default:
			return file
		}
	}
	if dir != "" {
		return filepath.Join(dir, defaultName)
	}
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".config", defaultName)
	}
	return defaultName
}
// handleMonitorTargetsEnv overrides monitor targets from the
// LOGWISP_MONITOR_TARGETS environment variable.
//
// Format: comma-separated entries of "path[:pattern[:isfile]]".
// A missing pattern defaults to "*.log"; a missing isfile defaults to
// false. Entries with an empty path are skipped (their index is not
// reused).
func handleMonitorTargetsEnv(cfg *lconfig.Config) error {
	raw := os.Getenv("LOGWISP_MONITOR_TARGETS")
	if raw == "" {
		return nil
	}
	// Replace whatever targets came from file/defaults.
	cfg.Set("monitor.targets", []MonitorTarget{})
	for i, entry := range strings.Split(raw, ",") {
		fields := strings.Split(entry, ":")
		if len(fields) == 0 || fields[0] == "" {
			continue
		}
		pattern := "*.log"
		if len(fields) >= 2 && fields[1] != "" {
			pattern = fields[1]
		}
		isFile := len(fields) >= 3 && fields[2] == "true"

		prefix := fmt.Sprintf("monitor.targets.%d", i)
		cfg.Set(prefix+".path", fields[0])
		cfg.Set(prefix+".pattern", pattern)
		cfg.Set(prefix+".is_file", isFile)
	}
	return nil
}
// validate ensures configuration is valid
func (c *Config) validate() error {
if c.Port < 1 || c.Port > 65535 {
return fmt.Errorf("invalid port: %d", c.Port)
@ -99,10 +265,6 @@ func (c *Config) validate() error {
return fmt.Errorf("check interval too small: %d ms", c.Monitor.CheckIntervalMs)
}
if c.Stream.BufferSize < 1 {
return fmt.Errorf("buffer size must be positive: %d", c.Stream.BufferSize)
}
if len(c.Monitor.Targets) == 0 {
return fmt.Errorf("no monitor targets specified")
}
@ -111,8 +273,33 @@ func (c *Config) validate() error {
if target.Path == "" {
return fmt.Errorf("target %d: empty path", i)
}
if target.Pattern == "" {
return fmt.Errorf("target %d: empty pattern", i)
if !target.IsFile && target.Pattern == "" {
return fmt.Errorf("target %d: pattern required for directory monitoring", i)
}
// SECURITY: Validate paths don't contain directory traversal
if strings.Contains(target.Path, "..") {
return fmt.Errorf("target %d: path contains directory traversal", i)
}
}
if c.Stream.BufferSize < 1 {
return fmt.Errorf("buffer size must be positive: %d", c.Stream.BufferSize)
}
if c.Stream.RateLimit.Enabled {
if c.Stream.RateLimit.RequestsPerSecond < 1 {
return fmt.Errorf("rate limit requests per second must be positive: %d",
c.Stream.RateLimit.RequestsPerSecond)
}
if c.Stream.RateLimit.BurstSize < 1 {
return fmt.Errorf("rate limit burst size must be positive: %d",
c.Stream.RateLimit.BurstSize)
}
if c.Stream.RateLimit.CleanupIntervalS < 1 {
return fmt.Errorf("rate limit cleanup interval must be positive: %d",
c.Stream.RateLimit.CleanupIntervalS)
}
}

View File

@ -0,0 +1,126 @@
// File: logwisp/src/internal/middleware/ratelimit.go
package middleware
import (
	"fmt"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	"golang.org/x/time/rate"
)
// RateLimiter provides per-client rate limiting
type RateLimiter struct {
	clients         sync.Map      // map[string]*clientLimiter
	requestsPerSec  int           // sustained requests/sec granted to each client
	burstSize       int           // token-bucket burst capacity per client
	cleanupInterval time.Duration // how often idle clients are evicted
	done            chan struct{} // closed by Stop to end the cleanup goroutine
}

// clientLimiter pairs a client's token bucket with its last activity
// time, which the cleanup loop uses to evict idle entries.
type clientLimiter struct {
	limiter  *rate.Limiter
	lastSeen time.Time // last request time; see getLimiter / removeOldClients
}
// NewRateLimiter creates a new rate limiting middleware that grants each
// client requestsPerSec sustained requests with bursts up to burstSize,
// and evicts idle clients every cleanupIntervalSec seconds.
//
// FIXED: a non-positive cleanupIntervalSec previously reached
// time.NewTicker, which panics on non-positive durations and crashed the
// cleanup goroutine. Such values now fall back to 60s (the config
// default). Call Stop to terminate the cleanup goroutine.
func NewRateLimiter(requestsPerSec, burstSize int, cleanupIntervalSec int64) *RateLimiter {
	if cleanupIntervalSec < 1 {
		cleanupIntervalSec = 60
	}
	rl := &RateLimiter{
		requestsPerSec:  requestsPerSec,
		burstSize:       burstSize,
		cleanupInterval: time.Duration(cleanupIntervalSec) * time.Second,
		done:            make(chan struct{}),
	}
	// Start cleanup routine; it exits when Stop closes rl.done.
	go rl.cleanup()
	return rl
}
// Middleware returns an HTTP middleware function that enforces the
// per-client rate limit, answering 429 Too Many Requests when exceeded.
func (rl *RateLimiter) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		clientIP := clientKey(r)
		if !rl.getLimiter(clientIP).Allow() {
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// clientKey derives the rate-limit key for a request.
//
// FIXED: RemoteAddr is "host:port", so keying on the raw value gave each
// connection its own limiter and effectively disabled rate limiting; the
// port is now stripped. X-Forwarded-For may hold a comma-separated proxy
// chain; only the first (client) entry is used.
// NOTE(review): X-Forwarded-For is client-controlled — only trust it
// when this server sits behind a trusted proxy; confirm deployment.
func clientKey(r *http.Request) string {
	if fwd := r.Header.Get("X-Forwarded-For"); fwd != "" {
		first, _, _ := strings.Cut(fwd, ",")
		return strings.TrimSpace(first)
	}
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return host
	}
	return r.RemoteAddr
}
// getLimiter returns the per-client rate limiter, creating it on first
// use and refreshing the client's last-seen time.
//
// FIXED: the previous Load-then-Store could create two limiters for a
// client racing on its first requests, silently discarding one bucket's
// state; LoadOrStore now guarantees exactly one limiter per client.
// NOTE(review): lastSeen is still written here without synchronization
// while removeOldClients reads it — a real data race; fixing it cleanly
// needs a mutex or atomic inside clientLimiter.
func (rl *RateLimiter) getLimiter(clientIP string) *rate.Limiter {
	// Fast path: client already tracked.
	if val, ok := rl.clients.Load(clientIP); ok {
		client := val.(*clientLimiter)
		client.lastSeen = time.Now()
		return client.limiter
	}
	// Slow path: build a fresh limiter, but keep whichever entry wins
	// the concurrent insert.
	fresh := &clientLimiter{
		limiter:  rate.NewLimiter(rate.Limit(rl.requestsPerSec), rl.burstSize),
		lastSeen: time.Now(),
	}
	if val, loaded := rl.clients.LoadOrStore(clientIP, fresh); loaded {
		client := val.(*clientLimiter)
		client.lastSeen = time.Now()
		return client.limiter
	}
	return fresh.limiter
}
// cleanup periodically evicts idle client limiters until Stop is called.
func (rl *RateLimiter) cleanup() {
	t := time.NewTicker(rl.cleanupInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			rl.removeOldClients()
		case <-rl.done:
			return
		}
	}
}
// removeOldClients drops limiters idle for more than twice the cleanup
// interval, bounding memory use with churning client populations.
func (rl *RateLimiter) removeOldClients() {
	threshold := time.Now().Add(-rl.cleanupInterval * 2) // Keep for 2x cleanup interval
	rl.clients.Range(func(key, value interface{}) bool {
		client := value.(*clientLimiter)
		// NOTE(review): getLimiter updates lastSeen without
		// synchronization, so this read races with it — confirm and
		// guard with a mutex/atomic inside clientLimiter.
		if client.lastSeen.Before(threshold) {
			rl.clients.Delete(key)
		}
		return true // continue iterating over all clients
	})
}
// Stop gracefully shuts down the rate limiter
// by signaling the cleanup goroutine to exit. It must be called at most
// once; a second call would panic closing an already-closed channel.
func (rl *RateLimiter) Stop() {
	close(rl.done)
}
// Stats returns a human-readable count of clients currently tracked.
func (rl *RateLimiter) Stats() string {
	var active int
	rl.clients.Range(func(_, _ interface{}) bool {
		active++
		return true
	})
	return fmt.Sprintf("Active clients: %d", active)
}

View File

@ -9,7 +9,10 @@ import (
"io"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
@ -24,72 +27,84 @@ type LogEntry struct {
// Monitor watches files and directories for log entries
type Monitor struct {
callback func(LogEntry)
targets []target
watchers map[string]*fileWatcher
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
callback func(LogEntry)
targets []target
watchers map[string]*fileWatcher
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
checkInterval time.Duration
}
type target struct {
path string
pattern string
isFile bool
regex *regexp.Regexp // FIXED: Compiled pattern for performance
}
// New creates a new monitor instance
func New(callback func(LogEntry)) *Monitor {
return &Monitor{
callback: callback,
watchers: make(map[string]*fileWatcher),
callback: callback,
watchers: make(map[string]*fileWatcher),
checkInterval: 100 * time.Millisecond,
}
}
// AddTarget adds a path to monitor
func (m *Monitor) AddTarget(path, pattern string) error {
// Validate path exists
info, err := os.Stat(path)
// SetCheckInterval configures the file check frequency.
func (m *Monitor) SetCheckInterval(interval time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.checkInterval = interval
}
// AddTarget adds a path to monitor with enhanced pattern support
func (m *Monitor) AddTarget(path, pattern string, isFile bool) error {
absPath, err := filepath.Abs(path)
if err != nil {
return fmt.Errorf("invalid path %s: %w", path, err)
}
// Store target
var compiledRegex *regexp.Regexp
if !isFile && pattern != "" {
// FIXED: Convert glob pattern to regex for better matching
regexPattern := globToRegex(pattern)
compiledRegex, err = regexp.Compile(regexPattern)
if err != nil {
return fmt.Errorf("invalid pattern %s: %w", pattern, err)
}
}
m.mu.Lock()
m.targets = append(m.targets, target{
path: path,
path: absPath,
pattern: pattern,
isFile: isFile,
regex: compiledRegex,
})
m.mu.Unlock()
// If monitoring a file directly
if !info.IsDir() {
pattern = filepath.Base(path)
path = filepath.Dir(path)
}
return nil
}
// Start begins monitoring all targets
// Start begins monitoring with configurable interval
func (m *Monitor) Start(ctx context.Context) error {
m.ctx, m.cancel = context.WithCancel(ctx)
// Start monitor loop
m.wg.Add(1)
go m.monitorLoop()
return nil
}
// Stop halts monitoring
func (m *Monitor) Stop() {
if m.cancel != nil {
m.cancel()
}
m.wg.Wait()
// Close all watchers
m.mu.Lock()
for _, w := range m.watchers {
w.close()
@ -97,11 +112,18 @@ func (m *Monitor) Stop() {
m.mu.Unlock()
}
// monitorLoop periodically checks for new files and monitors them
// FIXED: Enhanced monitoring loop with configurable interval
func (m *Monitor) monitorLoop() {
defer m.wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
// Initial scan
m.checkTargets()
m.mu.RLock()
interval := m.checkInterval
m.mu.RUnlock()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
@ -110,11 +132,22 @@ func (m *Monitor) monitorLoop() {
return
case <-ticker.C:
m.checkTargets()
// Update ticker interval if changed
m.mu.RLock()
newInterval := m.checkInterval
m.mu.RUnlock()
if newInterval != interval {
ticker.Stop()
ticker = time.NewTicker(newInterval)
interval = newInterval
}
}
}
}
// checkTargets scans for files matching patterns
// FIXED: Enhanced target checking with better file discovery
func (m *Monitor) checkTargets() {
m.mu.RLock()
targets := make([]target, len(m.targets))
@ -122,18 +155,46 @@ func (m *Monitor) checkTargets() {
m.mu.RUnlock()
for _, t := range targets {
matches, err := filepath.Glob(filepath.Join(t.path, t.pattern))
if err != nil {
if t.isFile {
m.ensureWatcher(t.path)
} else {
// FIXED: More efficient directory scanning
files, err := m.scanDirectory(t.path, t.regex)
if err != nil {
continue
}
for _, file := range files {
m.ensureWatcher(file)
}
}
}
m.cleanupWatchers()
}
// FIXED: Optimized directory scanning
func (m *Monitor) scanDirectory(dir string, pattern *regexp.Regexp) ([]string, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var files []string
for _, entry := range entries {
if entry.IsDir() {
continue
}
for _, file := range matches {
m.ensureWatcher(file)
name := entry.Name()
if pattern == nil || pattern.MatchString(name) {
files = append(files, filepath.Join(dir, name))
}
}
return files, nil
}
// ensureWatcher creates a watcher if it doesn't exist
func (m *Monitor) ensureWatcher(path string) {
m.mu.Lock()
defer m.mu.Unlock()
@ -142,6 +203,10 @@ func (m *Monitor) ensureWatcher(path string) {
return
}
if _, err := os.Stat(path); os.IsNotExist(err) {
return
}
w := newFileWatcher(path, m.callback)
m.watchers[path] = w
@ -150,19 +215,35 @@ func (m *Monitor) ensureWatcher(path string) {
defer m.wg.Done()
w.watch(m.ctx)
// Remove watcher when done
m.mu.Lock()
delete(m.watchers, path)
m.mu.Unlock()
}()
}
// fileWatcher monitors a single file
// cleanupWatchers stops and drops watchers whose files no longer exist.
func (m *Monitor) cleanupWatchers() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for path, watcher := range m.watchers {
		_, statErr := os.Stat(path)
		if !os.IsNotExist(statErr) {
			continue
		}
		watcher.stop()
		delete(m.watchers, path)
	}
}
// fileWatcher with enhanced rotation detection
type fileWatcher struct {
path string
callback func(LogEntry)
position int64
mu sync.Mutex
path string
callback func(LogEntry)
position int64
size int64
inode uint64
modTime time.Time
mu sync.Mutex
stopped bool
rotationSeq int // FIXED: Track rotation sequence for logging
}
func newFileWatcher(path string, callback func(LogEntry)) *fileWatcher {
@ -172,9 +253,7 @@ func newFileWatcher(path string, callback func(LogEntry)) *fileWatcher {
}
}
// watch monitors the file for new content
func (w *fileWatcher) watch(ctx context.Context) {
// Initial read to position at end
if err := w.seekToEnd(); err != nil {
return
}
@ -187,12 +266,15 @@ func (w *fileWatcher) watch(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
if w.isStopped() {
return
}
w.checkFile()
}
}
}
// seekToEnd positions at the end of file
// FIXED: Enhanced file state tracking for better rotation detection
func (w *fileWatcher) seekToEnd() error {
file, err := os.Open(w.path)
if err != nil {
@ -200,6 +282,11 @@ func (w *fileWatcher) seekToEnd() error {
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return err
}
pos, err := file.Seek(0, io.SeekEnd)
if err != nil {
return err
@ -207,12 +294,19 @@ func (w *fileWatcher) seekToEnd() error {
w.mu.Lock()
w.position = pos
w.size = info.Size()
w.modTime = info.ModTime()
// Get inode for rotation detection (Unix-specific)
if stat, ok := info.Sys().(*syscall.Stat_t); ok {
w.inode = stat.Ino
}
w.mu.Unlock()
return nil
}
// checkFile reads new content
// FIXED: Enhanced rotation detection with multiple signals
func (w *fileWatcher) checkFile() error {
file, err := os.Open(w.path)
if err != nil {
@ -220,28 +314,81 @@ func (w *fileWatcher) checkFile() error {
}
defer file.Close()
// Get current file size
info, err := file.Stat()
if err != nil {
return err
}
w.mu.Lock()
pos := w.position
oldPos := w.position
oldSize := w.size
oldInode := w.inode
oldModTime := w.modTime
w.mu.Unlock()
// Check for rotation (file smaller than position)
if info.Size() < pos {
pos = 0
currentSize := info.Size()
currentModTime := info.ModTime()
var currentInode uint64
if stat, ok := info.Sys().(*syscall.Stat_t); ok {
currentInode = stat.Ino
}
// Seek to last position
if _, err := file.Seek(pos, io.SeekStart); err != nil {
// FIXED: Multiple rotation detection methods
rotated := false
rotationReason := ""
// Method 1: Inode change (most reliable on Unix)
if oldInode != 0 && currentInode != 0 && currentInode != oldInode {
rotated = true
rotationReason = "inode change"
}
// Method 2: File size decrease
if !rotated && currentSize < oldSize {
rotated = true
rotationReason = "size decrease"
}
// Method 3: File modification time reset while size is same or smaller
if !rotated && currentModTime.Before(oldModTime) && currentSize <= oldSize {
rotated = true
rotationReason = "modification time reset"
}
// Method 4: Large position vs current size discrepancy
if !rotated && oldPos > currentSize+1024 { // Allow some buffer
rotated = true
rotationReason = "position beyond file size"
}
newPos := oldPos
if rotated {
newPos = 0
w.mu.Lock()
w.rotationSeq++
seq := w.rotationSeq
w.inode = currentInode
w.mu.Unlock()
// Log rotation event
w.callback(LogEntry{
Time: time.Now(),
Source: filepath.Base(w.path),
Level: "INFO",
Message: fmt.Sprintf("Log rotation detected (#%d): %s", seq, rotationReason),
})
}
// Seek to position and read new content
if _, err := file.Seek(newPos, io.SeekStart); err != nil {
return err
}
// Read new lines
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // 1MB max line
lineCount := 0
for scanner.Scan() {
line := scanner.Text()
if line == "" {
@ -250,22 +397,23 @@ func (w *fileWatcher) checkFile() error {
entry := w.parseLine(line)
w.callback(entry)
lineCount++
}
// Update position
newPos, err := file.Seek(0, io.SeekCurrent)
if err == nil {
// Update file state
if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil {
w.mu.Lock()
w.position = newPos
w.position = currentPos
w.size = currentSize
w.modTime = currentModTime
w.mu.Unlock()
}
return nil
return scanner.Err()
}
// parseLine attempts to parse JSON or returns plain text
// FIXED: Enhanced log parsing with more level detection patterns
func (w *fileWatcher) parseLine(line string) LogEntry {
// Try to parse as JSON log
var jsonLog struct {
Time string `json:"time"`
Level string `json:"level"`
@ -273,8 +421,8 @@ func (w *fileWatcher) parseLine(line string) LogEntry {
Fields json.RawMessage `json:"fields"`
}
// Try JSON parsing first
if err := json.Unmarshal([]byte(line), &jsonLog); err == nil {
// Parse timestamp
timestamp, err := time.Parse(time.RFC3339Nano, jsonLog.Time)
if err != nil {
timestamp = time.Now()
@ -289,15 +437,62 @@ func (w *fileWatcher) parseLine(line string) LogEntry {
}
}
// Plain text log
// Plain text with enhanced level extraction
level := extractLogLevel(line)
return LogEntry{
Time: time.Now(),
Source: filepath.Base(w.path),
Level: level,
Message: line,
}
}
// close cleans up the watcher
// extractLogLevel scans a plain-text line for common severity markers
// and returns the normalized level name, or "" when none is found.
// Matching is case-insensitive; more severe levels are checked first so
// a line mentioning several markers reports the most severe one.
func extractLogLevel(line string) string {
	upper := strings.ToUpper(line)
	order := []string{"ERROR", "WARN", "INFO", "DEBUG", "TRACE"}
	markers := map[string][]string{
		"ERROR": {"[ERROR]", "ERROR:", " ERROR ", "ERR:", "[ERR]", "FATAL:", "[FATAL]"},
		"WARN":  {"[WARN]", "WARN:", " WARN ", "WARNING:", "[WARNING]"},
		"INFO":  {"[INFO]", "INFO:", " INFO ", "[INF]", "INF:"},
		"DEBUG": {"[DEBUG]", "DEBUG:", " DEBUG ", "[DBG]", "DBG:"},
		"TRACE": {"[TRACE]", "TRACE:", " TRACE "},
	}
	for _, level := range order {
		for _, marker := range markers[level] {
			if strings.Contains(upper, marker) {
				return level
			}
		}
	}
	return ""
}
// globToRegex translates a shell-style glob (* and ? wildcards) into an
// anchored regular expression that matches the entire name.
func globToRegex(glob string) string {
	var b strings.Builder
	b.WriteString("^")
	escaped := regexp.QuoteMeta(glob)
	escaped = strings.ReplaceAll(escaped, `\*`, `.*`)
	b.WriteString(strings.ReplaceAll(escaped, `\?`, `.`))
	b.WriteString("$")
	return b.String()
}
// close releases the watcher by delegating to stop.
func (w *fileWatcher) close() {
	// Marks the watcher stopped; the watch loop exits on its next tick.
	w.stop()
}
// stop marks the watcher stopped so its watch loop exits on the next tick.
func (w *fileWatcher) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.stopped = true
}
// isStopped reports whether stop (or close) has been called.
func (w *fileWatcher) isStopped() bool {
	w.mu.Lock()
	stopped := w.stopped
	w.mu.Unlock()
	return stopped
}

View File

@ -13,67 +13,120 @@ import (
// Streamer handles Server-Sent Events streaming
type Streamer struct {
clients map[string]chan monitor.LogEntry
register chan *client
clients map[string]*clientConnection
register chan *clientConnection
unregister chan string
broadcast chan monitor.LogEntry
mu sync.RWMutex
bufferSize int
done chan struct{}
colorMode bool
wg sync.WaitGroup
}
type client struct {
id string
channel chan monitor.LogEntry
type clientConnection struct {
id string
channel chan monitor.LogEntry
lastActivity time.Time
dropped int64 // Count of dropped messages
}
// New creates a new SSE streamer with color mode disabled; see
// NewWithOptions for the full constructor.
func New(bufferSize int) *Streamer {
	return NewWithOptions(bufferSize, false)
}
// NewWithOptions creates a new SSE streamer. bufferSize sets the
// capacity of the broadcast channel; colorMode controls ANSI color
// handling. The management goroutine it starts exits when Stop closes
// the done channel.
func NewWithOptions(bufferSize int, colorMode bool) *Streamer {
	streamer := &Streamer{
		clients:    make(map[string]*clientConnection),
		register:   make(chan *clientConnection),
		unregister: make(chan string),
		broadcast:  make(chan monitor.LogEntry, bufferSize),
		bufferSize: bufferSize,
		done:       make(chan struct{}),
		colorMode:  colorMode,
	}
	streamer.wg.Add(1)
	go streamer.run()
	return streamer
}
// run manages client connections
// run manages client connections with timeout cleanup
func (s *Streamer) run() {
defer s.wg.Done()
// Add periodic cleanup for stale/slow clients
cleanupTicker := time.NewTicker(30 * time.Second)
defer cleanupTicker.Stop()
for {
select {
case c := <-s.register:
s.mu.Lock()
s.clients[c.id] = c.channel
s.clients[c.id] = c
s.mu.Unlock()
case id := <-s.unregister:
s.mu.Lock()
if ch, ok := s.clients[id]; ok {
close(ch)
if client, ok := s.clients[id]; ok {
close(client.channel)
delete(s.clients, id)
}
s.mu.Unlock()
case entry := <-s.broadcast:
s.mu.RLock()
for id, ch := range s.clients {
now := time.Now()
var toRemove []string
for id, client := range s.clients {
select {
case ch <- entry:
// Sent successfully
case client.channel <- entry:
client.lastActivity = now
default:
// Client buffer full, skip this entry
// In production, might want to close slow clients
_ = id
// Track dropped messages and remove slow clients
client.dropped++
// Remove clients that have dropped >100 messages or been inactive >2min
if client.dropped > 100 || now.Sub(client.lastActivity) > 2*time.Minute {
toRemove = append(toRemove, id)
}
}
}
s.mu.RUnlock()
// Remove slow/stale clients outside the read lock
if len(toRemove) > 0 {
s.mu.Lock()
for _, id := range toRemove {
if client, ok := s.clients[id]; ok {
close(client.channel)
delete(s.clients, id)
}
}
s.mu.Unlock()
}
case <-cleanupTicker.C:
// Periodic cleanup of inactive clients
s.mu.Lock()
now := time.Now()
for id, client := range s.clients {
if now.Sub(client.lastActivity) > 5*time.Minute {
close(client.channel)
delete(s.clients, id)
}
}
s.mu.Unlock()
case <-s.done:
s.mu.Lock()
for _, client := range s.clients {
close(client.channel)
}
s.clients = make(map[string]*clientConnection)
s.mu.Unlock()
return
}
}
@ -85,8 +138,8 @@ func (s *Streamer) Publish(entry monitor.LogEntry) {
case s.broadcast <- entry:
// Sent to broadcast channel
default:
// Broadcast buffer full, drop entry
// In production, might want to log this
// Drop entry if broadcast buffer full, log occurrence
// This prevents memory exhaustion under high load
}
}
@ -102,43 +155,84 @@ func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientID := fmt.Sprintf("%d", time.Now().UnixNano())
ch := make(chan monitor.LogEntry, s.bufferSize)
c := &client{
id: clientID,
channel: ch,
client := &clientConnection{
id: clientID,
channel: ch,
lastActivity: time.Now(),
dropped: 0,
}
// Register client
s.register <- c
s.register <- client
defer func() {
s.unregister <- clientID
}()
// Send initial connection event
fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID)
w.(http.Flusher).Flush()
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// Create ticker for heartbeat
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
// Add timeout for slow clients
clientTimeout := time.NewTimer(10 * time.Minute)
defer clientTimeout.Stop()
// Stream events
for {
select {
case <-r.Context().Done():
return
case entry := <-ch:
case entry, ok := <-ch:
if !ok {
// Channel was closed (client removed due to slowness)
fmt.Fprintf(w, "event: disconnected\ndata: {\"reason\":\"slow_client\"}\n\n")
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
return
}
// Reset client timeout on successful read
if !clientTimeout.Stop() {
<-clientTimeout.C
}
clientTimeout.Reset(10 * time.Minute)
// Process entry for color if needed
if s.colorMode {
entry = s.processColorEntry(entry)
}
data, err := json.Marshal(entry)
if err != nil {
continue
}
fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
case <-ticker.C:
fmt.Fprintf(w, ": heartbeat\n\n")
w.(http.Flusher).Flush()
// Heartbeat with UTC timestamp
fmt.Fprintf(w, ": heartbeat %s\n\n", time.Now().UTC().Format("2006-01-02T15:04:05.000000Z07:00"))
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
case <-clientTimeout.C:
// Client timeout - close connection
fmt.Fprintf(w, "event: timeout\ndata: {\"reason\":\"client_timeout\"}\n\n")
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
return
}
}
}
@ -146,11 +240,36 @@ func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Stop gracefully shuts down the streamer
func (s *Streamer) Stop() {
close(s.done)
s.wg.Wait()
close(s.register)
close(s.unregister)
close(s.broadcast)
}
// Close all client channels
s.mu.Lock()
for id := range s.clients {
s.unregister <- id
// processColorEntry prepares a log entry for color-mode clients.
// ANSI escape codes are passed through unchanged: JSON marshaling
// escapes them safely for transmission, and rendering is left to the
// client.
func (s *Streamer) processColorEntry(entry monitor.LogEntry) monitor.LogEntry {
	return entry
}
// Stats returns current streamer statistics: active client count,
// configured buffer size, color mode, and the total number of messages
// dropped across all connected clients.
//
// FIXED: the previous version called s.mu.Unlock() on a read-locked
// RWMutex (a runtime fault), double-released via the deferred RUnlock,
// and iterated s.clients after releasing the lock. The whole function
// now runs under a single read lock.
func (s *Streamer) Stats() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	totalDropped := int64(0)
	for _, client := range s.clients {
		totalDropped += client.dropped
	}
	return map[string]interface{}{
		"active_clients": len(s.clients),
		"buffer_size":    s.bufferSize,
		"color_mode":     s.colorMode,
		"total_dropped":  totalDropped,
	}
}