v0.1.2: update README and config; failed attempt to fix slow client

This commit is contained in:
2025-07-01 16:34:19 -04:00
parent bd13103a81
commit a3450a9589
6 changed files with 386 additions and 182 deletions

View File

@ -21,9 +21,10 @@ import (
)
func main() {
// CHANGED: Parse flags manually without init()
// Parse flags manually without init()
var colorMode bool
flag.BoolVar(&colorMode, "c", false, "Enable color pass-through for escape codes in logs")
flag.BoolVar(&colorMode, "color", false, "Enable color pass-through for escape codes in logs")
// Additional CLI flags that override config
var (
@ -98,7 +99,6 @@ func main() {
var wg sync.WaitGroup
// Create components
// colorMode is now separate from config
streamer := stream.NewWithOptions(cfg.Stream.BufferSize, colorMode)
mon := monitor.New(streamer.Publish)
@ -145,7 +145,7 @@ func main() {
status := map[string]interface{}{
"service": "LogWisp",
"version": "2.0.0", // CHANGED: Version bump for config integration
"version": "2.0.0",
"port": cfg.Port,
"color_mode": colorMode,
"config": map[string]interface{}{
@ -191,7 +191,6 @@ func main() {
if colorMode {
fmt.Println("Color pass-through enabled")
}
// CHANGED: Log config source information
fmt.Printf("Config loaded from: %s\n", config.GetConfigPath())
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {

View File

@ -67,18 +67,15 @@ func defaults() *Config {
}
// Load reads configuration using lixenwraith/config Builder pattern
// CHANGED: Now uses config.Builder for all source handling
func Load() (*Config, error) {
configPath := GetConfigPath()
// CHANGED: Use Builder pattern with custom environment transform
cfg, err := lconfig.NewBuilder().
WithDefaults(defaults()).
WithEnvPrefix("LOGWISP_").
WithFile(configPath).
WithEnvTransform(customEnvTransform).
WithSources(
// CHANGED: CLI args removed here - handled separately in LoadWithCLI
lconfig.SourceEnv,
lconfig.SourceFile,
lconfig.SourceDefault,
@ -107,7 +104,6 @@ func Load() (*Config, error) {
}
// LoadWithCLI loads configuration and applies CLI arguments
// CHANGED: New function that properly integrates CLI args with config package
func LoadWithCLI(cliArgs []string) (*Config, error) {
configPath := GetConfigPath()
@ -118,7 +114,7 @@ func LoadWithCLI(cliArgs []string) (*Config, error) {
WithDefaults(defaults()).
WithEnvPrefix("LOGWISP_").
WithFile(configPath).
WithArgs(convertedArgs). // CHANGED: Use WithArgs for CLI
WithArgs(convertedArgs).
WithEnvTransform(customEnvTransform).
WithSources(
lconfig.SourceCLI, // CLI highest priority
@ -148,16 +144,14 @@ func LoadWithCLI(cliArgs []string) (*Config, error) {
return finalConfig, finalConfig.validate()
}
// CHANGED: Custom environment transform that handles LOGWISP_ prefix more flexibly
// customEnvTransform handles LOGWISP_ prefix environment variables
func customEnvTransform(path string) string {
// Standard transform
env := strings.ReplaceAll(path, ".", "_")
env = strings.ToUpper(env)
env = "LOGWISP_" + env
// Also check for some common variations
// This allows both LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SEC
// and LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SECOND
// Handle common variations
switch env {
case "LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SECOND":
if _, exists := os.LookupEnv("LOGWISP_STREAM_RATE_LIMIT_REQUESTS_PER_SEC"); exists {
@ -172,7 +166,7 @@ func customEnvTransform(path string) string {
return env
}
// CHANGED: Convert CLI args to config package format
// convertCLIArgs converts CLI args to config package format
func convertCLIArgs(args []string) []string {
var converted []string
@ -194,7 +188,6 @@ func convertCLIArgs(args []string) []string {
}
// GetConfigPath returns the configuration file path
// CHANGED: Exported and simplified - now just returns the path, no manual env handling
func GetConfigPath() string {
// Check explicit config file paths
if configFile := os.Getenv("LOGWISP_CONFIG_FILE"); configFile != "" {
@ -219,7 +212,7 @@ func GetConfigPath() string {
return "logwisp.toml"
}
// CHANGED: Special handling for comma-separated monitor targets env var
// handleMonitorTargetsEnv handles comma-separated monitor targets env var
func handleMonitorTargetsEnv(cfg *lconfig.Config) error {
if targetsStr := os.Getenv("LOGWISP_MONITOR_TARGETS"); targetsStr != "" {
// Clear any existing targets from file/defaults

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/monitor"
@ -22,13 +23,16 @@ type Streamer struct {
done chan struct{}
colorMode bool
wg sync.WaitGroup
// Metrics
totalDropped atomic.Int64
}
// clientConnection tracks a single SSE subscriber and its delivery state,
// owned by the Streamer's run loop.
// NOTE(review): this is a rendered diff — both the removed (int64) and the
// added (atomic.Int64) versions of the dropped field appear below; only the
// atomic.Int64 line exists in the post-commit file. Confirm against the repo.
type clientConnection struct {
id string
channel chan monitor.LogEntry
lastActivity time.Time
dropped int64 // Count of dropped messages
dropped atomic.Int64 // Track per-client dropped messages
}
// New creates a new SSE streamer
@ -53,14 +57,10 @@ func NewWithOptions(bufferSize int, colorMode bool) *Streamer {
return s
}
// run manages client connections with timeout cleanup
// run manages client connections - SIMPLIFIED: no forced disconnections
func (s *Streamer) run() {
defer s.wg.Done()
// Add periodic cleanup for stale/slow clients
cleanupTicker := time.NewTicker(30 * time.Second)
defer cleanupTicker.Stop()
for {
select {
case c := <-s.register:
@ -79,47 +79,27 @@ func (s *Streamer) run() {
case entry := <-s.broadcast:
s.mu.RLock()
now := time.Now()
var toRemove []string
for id, client := range s.clients {
for _, client := range s.clients {
select {
case client.channel <- entry:
// Successfully sent
client.lastActivity = now
client.dropped.Store(0) // Reset dropped counter on success
default:
// Track dropped messages and remove slow clients
client.dropped++
// Remove clients that have dropped >100 messages or been inactive >2min
if client.dropped > 100 || now.Sub(client.lastActivity) > 2*time.Minute {
toRemove = append(toRemove, id)
// Buffer full - skip this message for this client
// Don't disconnect, just track dropped messages
dropped := client.dropped.Add(1)
s.totalDropped.Add(1)
// Log significant drop milestones for monitoring
if dropped == 100 || dropped == 1000 || dropped%10000 == 0 {
// Could add logging here if needed
}
}
}
s.mu.RUnlock()
// Remove slow/stale clients outside the read lock
if len(toRemove) > 0 {
s.mu.Lock()
for _, id := range toRemove {
if client, ok := s.clients[id]; ok {
close(client.channel)
delete(s.clients, id)
}
}
s.mu.Unlock()
}
case <-cleanupTicker.C:
// Periodic cleanup of inactive clients
s.mu.Lock()
now := time.Now()
for id, client := range s.clients {
if now.Sub(client.lastActivity) > 5*time.Minute {
close(client.channel)
delete(s.clients, id)
}
}
s.mu.Unlock()
case <-s.done:
s.mu.Lock()
for _, client := range s.clients {
@ -138,18 +118,21 @@ func (s *Streamer) Publish(entry monitor.LogEntry) {
case s.broadcast <- entry:
// Sent to broadcast channel
default:
// Drop entry if broadcast buffer full, log occurrence
// This prevents memory exhaustion under high load
// Broadcast buffer full - drop the message globally
s.totalDropped.Add(1)
}
}
// ServeHTTP implements http.Handler for SSE
// ServeHTTP implements http.Handler for SSE - SIMPLIFIED
func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
// SECURITY: Prevent XSS
w.Header().Set("X-Content-Type-Options", "nosniff")
// Create client
clientID := fmt.Sprintf("%d", time.Now().UnixNano())
@ -159,7 +142,6 @@ func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
id: clientID,
channel: ch,
lastActivity: time.Now(),
dropped: 0,
}
// Register client
@ -169,41 +151,29 @@ func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}()
// Send initial connection event
fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID)
fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\",\"buffer_size\":%d}\n\n",
clientID, s.bufferSize)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// Create ticker for heartbeat
// Create ticker for heartbeat - keeps connection alive through proxies
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
// Add timeout for slow clients
clientTimeout := time.NewTimer(10 * time.Minute)
defer clientTimeout.Stop()
// Stream events
// Stream events until client disconnects
for {
select {
case <-r.Context().Done():
// Client disconnected
return
case entry, ok := <-ch:
if !ok {
// Channel was closed (client removed due to slowness)
fmt.Fprintf(w, "event: disconnected\ndata: {\"reason\":\"slow_client\"}\n\n")
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// Channel closed
return
}
// Reset client timeout on successful read
if !clientTimeout.Stop() {
<-clientTimeout.C
}
clientTimeout.Reset(10 * time.Minute)
// Process entry for color if needed
if s.colorMode {
entry = s.processColorEntry(entry)
@ -220,19 +190,11 @@ func (s *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
case <-ticker.C:
// Heartbeat with UTC timestamp
fmt.Fprintf(w, ": heartbeat %s\n\n", time.Now().UTC().Format("2006-01-02T15:04:05.000000Z07:00"))
// Send heartbeat as SSE comment
fmt.Fprintf(w, ": heartbeat %s\n\n", time.Now().UTC().Format(time.RFC3339))
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
case <-clientTimeout.C:
// Client timeout - close connection
fmt.Fprintf(w, "event: timeout\ndata: {\"reason\":\"client_timeout\"}\n\n")
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
return
}
}
}
@ -246,11 +208,8 @@ func (s *Streamer) Stop() {
close(s.broadcast)
}
// Enhanced color processing with proper ANSI handling
// processColorEntry preserves ANSI codes in JSON
// processColorEntry prepares a log entry for color-mode transmission.
// ANSI escape sequences are deliberately passed through untouched: JSON
// marshaling escapes them safely for the wire, and rendering/interpreting
// the codes is the client's responsibility.
func (s *Streamer) processColorEntry(entry monitor.LogEntry) monitor.LogEntry {
	// Intentional no-op pass-through; kept as a hook for future filtering.
	return entry
}
@ -263,13 +222,24 @@ func (s *Streamer) Stats() map[string]interface{} {
"active_clients": len(s.clients),
"buffer_size": s.bufferSize,
"color_mode": s.colorMode,
"total_dropped": s.totalDropped.Load(),
}
totalDropped := int64(0)
for _, client := range s.clients {
totalDropped += client.dropped
// Include per-client dropped counts if any are significant
var clientsWithDrops []map[string]interface{}
for id, client := range s.clients {
dropped := client.dropped.Load()
if dropped > 0 {
clientsWithDrops = append(clientsWithDrops, map[string]interface{}{
"id": id,
"dropped": dropped,
})
}
}
if len(clientsWithDrops) > 0 {
stats["clients_with_drops"] = clientsWithDrops
}
stats["total_dropped"] = totalDropped
return stats
}