From be5bb9f2bd984aef9d8fa00a31b4551290316f674229b045c0ba511d2dcf33cb Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Sun, 13 Jul 2025 15:09:40 -0400 Subject: [PATCH] v0.3.4 rate limit and net limit separated, rate limit by size added --- go.mod | 2 +- go.sum | 4 +- src/internal/config/pipeline.go | 18 +++- src/internal/config/ratelimit.go | 6 ++ src/internal/config/server.go | 4 +- .../netlimiter.go => limiter/token_bucket.go} | 41 +++++--- src/internal/netlimit/limiter.go | 25 ++--- src/internal/ratelimit/limiter.go | 97 ++++++++++--------- src/internal/sink/http.go | 3 +- src/internal/sink/tcp.go | 3 +- src/internal/source/file_watcher.go | 3 + src/internal/source/http.go | 6 ++ src/internal/source/source.go | 1 + src/internal/source/stdin.go | 1 + src/internal/source/tcp.go | 6 ++ 15 files changed, 137 insertions(+), 83 deletions(-) rename src/internal/{netlimit/netlimiter.go => limiter/token_bucket.go} (67%) diff --git a/go.mod b/go.mod index edc5ea4..9a589bc 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.5 require ( github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 - github.com/lixenwraith/log v0.0.0-20250710220042-139c522da9aa + github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f github.com/panjf2000/gnet/v2 v2.9.1 github.com/valyala/fasthttp v1.63.0 ) diff --git a/go.sum b/go.sum index 992f13f..49d5fa9 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zt github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 h1:ixTIdJSd945n/IhMRwGwQVmQnQ1nUr5z1wn31jXq9FU= github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497/go.mod h1:y7kgDrWIFROWJJ6ASM/SPTRRAj27FjRGWh2SDLcdQ68= -github.com/lixenwraith/log v0.0.0-20250710220042-139c522da9aa h1:lIo780MTgZXH5jF+Qr24fRDFQTQ3eu3OaaFYLP9ZkR0= -github.com/lixenwraith/log v0.0.0-20250710220042-139c522da9aa/go.mod h1:lEjTIMWGW+XGn20x5Ec0qkNK4LCd7Y/04PII51crYKk= +github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f h1:GBSik9B/Eaqz1ogSbG6C+Ti199k8cd7atnNSkCdkfV4= +github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f/go.mod h1:lEjTIMWGW+XGn20x5Ec0qkNK4LCd7Y/04PII51crYKk= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= diff --git a/src/internal/config/pipeline.go b/src/internal/config/pipeline.go index 4343a11..609e38a 100644 --- a/src/internal/config/pipeline.go +++ b/src/internal/config/pipeline.go @@ -37,10 +37,6 @@ type SourceConfig struct { // Type-specific configuration options Options map[string]any `toml:"options"` - - // Placeholder for future source-side rate limiting - // This will be used for features like aggregation and summarization - NetLimit *NetLimitConfig `toml:"net_limit"` } // SinkConfig represents an output destination @@ -116,6 +112,13 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err } } + // Validate net_limit if present within Options + if rl, ok := cfg.Options["net_limit"].(map[string]any); ok { + if err := validateNetLimitOptions("HTTP source", pipelineName, sourceIndex, rl); err != nil { + return err + } + } + case "tcp": // Validate TCP source options port, ok := toInt(cfg.Options["port"]) @@ -124,6 +127,13 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err pipelineName, sourceIndex) } + // Validate net_limit if present within Options + if rl, ok := cfg.Options["net_limit"].(map[string]any); ok { + if err := validateNetLimitOptions("TCP source", pipelineName, sourceIndex, rl); err != nil { + return err + } + } + default: return fmt.Errorf("pipeline '%s' source[%d]: unknown source type '%s'", pipelineName, sourceIndex, cfg.Type) diff --git a/src/internal/config/ratelimit.go b/src/internal/config/ratelimit.go index 4a0225d..11f78a3 100644 --- a/src/internal/config/ratelimit.go +++ b/src/internal/config/ratelimit.go @@ -24,6 +24,8 @@ type RateLimitConfig struct { Burst float64 `toml:"burst"` // Policy defines the action to take when the limit is exceeded. "pass" or "drop". Policy string `toml:"policy"` + // MaxEntrySizeBytes is the maximum allowed size for a single log entry. 0 = no limit. + MaxEntrySizeBytes int `toml:"max_entry_size_bytes"` } func validateRateLimit(pipelineName string, cfg *RateLimitConfig) error { @@ -39,6 +41,10 @@ func validateRateLimit(pipelineName string, cfg *RateLimitConfig) error { return fmt.Errorf("pipeline '%s': rate limit burst cannot be negative", pipelineName) } + if cfg.MaxEntrySizeBytes < 0 { + return fmt.Errorf("pipeline '%s': max entry size bytes cannot be negative", pipelineName) + } + // Validate policy switch strings.ToLower(cfg.Policy) { case "", "pass", "drop": diff --git a/src/internal/config/server.go b/src/internal/config/server.go index 82e56e2..af5d90c 100644 --- a/src/internal/config/server.go +++ b/src/internal/config/server.go @@ -15,7 +15,7 @@ type TCPConfig struct { NetLimit *NetLimitConfig `toml:"net_limit"` // Heartbeat - Heartbeat HeartbeatConfig `toml:"heartbeat"` + Heartbeat *HeartbeatConfig `toml:"heartbeat"` } type HTTPConfig struct { @@ -34,7 +34,7 @@ type HTTPConfig struct { NetLimit *NetLimitConfig `toml:"net_limit"` // Heartbeat - Heartbeat HeartbeatConfig `toml:"heartbeat"` + Heartbeat *HeartbeatConfig `toml:"heartbeat"` } type HeartbeatConfig struct { diff --git a/src/internal/netlimit/netlimiter.go b/src/internal/limiter/token_bucket.go similarity index 67% rename from src/internal/netlimit/netlimiter.go rename to src/internal/limiter/token_bucket.go index 29d2990..aba571c 100644 --- a/src/internal/netlimit/netlimiter.go +++ b/src/internal/limiter/token_bucket.go @@ -1,16 +1,17 @@ -// FILE: src/internal/netlimit/netlimiter.go -package netlimit +// FILE: src/internal/limiter/token_bucket.go +package limiter import ( "sync" "time" ) -// TokenBucket implements a token bucket net limiter +// TokenBucket implements a token bucket rate limiter +// Safe for concurrent use. type TokenBucket struct { capacity float64 tokens float64 - refillRate float64 + refillRate float64 // tokens per second lastRefill time.Time mu sync.Mutex } @@ -19,7 +20,7 @@ type TokenBucket struct { func NewTokenBucket(capacity float64, refillRate float64) *TokenBucket { return &TokenBucket{ capacity: capacity, - tokens: capacity, + tokens: capacity, // Start full refillRate: refillRate, lastRefill: time.Now(), } @@ -35,7 +36,27 @@ func (tb *TokenBucket) AllowN(n float64) bool { tb.mu.Lock() defer tb.mu.Unlock() - // Refill tokens based on time elapsed + tb.refill() + + if tb.tokens >= n { + tb.tokens -= n + return true + } + return false +} + +// Tokens returns the current number of available tokens +func (tb *TokenBucket) Tokens() float64 { + tb.mu.Lock() + defer tb.mu.Unlock() + + tb.refill() + return tb.tokens +} + +// refill adds tokens based on time elapsed since last refill +// MUST be called with mutex held +func (tb *TokenBucket) refill() { now := time.Now() elapsed := now.Sub(tb.lastRefill).Seconds() @@ -43,7 +64,6 @@ func (tb *TokenBucket) AllowN(n float64) bool { if elapsed < 0 { // Clock went backwards, reset to current time but don't add tokens tb.lastRefill = now - // Don't log here as this is a hot path elapsed = 0 } @@ -52,11 +72,4 @@ func (tb *TokenBucket) AllowN(n float64) bool { tb.tokens = tb.capacity } tb.lastRefill = now - - // Check if we have enough tokens - if tb.tokens >= n { - tb.tokens -= n - return true - } - return false } \ No newline at end of file diff --git a/src/internal/netlimit/limiter.go b/src/internal/netlimit/limiter.go index 7d52a9e..7cf4851 100644 --- a/src/internal/netlimit/limiter.go +++ b/src/internal/netlimit/limiter.go @@ -10,11 +10,12 @@ import ( "time" "logwisp/src/internal/config" + "logwisp/src/internal/limiter" "github.com/lixenwraith/log" ) -// Manages net limiting for a transport +// Limiter manages net limiting for a transport type Limiter struct { config config.NetLimitConfig logger *log.Logger @@ -24,7 +25,7 @@ type Limiter struct { ipMu sync.RWMutex // Global limiter for the transport - globalLimiter *TokenBucket + globalLimiter *limiter.TokenBucket // Connection tracking ipConnections map[string]*atomic.Int32 @@ -46,7 +47,7 @@ type Limiter struct { } type ipLimiter struct { - bucket *TokenBucket + bucket *limiter.TokenBucket lastSeen time.Time connections atomic.Int32 } @@ -76,7 +77,7 @@ func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter { // Create global limiter if not using per-IP limiting if cfg.LimitBy == "global" { - l.globalLimiter = NewTokenBucket( + l.globalLimiter = limiter.NewTokenBucket( float64(cfg.BurstSize), cfg.RequestsPerSecond, ) @@ -334,24 +335,24 @@ func (l *Limiter) checkLimit(ip string) bool { case "ip", "": // Default to per-IP limiting l.ipMu.Lock() - limiter, exists := l.ipLimiters[ip] + lim, exists := l.ipLimiters[ip] if !exists { // Create new limiter for this IP - limiter = &ipLimiter{ - bucket: NewTokenBucket( + lim = &ipLimiter{ + bucket: limiter.NewTokenBucket( float64(l.config.BurstSize), l.config.RequestsPerSecond, ), lastSeen: time.Now(), } - l.ipLimiters[ip] = limiter + l.ipLimiters[ip] = lim l.uniqueIPs.Add(1) l.logger.Debug("msg", "Created new IP limiter", "ip", ip, "total_ips", l.uniqueIPs.Load()) } else { - limiter.lastSeen = time.Now() + lim.lastSeen = time.Now() } l.ipMu.Unlock() @@ -366,7 +367,7 @@ func (l *Limiter) checkLimit(ip string) bool { } } - return limiter.bucket.Allow() + return lim.bucket.Allow() default: // Unknown limit_by value, allow by default @@ -398,8 +399,8 @@ func (l *Limiter) cleanup() { defer l.ipMu.Unlock() cleaned := 0 - for ip, limiter := range l.ipLimiters { - if now.Sub(limiter.lastSeen) > staleTimeout { + for ip, lim := range l.ipLimiters { + if now.Sub(lim.lastSeen) > staleTimeout { delete(l.ipLimiters, ip) cleaned++ } diff --git a/src/internal/ratelimit/limiter.go b/src/internal/ratelimit/limiter.go index 8971acb..ac62bec 100644 --- a/src/internal/ratelimit/limiter.go +++ b/src/internal/ratelimit/limiter.go @@ -3,27 +3,25 @@ package ratelimit import ( "strings" - "sync" "sync/atomic" - "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/limiter" + "logwisp/src/internal/source" "github.com/lixenwraith/log" - "logwisp/src/internal/config" - "logwisp/src/internal/source" ) // Limiter enforces rate limits on log entries flowing through a pipeline. type Limiter struct { - mu sync.Mutex - rate float64 - burst float64 - tokens float64 - lastToken time.Time - policy config.RateLimitPolicy - logger *log.Logger + bucket *limiter.TokenBucket + policy config.RateLimitPolicy + logger *log.Logger // Statistics - droppedCount atomic.Uint64 + maxEntrySizeBytes int + droppedBySizeCount atomic.Uint64 + droppedCount atomic.Uint64 } // New creates a new rate limiter. If cfg.Rate is 0, it returns nil. @@ -46,12 +44,14 @@ func New(cfg config.RateLimitConfig, logger *log.Logger) (*Limiter, error) { } l := &Limiter{ - rate: cfg.Rate, - burst: burst, - tokens: burst, - lastToken: time.Now(), - policy: policy, - logger: logger, + bucket: limiter.NewTokenBucket(burst, cfg.Rate), + policy: policy, + logger: logger, + maxEntrySizeBytes: cfg.MaxEntrySizeBytes, + } + + if cfg.Rate > 0 { + l.bucket = limiter.NewTokenBucket(burst, cfg.Rate) } return l, nil @@ -60,46 +60,51 @@ func New(cfg config.RateLimitConfig, logger *log.Logger) (*Limiter, error) { // Allow checks if a log entry is allowed to pass based on the rate limit. // It returns true if the entry should pass, false if it should be dropped. func (l *Limiter) Allow(entry source.LogEntry) bool { - if l.policy == config.PolicyPass { + if l == nil || l.policy == config.PolicyPass { return true } - l.mu.Lock() - defer l.mu.Unlock() - - now := time.Now() - elapsed := now.Sub(l.lastToken).Seconds() - - if elapsed < 0 { - // Clock went backwards, don't add tokens - l.lastToken = now - elapsed = 0 + // Check size limit first + if l.maxEntrySizeBytes > 0 && entry.RawSize > l.maxEntrySizeBytes { + l.droppedBySizeCount.Add(1) + return false } - l.tokens += elapsed * l.rate - if l.tokens > l.burst { - l.tokens = l.burst - } - l.lastToken = now - - if l.tokens >= 1 { - l.tokens-- - return true + // Check rate limit if configured + if l.bucket != nil { + if l.bucket.Allow() { + return true + } + // Not enough tokens, drop the entry + l.droppedCount.Add(1) + return false } - // Not enough tokens, drop the entry - l.droppedCount.Add(1) - return false + // No rate limit configured, size check passed + return true } // GetStats returns the statistics for the limiter. func (l *Limiter) GetStats() map[string]any { - return map[string]any{ - "dropped_total": l.droppedCount.Load(), - "policy": policyString(l.policy), - "rate": l.rate, - "burst": l.burst, + if l == nil { + return map[string]any{ + "enabled": false, + } } + + stats := map[string]any{ + "enabled": true, + "dropped_total": l.droppedCount.Load(), + "dropped_by_size_total": l.droppedBySizeCount.Load(), + "policy": policyString(l.policy), + "max_entry_size_bytes": l.maxEntrySizeBytes, + } + + if l.bucket != nil { + stats["tokens"] = l.bucket.Tokens() + } + + return stats } // policyString returns the string representation of the policy. diff --git a/src/internal/sink/http.go b/src/internal/sink/http.go index 0695ef7..62a03d3 100644 --- a/src/internal/sink/http.go +++ b/src/internal/sink/http.go @@ -54,7 +54,7 @@ type HTTPConfig struct { BufferSize int StreamPath string StatusPath string - Heartbeat config.HeartbeatConfig + Heartbeat *config.HeartbeatConfig SSL *config.SSLConfig NetLimit *config.NetLimitConfig } @@ -84,6 +84,7 @@ func NewHTTPSink(options map[string]any, logger *log.Logger) (*HTTPSink, error) // Extract heartbeat config if hb, ok := options["heartbeat"].(map[string]any); ok { + cfg.Heartbeat = &config.HeartbeatConfig{} cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool) if interval, ok := toInt(hb["interval_seconds"]); ok { cfg.Heartbeat.IntervalSeconds = interval diff --git a/src/internal/sink/tcp.go b/src/internal/sink/tcp.go index 50fa0e2..4816430 100644 --- a/src/internal/sink/tcp.go +++ b/src/internal/sink/tcp.go @@ -41,7 +41,7 @@ type TCPSink struct { type TCPConfig struct { Port int BufferSize int - Heartbeat config.HeartbeatConfig + Heartbeat *config.HeartbeatConfig SSL *config.SSLConfig NetLimit *config.NetLimitConfig } @@ -63,6 +63,7 @@ func NewTCPSink(options map[string]any, logger *log.Logger) (*TCPSink, error) { // Extract heartbeat config if hb, ok := options["heartbeat"].(map[string]any); ok { + cfg.Heartbeat = &config.HeartbeatConfig{} cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool) if interval, ok := toInt(hb["interval_seconds"]); ok { cfg.Heartbeat.IntervalSeconds = interval diff --git a/src/internal/source/file_watcher.go b/src/internal/source/file_watcher.go index b2c4da9..8412edc 100644 --- a/src/internal/source/file_watcher.go +++ b/src/internal/source/file_watcher.go @@ -258,7 +258,10 @@ func (w *fileWatcher) checkFile() error { continue } + rawSize := len(line) entry := w.parseLine(line) + entry.RawSize = rawSize + w.callback(entry) w.entriesRead.Add(1) w.lastReadTime.Store(time.Now()) diff --git a/src/internal/source/http.go b/src/internal/source/http.go index 854619f..ed29b9d 100644 --- a/src/internal/source/http.go +++ b/src/internal/source/http.go @@ -271,6 +271,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { if single.Source == "" { single.Source = "http" } + single.RawSize = len(body) entries = append(entries, single) return entries, nil } @@ -278,6 +279,8 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { // Try to parse as JSON array var array []LogEntry if err := json.Unmarshal(body, &array); err == nil { + // TODO: Placeholder; For array, divide total size by entry count as approximation + approxSizePerEntry := len(body) / len(array) for i, entry := range array { if entry.Message == "" { return nil, fmt.Errorf("entry %d missing required field: message", i) @@ -288,6 +291,8 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { if entry.Source == "" { array[i].Source = "http" } + // TODO: Placeholder + array[i].RawSize = approxSizePerEntry } return array, nil } @@ -313,6 +318,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { if entry.Source == "" { entry.Source = "http" } + entry.RawSize = len(line) entries = append(entries, entry) } diff --git a/src/internal/source/source.go b/src/internal/source/source.go index 657e3ff..70725e0 100644 --- a/src/internal/source/source.go +++ b/src/internal/source/source.go @@ -13,6 +13,7 @@ type LogEntry struct { Level string `json:"level,omitempty"` Message string `json:"message"` Fields json.RawMessage `json:"fields,omitempty"` + RawSize int `json:"-"` } // Source represents an input data stream diff --git a/src/internal/source/stdin.go b/src/internal/source/stdin.go index 8ae9ef0..16e54e6 100644 --- a/src/internal/source/stdin.go +++ b/src/internal/source/stdin.go @@ -82,6 +82,7 @@ func (s *StdinSource) readLoop() { Source: "stdin", Message: line, Level: extractLogLevel(line), + RawSize: len(line), } s.publish(entry) diff --git a/src/internal/source/tcp.go b/src/internal/source/tcp.go index ed72857..5175126 100644 --- a/src/internal/source/tcp.go +++ b/src/internal/source/tcp.go @@ -341,6 +341,9 @@ func (s *tcpSourceServer) OnTraffic(c gnet.Conn) gnet.Action { continue } + // Capture raw line size before parsing + rawSize := len(line) + // Parse JSON log entry var entry LogEntry if err := json.Unmarshal(line, &entry); err != nil { @@ -364,6 +367,9 @@ func (s *tcpSourceServer) OnTraffic(c gnet.Conn) gnet.Action { entry.Source = "tcp" } + // Set raw size + entry.RawSize = rawSize + // Publish the entry s.source.publish(entry) }