From fce6ee5c65e5ba42a612df615c123a078b02b5f1f3fa1f6c401688e14cf9b6a9 Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Mon, 1 Sep 2025 16:16:52 -0400 Subject: [PATCH] v0.3.10 config auto-save added, dependency update, limiter packages refactored --- go.mod | 9 ++-- go.sum | 20 ++++---- src/cmd/logwisp/main.go | 49 +++++++++++++++++++ src/cmd/logwisp/reload.go | 19 +++++-- src/internal/config/config.go | 1 + src/internal/config/loader.go | 1 + src/internal/config/saver.go | 34 +++++++++++++ src/internal/core/types.go | 17 +++++++ src/internal/filter/chain.go | 4 +- src/internal/filter/filter.go | 4 +- src/internal/format/format.go | 4 +- src/internal/format/json.go | 6 +-- src/internal/format/raw.go | 4 +- src/internal/format/text.go | 4 +- .../{netlimit/limiter.go => limit/net.go} | 41 ++++++++-------- .../{ratelimit/limiter.go => limit/rate.go} | 27 +++++----- .../{limiter => limit}/token_bucket.go | 4 +- src/internal/service/pipeline.go | 4 +- src/internal/service/service.go | 7 +-- src/internal/sink/console.go | 14 +++--- src/internal/sink/file.go | 8 +-- src/internal/sink/http.go | 33 ++++++++----- src/internal/sink/http_client.go | 20 ++++---- src/internal/sink/sink.go | 4 +- src/internal/sink/tcp.go | 49 ++++++++++--------- src/internal/sink/tcp_client.go | 10 ++-- src/internal/source/directory.go | 10 ++-- src/internal/source/file_watcher.go | 14 +++--- src/internal/source/http.go | 25 +++++----- src/internal/source/source.go | 15 ++---- src/internal/source/stdin.go | 12 +++-- src/internal/source/tcp.go | 17 ++++--- 32 files changed, 309 insertions(+), 181 deletions(-) create mode 100644 src/internal/config/saver.go create mode 100644 src/internal/core/types.go rename src/internal/{netlimit/limiter.go => limit/net.go} (90%) rename src/internal/{ratelimit/limiter.go => limit/rate.go} (76%) rename src/internal/{limiter => limit}/token_bucket.go (95%) diff --git a/go.mod b/go.mod index 536f2a1..341a6cb 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module logwisp go 1.24.5 require ( - github.com/lixenwraith/config v0.0.0-20250721005322-3b1023974d3d + github.com/lixenwraith/config v0.0.0-20250901201021-59a461e31cd4 github.com/lixenwraith/log v0.0.0-20250722012845-16a3079e46e2 - github.com/panjf2000/gnet/v2 v2.9.1 - github.com/valyala/fasthttp v1.64.0 + github.com/panjf2000/gnet/v2 v2.9.3 + github.com/valyala/fasthttp v1.65.0 ) require ( @@ -20,8 +20,9 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.34.0 // indirect + golang.org/x/sys v0.35.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/mitchellh/mapstructure => github.com/go-viper/mapstructure v1.6.0 diff --git a/go.sum b/go.sum index b948ae6..c1e891b 100644 --- a/go.sum +++ b/go.sum @@ -8,24 +8,22 @@ github.com/go-viper/mapstructure v1.6.0 h1:0WdPOF2rmmQDN1xo8qIgxyugvLp71HrZSWyGL github.com/go-viper/mapstructure v1.6.0/go.mod h1:FcbLReH7/cjaC0RVQR+LHFIrBhHF3s1e/ud1KMDoBVw= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/lixenwraith/config v0.0.0-20250721005322-3b1023974d3d h1:h3IWdUA6Fyl5/lvNmPdtKtLFVnZos71aV3RHILYKY/M= -github.com/lixenwraith/config v0.0.0-20250721005322-3b1023974d3d/go.mod h1:F8ieHeZgOCPsoym5eynx4kjupfLXBpvJfnX1GzX++EA= -github.com/lixenwraith/log v0.0.0-20250720221103-db34b7e4a2aa h1:7x25rdA8azXtY46/MgDQIKTLpZv6TXtqMBCfzL5wSJ4= -github.com/lixenwraith/log v0.0.0-20250720221103-db34b7e4a2aa/go.mod h1:asd0/TQplmacopOKWcqW0jysau/lWohR2Fe29KBSp2w= +github.com/lixenwraith/config v0.0.0-20250901201021-59a461e31cd4 h1:SxqXt6J7ZLA39SP4zvJU0Jv3GbXLzM5iB7cgk5d7Pe4= +github.com/lixenwraith/config v0.0.0-20250901201021-59a461e31cd4/go.mod h1:l+1PZ8JsohLAXOJKu5loFa+zCdOSb/lXf3JUwa5ST/4= github.com/lixenwraith/log v0.0.0-20250722012845-16a3079e46e2 h1:nP/12l+gKkZnZRoM3Vy4iT2anBQm1jCtrppyZq9pcq4= github.com/lixenwraith/log v0.0.0-20250722012845-16a3079e46e2/go.mod h1:sLCRfKeLInCj2LcMnAo2knULwfszU8QPuIFOQ8crcFo= github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= -github.com/panjf2000/gnet/v2 v2.9.1 h1:bKewICy/0xnQ9PMzNaswpe/Ah14w1TrRk91LHTcbIlA= -github.com/panjf2000/gnet/v2 v2.9.1/go.mod h1:WQTxDWYuQ/hz3eccH0FN32IVuvZ19HewEWx0l62fx7E= +github.com/panjf2000/gnet/v2 v2.9.3 h1:auV3/A9Na3jiBDmYAAU00rPhFKnsAI+TnI1F7YUJMHQ= +github.com/panjf2000/gnet/v2 v2.9.3/go.mod h1:WQTxDWYuQ/hz3eccH0FN32IVuvZ19HewEWx0l62fx7E= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.64.0 h1:QBygLLQmiAyiXuRhthf0tuRkqAFcrC42dckN2S+N3og= -github.com/valyala/fasthttp v1.64.0/go.mod h1:dGmFxwkWXSK0NbOSJuF7AMVzU+lkHz0wQVvVITv2UQA= +github.com/valyala/fasthttp v1.65.0 h1:j/u3uzFEGFfRxw79iYzJN+TteTJwbYkru9uDp3d0Yf8= +github.com/valyala/fasthttp v1.65.0/go.mod h1:P/93/YkKPMsKSnATEeELUCkG8a7Y+k99uxNHVbKINr4= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -36,8 +34,10 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/src/cmd/logwisp/main.go b/src/cmd/logwisp/main.go index 857d83d..75ebb0b 100644 --- a/src/cmd/logwisp/main.go +++ b/src/cmd/logwisp/main.go @@ -160,6 +160,8 @@ func main() { select { case <-done: + // Save configuration after graceful shutdown (no reload manager in static mode) + saveConfigurationOnExit(cfg, nil, logger) logger.Info("msg", "Shutdown complete") case <-shutdownCtx.Done(): logger.Error("msg", "Shutdown timeout exceeded - forcing exit") @@ -172,6 +174,9 @@ func main() { // Wait for context cancellation <-ctx.Done() + // Save configuration before final shutdown, handled by reloadManager + saveConfigurationOnExit(cfg, reloadManager, logger) + // Shutdown is handled by ReloadManager.Shutdown() in defer logger.Info("msg", "Shutdown complete") } @@ -183,4 +188,48 @@ func shutdownLogger() { Error("Logger shutdown error: %v\n", err) } } +} + +// saveConfigurationOnExit saves the configuration to file on exist +func saveConfigurationOnExit(cfg *config.Config, reloadManager *ReloadManager, logger *log.Logger) { + // Only save if explicitly enabled and we have a valid path + if !cfg.ConfigSaveOnExit || cfg.ConfigFile == "" { + return + } + + // Create a context with timeout for save operation + saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Perform save in goroutine to respect timeout + done := make(chan error, 1) + go func() { + var err error + if reloadManager != nil && reloadManager.lcfg != nil { + // Use existing lconfig instance from reload manager + // This ensures we save through the same configuration system + err = reloadManager.lcfg.Save(cfg.ConfigFile) + } else { + // Static mode: create temporary lconfig for saving + err = cfg.SaveToFile(cfg.ConfigFile) + } + done <- err + }() + + select { + case err := <-done: + if err != nil { + logger.Error("msg", "Failed to save configuration on exit", + "path", cfg.ConfigFile, + "error", err) + // Don't fail the exit on save error + } else { + logger.Info("msg", "Configuration saved successfully", + "path", cfg.ConfigFile) + } + case <-saveCtx.Done(): + logger.Error("msg", "Configuration save timeout exceeded", + "path", cfg.ConfigFile, + "timeout", "5s") + } } \ No newline at end of file diff --git a/src/cmd/logwisp/reload.go b/src/cmd/logwisp/reload.go index c1999a9..b8f64e6 100644 --- a/src/cmd/logwisp/reload.go +++ b/src/cmd/logwisp/reload.go @@ -21,7 +21,7 @@ type ReloadManager struct { service *service.Service router *service.HTTPRouter cfg *config.Config - lcfg *lconfig.Config // TODO: use the same cfg struct + lcfg *lconfig.Config logger *log.Logger mu sync.RWMutex reloadingMu sync.Mutex @@ -62,10 +62,15 @@ func (rm *ReloadManager) Start(ctx context.Context) error { rm.startStatusReporter(ctx, svc) } - // Create lconfig instance for file watching + // Create lconfig instance for file watching, logwisp config is always TOML lcfg, err := lconfig.NewBuilder(). WithFile(rm.configPath). WithTarget(rm.cfg). + WithFileFormat("toml"). + WithSecurityOptions(lconfig.SecurityOptions{ + PreventPathTraversal: true, + MaxFileSize: 10 * 1024 * 1024, + }). Build() if err != nil { return fmt.Errorf("failed to create config watcher: %w", err) @@ -78,7 +83,7 @@ func (rm *ReloadManager) Start(ctx context.Context) error { PollInterval: time.Second, Debounce: 500 * time.Millisecond, ReloadTimeout: 30 * time.Second, - VerifyPermissions: true, // SECURITY: Prevent malicious config replacement + VerifyPermissions: true, // TODO: Prevent malicious config replacement, to be implemented } lcfg.AutoUpdateWithOptions(watchOpts) @@ -306,6 +311,14 @@ func (rm *ReloadManager) stopStatusReporter() { } } +// SaveConfig is a wrapper to save the config +func (rm *ReloadManager) SaveConfig(path string) error { + if rm.lcfg == nil { + return fmt.Errorf("no lconfig instance available") + } + return rm.lcfg.Save(path) +} + // Shutdown stops the reload manager func (rm *ReloadManager) Shutdown() { rm.logger.Info("msg", "Shutting down reload manager") diff --git a/src/internal/config/config.go b/src/internal/config/config.go index ca91567..77a446a 100644 --- a/src/internal/config/config.go +++ b/src/internal/config/config.go @@ -11,6 +11,7 @@ type Config struct { // Runtime behavior flags DisableStatusReporter bool `toml:"disable_status_reporter"` ConfigAutoReload bool `toml:"config_auto_reload"` + ConfigSaveOnExit bool `toml:"config_save_on_exit"` // Internal flag indicating demonized child process BackgroundDaemon bool `toml:"background-daemon"` diff --git a/src/internal/config/loader.go b/src/internal/config/loader.go index b0e9b34..6afc749 100644 --- a/src/internal/config/loader.go +++ b/src/internal/config/loader.go @@ -27,6 +27,7 @@ func defaults() *Config { // Runtime behavior defaults DisableStatusReporter: false, ConfigAutoReload: false, + ConfigSaveOnExit: false, // Child process indicator BackgroundDaemon: false, diff --git a/src/internal/config/saver.go b/src/internal/config/saver.go new file mode 100644 index 0000000..b9ae4f1 --- /dev/null +++ b/src/internal/config/saver.go @@ -0,0 +1,34 @@ +// FILE: logwisp/src/internal/config/saver.go +package config + +import ( + "fmt" + + lconfig "github.com/lixenwraith/config" +) + +// SaveToFile saves the configuration to the specified file path. +// It uses the lconfig library's atomic file saving capabilities. +func (c *Config) SaveToFile(path string) error { + if path == "" { + return fmt.Errorf("cannot save config: path is empty") + } + + // Create a temporary lconfig instance just for saving + // This avoids the need to track lconfig throughout the application + lcfg, err := lconfig.NewBuilder(). + WithFile(path). + WithTarget(c). + WithFileFormat("toml"). + Build() + if err != nil { + return fmt.Errorf("failed to create config builder: %w", err) + } + + // Use lconfig's Save method which handles atomic writes + if err := lcfg.Save(path); err != nil { + return fmt.Errorf("failed to save config: %w", err) + } + + return nil +} \ No newline at end of file diff --git a/src/internal/core/types.go b/src/internal/core/types.go new file mode 100644 index 0000000..2ac7630 --- /dev/null +++ b/src/internal/core/types.go @@ -0,0 +1,17 @@ +// FILE: logwisp/src/internal/core/types.go +package core + +import ( + "encoding/json" + "time" +) + +// LogEntry represents a single log record flowing through the pipeline +type LogEntry struct { + Time time.Time `json:"time"` + Source string `json:"source"` + Level string `json:"level,omitempty"` + Message string `json:"message"` + Fields json.RawMessage `json:"fields,omitempty"` + RawSize int64 `json:"-"` +} \ No newline at end of file diff --git a/src/internal/filter/chain.go b/src/internal/filter/chain.go index 5ed1270..1f49b9a 100644 --- a/src/internal/filter/chain.go +++ b/src/internal/filter/chain.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "logwisp/src/internal/config" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -44,7 +44,7 @@ func NewChain(configs []config.FilterConfig, logger *log.Logger) (*Chain, error) // Apply runs all filters in sequence // Returns true if the entry passes all filters -func (c *Chain) Apply(entry source.LogEntry) bool { +func (c *Chain) Apply(entry core.LogEntry) bool { c.totalProcessed.Add(1) // No filters means pass everything diff --git a/src/internal/filter/filter.go b/src/internal/filter/filter.go index 4949f58..fd8b340 100644 --- a/src/internal/filter/filter.go +++ b/src/internal/filter/filter.go @@ -8,7 +8,7 @@ import ( "sync/atomic" "logwisp/src/internal/config" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -61,7 +61,7 @@ func New(cfg config.FilterConfig, logger *log.Logger) (*Filter, error) { } // Apply checks if a log entry should be passed through -func (f *Filter) Apply(entry source.LogEntry) bool { +func (f *Filter) Apply(entry core.LogEntry) bool { f.totalProcessed.Add(1) // No patterns means pass everything diff --git a/src/internal/format/format.go b/src/internal/format/format.go index c539238..9464b0b 100644 --- a/src/internal/format/format.go +++ b/src/internal/format/format.go @@ -4,7 +4,7 @@ package format import ( "fmt" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -12,7 +12,7 @@ import ( // Formatter defines the interface for transforming a LogEntry into a byte slice. type Formatter interface { // Format takes a LogEntry and returns the formatted log as a byte slice. - Format(entry source.LogEntry) ([]byte, error) + Format(entry core.LogEntry) ([]byte, error) // Name returns the formatter type name Name() string diff --git a/src/internal/format/json.go b/src/internal/format/json.go index 4ec3de3..09987c7 100644 --- a/src/internal/format/json.go +++ b/src/internal/format/json.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -52,7 +52,7 @@ func NewJSONFormatter(options map[string]any, logger *log.Logger) (*JSONFormatte } // Format formats the log entry as JSON -func (f *JSONFormatter) Format(entry source.LogEntry) ([]byte, error) { +func (f *JSONFormatter) Format(entry core.LogEntry) ([]byte, error) { // Start with a clean map output := make(map[string]any) @@ -122,7 +122,7 @@ func (f *JSONFormatter) Name() string { // FormatBatch formats multiple entries as a JSON array // This is a special method for sinks that need to batch entries -func (f *JSONFormatter) FormatBatch(entries []source.LogEntry) ([]byte, error) { +func (f *JSONFormatter) FormatBatch(entries []core.LogEntry) ([]byte, error) { // For batching, we need to create an array of formatted objects batch := make([]json.RawMessage, 0, len(entries)) diff --git a/src/internal/format/raw.go b/src/internal/format/raw.go index 5be30b0..3ab09c7 100644 --- a/src/internal/format/raw.go +++ b/src/internal/format/raw.go @@ -2,7 +2,7 @@ package format import ( - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -20,7 +20,7 @@ func NewRawFormatter(options map[string]any, logger *log.Logger) (*RawFormatter, } // Format returns the message with a newline appended -func (f *RawFormatter) Format(entry source.LogEntry) ([]byte, error) { +func (f *RawFormatter) Format(entry core.LogEntry) ([]byte, error) { // Simply return the message with newline return append([]byte(entry.Message), '\n'), nil } diff --git a/src/internal/format/text.go b/src/internal/format/text.go index cc8b8e0..bbafb25 100644 --- a/src/internal/format/text.go +++ b/src/internal/format/text.go @@ -8,7 +8,7 @@ import ( "text/template" "time" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) @@ -59,7 +59,7 @@ func NewTextFormatter(options map[string]any, logger *log.Logger) (*TextFormatte } // Format formats the log entry using the template -func (f *TextFormatter) Format(entry source.LogEntry) ([]byte, error) { +func (f *TextFormatter) Format(entry core.LogEntry) ([]byte, error) { // Prepare data for template data := map[string]any{ "Timestamp": entry.Time, diff --git a/src/internal/netlimit/limiter.go b/src/internal/limit/net.go similarity index 90% rename from src/internal/netlimit/limiter.go rename to src/internal/limit/net.go index c8386d7..067d3d0 100644 --- a/src/internal/netlimit/limiter.go +++ b/src/internal/limit/net.go @@ -1,5 +1,5 @@ -// FILE: logwisp/src/internal/netlimit/limiter.go -package netlimit +// FILE: logwisp/src/internal/limit/net.go +package limit import ( "context" @@ -10,13 +10,12 @@ import ( "time" "logwisp/src/internal/config" - "logwisp/src/internal/limiter" "github.com/lixenwraith/log" ) -// Limiter manages net limiting for a transport -type Limiter struct { +// NetLimiter manages net limiting for a transport +type NetLimiter struct { config config.NetLimitConfig logger *log.Logger @@ -25,7 +24,7 @@ type Limiter struct { ipMu sync.RWMutex // Global limiter for the transport - globalLimiter *limiter.TokenBucket + globalLimiter *TokenBucket // Connection tracking ipConnections map[string]*atomic.Int64 @@ -47,13 +46,13 @@ type Limiter struct { } type ipLimiter struct { - bucket *limiter.TokenBucket + bucket *TokenBucket lastSeen time.Time connections atomic.Int64 } // Creates a new net limiter -func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter { +func NewNetLimiter(cfg config.NetLimitConfig, logger *log.Logger) *NetLimiter { if !cfg.Enabled { return nil } @@ -64,7 +63,7 @@ func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter { ctx, cancel := context.WithCancel(context.Background()) - l := &Limiter{ + l := &NetLimiter{ config: cfg, ipLimiters: make(map[string]*ipLimiter), ipConnections: make(map[string]*atomic.Int64), @@ -77,7 +76,7 @@ func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter { // Create global limiter if not using per-IP limiting if cfg.LimitBy == "global" { - l.globalLimiter = limiter.NewTokenBucket( + l.globalLimiter = NewTokenBucket( float64(cfg.BurstSize), cfg.RequestsPerSecond, ) @@ -95,7 +94,7 @@ func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter { return l } -func (l *Limiter) Shutdown() { +func (l *NetLimiter) Shutdown() { if l == nil { return } @@ -115,7 +114,7 @@ func (l *Limiter) Shutdown() { } // Checks if an HTTP request should be allowed -func (l *Limiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int64, message string) { +func (l *NetLimiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int64, message string) { if l == nil { return true, 0, "" } @@ -185,7 +184,7 @@ func (l *Limiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int64, } // Checks if a TCP connection should be allowed -func (l *Limiter) CheckTCP(remoteAddr net.Addr) bool { +func (l *NetLimiter) CheckTCP(remoteAddr net.Addr) bool { if l == nil { return true } @@ -224,7 +223,7 @@ func isIPv4(ip string) bool { } // Tracks a new connection for an IP -func (l *Limiter) AddConnection(remoteAddr string) { +func (l *NetLimiter) AddConnection(remoteAddr string) { if l == nil { return } @@ -254,7 +253,7 @@ func (l *Limiter) AddConnection(remoteAddr string) { } // Removes a connection for an IP -func (l *Limiter) RemoveConnection(remoteAddr string) { +func (l *NetLimiter) RemoveConnection(remoteAddr string) { if l == nil { return } @@ -291,7 +290,7 @@ func (l *Limiter) RemoveConnection(remoteAddr string) { } // Returns net limiter statistics -func (l *Limiter) GetStats() map[string]any { +func (l *NetLimiter) GetStats() map[string]any { if l == nil { return map[string]any{ "enabled": false, @@ -324,7 +323,7 @@ func (l *Limiter) GetStats() map[string]any { } // Performs the actual net limit check -func (l *Limiter) checkLimit(ip string) bool { +func (l *NetLimiter) checkLimit(ip string) bool { // Maybe run cleanup l.maybeCleanup() @@ -339,7 +338,7 @@ func (l *Limiter) checkLimit(ip string) bool { if !exists { // Create new limiter for this IP lim = &ipLimiter{ - bucket: limiter.NewTokenBucket( + bucket: NewTokenBucket( float64(l.config.BurstSize), l.config.RequestsPerSecond, ), @@ -378,7 +377,7 @@ func (l *Limiter) checkLimit(ip string) bool { } // Runs cleanup if enough time has passed -func (l *Limiter) maybeCleanup() { +func (l *NetLimiter) maybeCleanup() { l.cleanupMu.Lock() defer l.cleanupMu.Unlock() @@ -391,7 +390,7 @@ func (l *Limiter) maybeCleanup() { } // Removes stale IP limiters -func (l *Limiter) cleanup() { +func (l *NetLimiter) cleanup() { staleTimeout := 5 * time.Minute now := time.Now() @@ -414,7 +413,7 @@ func (l *Limiter) cleanup() { } // Runs periodic cleanup -func (l *Limiter) cleanupLoop() { +func (l *NetLimiter) cleanupLoop() { defer close(l.cleanupDone) ticker := time.NewTicker(1 * time.Minute) diff --git a/src/internal/ratelimit/limiter.go b/src/internal/limit/rate.go similarity index 76% rename from src/internal/ratelimit/limiter.go rename to src/internal/limit/rate.go index a1deb46..aaf5f72 100644 --- a/src/internal/ratelimit/limiter.go +++ b/src/internal/limit/rate.go @@ -1,20 +1,19 @@ -// FILE: logwisp/src/internal/ratelimit/limiter.go -package ratelimit +// FILE: logwisp/src/internal/limit/rate.go +package limit import ( "strings" "sync/atomic" "logwisp/src/internal/config" - "logwisp/src/internal/limiter" - "logwisp/src/internal/source" + "logwisp/src/internal/core" "github.com/lixenwraith/log" ) -// Limiter enforces rate limits on log entries flowing through a pipeline. -type Limiter struct { - bucket *limiter.TokenBucket +// RateLimiter enforces rate limits on log entries flowing through a pipeline. +type RateLimiter struct { + bucket *TokenBucket policy config.RateLimitPolicy logger *log.Logger @@ -24,8 +23,8 @@ type Limiter struct { droppedCount atomic.Uint64 } -// New creates a new rate limiter. If cfg.Rate is 0, it returns nil. -func New(cfg config.RateLimitConfig, logger *log.Logger) (*Limiter, error) { +// NewRateLimiter creates a new rate limiter. If cfg.Rate is 0, it returns nil. +func NewRateLimiter(cfg config.RateLimitConfig, logger *log.Logger) (*RateLimiter, error) { if cfg.Rate <= 0 { return nil, nil // No rate limit } @@ -43,15 +42,15 @@ func New(cfg config.RateLimitConfig, logger *log.Logger) (*Limiter, error) { policy = config.PolicyPass } - l := &Limiter{ - bucket: limiter.NewTokenBucket(burst, cfg.Rate), + l := &RateLimiter{ + bucket: NewTokenBucket(burst, cfg.Rate), policy: policy, logger: logger, maxEntrySizeBytes: cfg.MaxEntrySizeBytes, } if cfg.Rate > 0 { - l.bucket = limiter.NewTokenBucket(burst, cfg.Rate) + l.bucket = NewTokenBucket(burst, cfg.Rate) } return l, nil @@ -59,7 +58,7 @@ func New(cfg config.RateLimitConfig, logger *log.Logger) (*Limiter, error) { // Allow checks if a log entry is allowed to pass based on the rate limit. // It returns true if the entry should pass, false if it should be dropped. -func (l *Limiter) Allow(entry source.LogEntry) bool { +func (l *RateLimiter) Allow(entry core.LogEntry) bool { if l == nil || l.policy == config.PolicyPass { return true } @@ -85,7 +84,7 @@ func (l *Limiter) Allow(entry source.LogEntry) bool { } // GetStats returns the statistics for the limiter. -func (l *Limiter) GetStats() map[string]any { +func (l *RateLimiter) GetStats() map[string]any { if l == nil { return map[string]any{ "enabled": false, diff --git a/src/internal/limiter/token_bucket.go b/src/internal/limit/token_bucket.go similarity index 95% rename from src/internal/limiter/token_bucket.go rename to src/internal/limit/token_bucket.go index fdf0dca..ad0ecf8 100644 --- a/src/internal/limiter/token_bucket.go +++ b/src/internal/limit/token_bucket.go @@ -1,5 +1,5 @@ -// FILE: logwisp/src/internal/limiter/token_bucket.go -package limiter +// FILE: logwisp/src/internal/limit/token_bucket.go +package limit import ( "sync" diff --git a/src/internal/service/pipeline.go b/src/internal/service/pipeline.go index 58607dc..765b576 100644 --- a/src/internal/service/pipeline.go +++ b/src/internal/service/pipeline.go @@ -3,13 +3,13 @@ package service import ( "context" - "logwisp/src/internal/ratelimit" "sync" "sync/atomic" "time" "logwisp/src/internal/config" "logwisp/src/internal/filter" + "logwisp/src/internal/limit" "logwisp/src/internal/sink" "logwisp/src/internal/source" @@ -21,7 +21,7 @@ type Pipeline struct { Name string Config config.PipelineConfig Sources []source.Source - RateLimiter *ratelimit.Limiter + RateLimiter *limit.RateLimiter FilterChain *filter.Chain Sinks []sink.Sink Stats *PipelineStats diff --git a/src/internal/service/service.go b/src/internal/service/service.go index 9811066..d54eeff 100644 --- a/src/internal/service/service.go +++ b/src/internal/service/service.go @@ -8,9 +8,10 @@ import ( "time" "logwisp/src/internal/config" + "logwisp/src/internal/core" "logwisp/src/internal/filter" "logwisp/src/internal/format" - "logwisp/src/internal/ratelimit" + "logwisp/src/internal/limit" "logwisp/src/internal/sink" "logwisp/src/internal/source" @@ -81,7 +82,7 @@ func (s *Service) NewPipeline(cfg config.PipelineConfig) error { // Create pipeline rate limiter if cfg.RateLimit != nil { - limiter, err := ratelimit.New(*cfg.RateLimit, s.logger) + limiter, err := limit.NewRateLimiter(*cfg.RateLimit, s.logger) if err != nil { pipelineCancel() return fmt.Errorf("failed to create pipeline rate limiter: %w", err) @@ -163,7 +164,7 @@ func (s *Service) wirePipeline(p *Pipeline) { // Create a processing goroutine for this source p.wg.Add(1) - go func(source source.Source, entries <-chan source.LogEntry) { + go func(source source.Source, entries <-chan core.LogEntry) { defer p.wg.Done() // Panic recovery to prevent single source from crashing pipeline diff --git a/src/internal/sink/console.go b/src/internal/sink/console.go index 6973b99..f311499 100644 --- a/src/internal/sink/console.go +++ b/src/internal/sink/console.go @@ -9,8 +9,8 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/source" "github.com/lixenwraith/log" ) @@ -23,7 +23,7 @@ type ConsoleConfig struct { // StdoutSink writes log entries to stdout type StdoutSink struct { - input chan source.LogEntry + input chan core.LogEntry config ConsoleConfig output io.Writer done chan struct{} @@ -53,7 +53,7 @@ func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format. } s := &StdoutSink{ - input: make(chan source.LogEntry, config.BufferSize), + input: make(chan core.LogEntry, config.BufferSize), config: config, output: os.Stdout, done: make(chan struct{}), @@ -66,7 +66,7 @@ func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format. return s, nil } -func (s *StdoutSink) Input() chan<- source.LogEntry { +func (s *StdoutSink) Input() chan<- core.LogEntry { return s.input } @@ -136,7 +136,7 @@ func (s *StdoutSink) processLoop(ctx context.Context) { // StderrSink writes log entries to stderr type StderrSink struct { - input chan source.LogEntry + input chan core.LogEntry config ConsoleConfig output io.Writer done chan struct{} @@ -166,7 +166,7 @@ func NewStderrSink(options map[string]any, logger *log.Logger, formatter format. } s := &StderrSink{ - input: make(chan source.LogEntry, config.BufferSize), + input: make(chan core.LogEntry, config.BufferSize), config: config, output: os.Stderr, done: make(chan struct{}), @@ -179,7 +179,7 @@ func NewStderrSink(options map[string]any, logger *log.Logger, formatter format. return s, nil } -func (s *StderrSink) Input() chan<- source.LogEntry { +func (s *StderrSink) Input() chan<- core.LogEntry { return s.input } diff --git a/src/internal/sink/file.go b/src/internal/sink/file.go index 2a07c33..0879399 100644 --- a/src/internal/sink/file.go +++ b/src/internal/sink/file.go @@ -7,15 +7,15 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/source" "github.com/lixenwraith/log" ) // FileSink writes log entries to files with rotation type FileSink struct { - input chan source.LogEntry + input chan core.LogEntry writer *log.Logger // Internal logger instance for file writing done chan struct{} startTime time.Time @@ -83,7 +83,7 @@ func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Fo } fs := &FileSink{ - input: make(chan source.LogEntry, bufferSize), + input: make(chan core.LogEntry, bufferSize), writer: writer, done: make(chan struct{}), startTime: time.Now(), @@ -95,7 +95,7 @@ func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Fo return fs, nil } -func (fs *FileSink) Input() chan<- source.LogEntry { +func (fs *FileSink) Input() chan<- core.LogEntry { return fs.input } diff --git a/src/internal/sink/http.go b/src/internal/sink/http.go index a3fbed3..dc1bea0 100644 --- a/src/internal/sink/http.go +++ b/src/internal/sink/http.go @@ -12,9 +12,9 @@ import ( "time" "logwisp/src/internal/config" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/netlimit" - "logwisp/src/internal/source" + "logwisp/src/internal/limit" "logwisp/src/internal/version" "github.com/lixenwraith/log" @@ -24,7 +24,7 @@ import ( // HTTPSink streams log entries via Server-Sent Events type HTTPSink struct { - input chan source.LogEntry + input chan core.LogEntry config HTTPConfig server *fasthttp.Server activeClients atomic.Int64 @@ -43,7 +43,7 @@ type HTTPSink struct { standalone bool // Net limiting - netLimiter *netlimit.Limiter + netLimiter *limit.NetLimiter // Statistics totalProcessed atomic.Uint64 @@ -126,7 +126,7 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo } h := &HTTPSink{ - input: make(chan source.LogEntry, cfg.BufferSize), + input: make(chan core.LogEntry, cfg.BufferSize), config: cfg, startTime: time.Now(), done: make(chan struct{}), @@ -140,18 +140,17 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo // Initialize net limiter if configured if cfg.NetLimit != nil && cfg.NetLimit.Enabled { - h.netLimiter = netlimit.New(*cfg.NetLimit, logger) + h.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger) } return h, nil } -func (h *HTTPSink) Input() chan<- source.LogEntry { +func (h *HTTPSink) Input() chan<- core.LogEntry { return h.input } func (h *HTTPSink) Start(ctx context.Context) error { - // TODO: use or remove unused ctx if !h.standalone { // In router mode, don't start our own server h.logger.Debug("msg", "HTTP sink in router mode, skipping server start", @@ -185,6 +184,16 @@ func (h *HTTPSink) Start(ctx context.Context) error { } }() + // Monitor context for shutdown signal + go func() { + <-ctx.Done() + if h.server != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + h.server.ShutdownWithContext(shutdownCtx) + } + }() + // Check if server started successfully select { case err := <-errChan: @@ -299,7 +308,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { ctx.Response.Header.Set("X-Accel-Buffering", "no") // Create subscription for this client - clientChan := make(chan source.LogEntry, h.config.BufferSize) + clientChan := make(chan core.LogEntry, h.config.BufferSize) clientDone := make(chan struct{}) // Subscribe to input channel @@ -415,7 +424,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { ctx.SetBodyStreamWriter(streamFunc) } -func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry source.LogEntry) error { +func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry core.LogEntry) error { formatted, err := h.formatter.Format(entry) if err != nil { return err @@ -434,7 +443,7 @@ func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry source.LogEntry) err return nil } -func (h *HTTPSink) createHeartbeatEntry() source.LogEntry { +func (h *HTTPSink) createHeartbeatEntry() core.LogEntry { message := "heartbeat" // Build fields for heartbeat metadata @@ -448,7 +457,7 @@ func (h *HTTPSink) createHeartbeatEntry() source.LogEntry { fieldsJSON, _ := json.Marshal(fields) - return source.LogEntry{ + return core.LogEntry{ Time: time.Now(), Source: "logwisp-http", Level: "INFO", diff --git a/src/internal/sink/http_client.go b/src/internal/sink/http_client.go index 5af8bb8..cde943b 100644 --- a/src/internal/sink/http_client.go +++ b/src/internal/sink/http_client.go @@ -10,8 +10,8 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/source" "github.com/lixenwraith/log" "github.com/valyala/fasthttp" @@ -19,10 +19,10 @@ import ( // HTTPClientSink forwards log entries to a remote HTTP endpoint type HTTPClientSink struct { - input chan source.LogEntry + input chan core.LogEntry config HTTPClientConfig client *fasthttp.Client - batch []source.LogEntry + batch []core.LogEntry batchMu sync.Mutex done chan struct{} wg sync.WaitGroup @@ -127,9 +127,9 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for } h := &HTTPClientSink{ - input: make(chan source.LogEntry, cfg.BufferSize), + input: make(chan core.LogEntry, cfg.BufferSize), config: cfg, - batch: make([]source.LogEntry, 0, cfg.BatchSize), + batch: make([]core.LogEntry, 0, cfg.BatchSize), done: make(chan struct{}), startTime: time.Now(), logger: logger, @@ -162,7 +162,7 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for return h, nil } -func (h *HTTPClientSink) Input() chan<- source.LogEntry { +func (h *HTTPClientSink) Input() chan<- core.LogEntry { return h.input } @@ -188,7 +188,7 @@ func (h *HTTPClientSink) Stop() { h.batchMu.Lock() if len(h.batch) > 0 { batch := h.batch - h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batch = make([]core.LogEntry, 0, h.config.BatchSize) h.batchMu.Unlock() h.sendBatch(batch) } else { @@ -246,7 +246,7 @@ func (h *HTTPClientSink) processLoop(ctx context.Context) { // Check if batch is full if int64(len(h.batch)) >= h.config.BatchSize { batch := h.batch - h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batch = make([]core.LogEntry, 0, h.config.BatchSize) h.batchMu.Unlock() // Send batch in background @@ -275,7 +275,7 @@ func (h *HTTPClientSink) batchTimer(ctx context.Context) { h.batchMu.Lock() if len(h.batch) > 0 { batch := h.batch - h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batch = make([]core.LogEntry, 0, h.config.BatchSize) h.batchMu.Unlock() // Send batch in background @@ -292,7 +292,7 @@ func (h *HTTPClientSink) batchTimer(ctx context.Context) { } } -func (h *HTTPClientSink) sendBatch(batch []source.LogEntry) { +func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) { h.activeConnections.Add(1) defer h.activeConnections.Add(-1) diff --git a/src/internal/sink/sink.go b/src/internal/sink/sink.go index bbc53fe..cb431f5 100644 --- a/src/internal/sink/sink.go +++ b/src/internal/sink/sink.go @@ -5,13 +5,13 @@ import ( "context" "time" - "logwisp/src/internal/source" + "logwisp/src/internal/core" ) // Sink represents an output destination for log entries type Sink interface { // Input returns the channel for sending log entries to this sink - Input() chan<- source.LogEntry + Input() chan<- core.LogEntry // Start begins processing log entries Start(ctx context.Context) error diff --git a/src/internal/sink/tcp.go b/src/internal/sink/tcp.go index 23ccd8f..e31fb0f 100644 --- a/src/internal/sink/tcp.go +++ b/src/internal/sink/tcp.go @@ -5,24 +5,24 @@ import ( "context" "encoding/json" "fmt" - "github.com/lixenwraith/log/compat" "net" "sync" "sync/atomic" "time" "logwisp/src/internal/config" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/netlimit" - "logwisp/src/internal/source" + "logwisp/src/internal/limit" "github.com/lixenwraith/log" + "github.com/lixenwraith/log/compat" "github.com/panjf2000/gnet/v2" ) // TCPSink streams log entries via TCP type TCPSink struct { - input chan source.LogEntry + input chan core.LogEntry config TCPConfig server *tcpServer done chan struct{} @@ -31,7 +31,7 @@ type TCPSink struct { engine *gnet.Engine engineMu sync.Mutex wg sync.WaitGroup - netLimiter *netlimit.Limiter + netLimiter *limit.NetLimiter logger *log.Logger formatter format.Formatter @@ -106,7 +106,7 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For } t := &TCPSink{ - input: make(chan source.LogEntry, cfg.BufferSize), + input: make(chan core.LogEntry, cfg.BufferSize), config: cfg, done: make(chan struct{}), startTime: time.Now(), @@ -116,13 +116,13 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For t.lastProcessed.Store(time.Time{}) if cfg.NetLimit != nil && cfg.NetLimit.Enabled { - t.netLimiter = netlimit.New(*cfg.NetLimit, logger) + t.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger) } return t, nil } -func (t *TCPSink) Input() chan<- source.LogEntry { +func (t *TCPSink) Input() chan<- core.LogEntry { return t.input } @@ -133,7 +133,7 @@ func (t *TCPSink) Start(ctx context.Context) error { t.wg.Add(1) go func() { defer t.wg.Done() - t.broadcastLoop() + t.broadcastLoop(ctx) }() // Configure gnet @@ -163,6 +163,18 @@ func (t *TCPSink) Start(ctx context.Context) error { errChan <- err }() + // Monitor context for shutdown + go func() { + <-ctx.Done() + t.engineMu.Lock() + if t.engine != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + (*t.engine).Stop(shutdownCtx) + } + t.engineMu.Unlock() + }() + // Wait briefly for server to start or fail select { case err := <-errChan: @@ -221,7 +233,7 @@ func (t *TCPSink) GetStats() SinkStats { } } -func (t *TCPSink) broadcastLoop() { +func (t *TCPSink) broadcastLoop(ctx context.Context) { var ticker *time.Ticker var tickerChan <-chan time.Time @@ -233,6 +245,8 @@ func (t *TCPSink) broadcastLoop() { for { select { + case <-ctx.Done(): + return case entry, ok := <-t.input: if !ok { return @@ -278,7 +292,7 @@ func (t *TCPSink) broadcastLoop() { } // Create heartbeat as a proper LogEntry -func (t *TCPSink) createHeartbeatEntry() source.LogEntry { +func (t *TCPSink) createHeartbeatEntry() core.LogEntry { message := "heartbeat" // Build fields for heartbeat metadata @@ -292,7 +306,7 @@ func (t *TCPSink) createHeartbeatEntry() source.LogEntry { fieldsJSON, _ := json.Marshal(fields) - return source.LogEntry{ + return core.LogEntry{ Time: time.Now(), Source: "logwisp-tcp", Level: "INFO", @@ -384,13 +398,4 @@ func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action { // We don't expect input from clients, just discard c.Discard(-1) return gnet.None -} - -// noopLogger implements gnet Logger interface but discards everything -// type noopLogger struct{} -// -// func (n noopLogger) Debugf(format string, args ...any) {} -// func (n noopLogger) Infof(format string, args ...any) {} -// func (n noopLogger) Warnf(format string, args ...any) {} -// func (n noopLogger) Errorf(format string, args ...any) {} -// func (n noopLogger) Fatalf(format string, args ...any) {} \ No newline at end of file +} \ No newline at end of file diff --git a/src/internal/sink/tcp_client.go b/src/internal/sink/tcp_client.go index 45a74a2..c94d11d 100644 --- a/src/internal/sink/tcp_client.go +++ b/src/internal/sink/tcp_client.go @@ -10,15 +10,15 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" "logwisp/src/internal/format" - "logwisp/src/internal/source" "github.com/lixenwraith/log" ) // TCPClientSink forwards log entries to a remote TCP endpoint type TCPClientSink struct { - input chan source.LogEntry + input chan core.LogEntry config TCPClientConfig conn net.Conn connMu sync.RWMutex @@ -104,7 +104,7 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form } t := &TCPClientSink{ - input: make(chan source.LogEntry, cfg.BufferSize), + input: make(chan core.LogEntry, cfg.BufferSize), config: cfg, done: make(chan struct{}), startTime: time.Now(), @@ -117,7 +117,7 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form return t, nil } -func (t *TCPClientSink) Input() chan<- source.LogEntry { +func (t *TCPClientSink) Input() chan<- core.LogEntry { return t.input } @@ -345,7 +345,7 @@ func (t *TCPClientSink) processLoop(ctx context.Context) { } } -func (t *TCPClientSink) sendEntry(entry source.LogEntry) error { +func (t *TCPClientSink) sendEntry(entry core.LogEntry) error { // Get current connection t.connMu.RLock() conn := t.conn diff --git a/src/internal/source/directory.go b/src/internal/source/directory.go index 005a81f..b3a6117 100644 --- a/src/internal/source/directory.go +++ b/src/internal/source/directory.go @@ -13,6 +13,8 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" + "github.com/lixenwraith/log" ) @@ -21,7 +23,7 @@ type DirectorySource struct { path string pattern string checkInterval time.Duration - subscribers []chan LogEntry + subscribers []chan core.LogEntry watchers map[string]*fileWatcher mu sync.RWMutex ctx context.Context @@ -69,11 +71,11 @@ func NewDirectorySource(options map[string]any, logger *log.Logger) (*DirectoryS return ds, nil } -func (ds *DirectorySource) Subscribe() <-chan LogEntry { +func (ds *DirectorySource) Subscribe() <-chan core.LogEntry { ds.mu.Lock() defer ds.mu.Unlock() - ch := make(chan LogEntry, 1000) + ch := make(chan core.LogEntry, 1000) ds.subscribers = append(ds.subscribers, ch) return ch } @@ -145,7 +147,7 @@ func (ds *DirectorySource) GetStats() SourceStats { } } -func (ds *DirectorySource) publish(entry LogEntry) { +func (ds *DirectorySource) publish(entry core.LogEntry) { ds.mu.RLock() defer ds.mu.RUnlock() diff --git a/src/internal/source/file_watcher.go b/src/internal/source/file_watcher.go index 1bcb6f8..f5ea4e7 100644 --- a/src/internal/source/file_watcher.go +++ b/src/internal/source/file_watcher.go @@ -15,6 +15,8 @@ import ( "syscall" "time" + "logwisp/src/internal/core" + "github.com/lixenwraith/log" ) @@ -31,7 +33,7 @@ type WatcherInfo struct { type fileWatcher struct { path string - callback func(LogEntry) + callback func(core.LogEntry) position int64 size int64 inode uint64 @@ -44,7 +46,7 @@ type fileWatcher struct { logger *log.Logger } -func newFileWatcher(path string, callback func(LogEntry), logger *log.Logger) *fileWatcher { +func newFileWatcher(path string, callback func(core.LogEntry), logger *log.Logger) *fileWatcher { w := &fileWatcher{ path: path, callback: callback, @@ -229,7 +231,7 @@ func (w *fileWatcher) checkFile() error { w.position = 0 // Reset position on rotation w.mu.Unlock() - w.callback(LogEntry{ + w.callback(core.LogEntry{ Time: time.Now(), Source: filepath.Base(w.path), Level: "INFO", @@ -309,7 +311,7 @@ func (w *fileWatcher) checkFile() error { return nil } -func (w *fileWatcher) parseLine(line string) LogEntry { +func (w *fileWatcher) parseLine(line string) core.LogEntry { var jsonLog struct { Time string `json:"time"` Level string `json:"level"` @@ -323,7 +325,7 @@ func (w *fileWatcher) parseLine(line string) LogEntry { timestamp = time.Now() } - return LogEntry{ + return core.LogEntry{ Time: timestamp, Source: filepath.Base(w.path), Level: jsonLog.Level, @@ -334,7 +336,7 @@ func (w *fileWatcher) parseLine(line string) LogEntry { level := extractLogLevel(line) - return LogEntry{ + return core.LogEntry{ Time: time.Now(), Source: filepath.Base(w.path), Level: level, diff --git a/src/internal/source/http.go b/src/internal/source/http.go index f63a1c5..8f03cfa 100644 --- a/src/internal/source/http.go +++ b/src/internal/source/http.go @@ -9,7 +9,8 @@ import ( "time" "logwisp/src/internal/config" - "logwisp/src/internal/netlimit" + "logwisp/src/internal/core" + "logwisp/src/internal/limit" "github.com/lixenwraith/log" "github.com/valyala/fasthttp" @@ -21,11 +22,11 @@ type HTTPSource struct { ingestPath string bufferSize int64 server *fasthttp.Server - subscribers []chan LogEntry + subscribers []chan core.LogEntry mu sync.RWMutex done chan struct{} wg sync.WaitGroup - netLimiter *netlimit.Limiter + netLimiter *limit.NetLimiter logger *log.Logger // Statistics @@ -89,18 +90,18 @@ func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, err cfg.MaxConnectionsPerIP = maxPerIP } - h.netLimiter = netlimit.New(cfg, logger) + h.netLimiter = limit.NewNetLimiter(cfg, logger) } } return h, nil } -func (h *HTTPSource) Subscribe() <-chan LogEntry { +func (h *HTTPSource) Subscribe() <-chan core.LogEntry { h.mu.Lock() defer h.mu.Unlock() - ch := make(chan LogEntry, h.bufferSize) + ch := make(chan core.LogEntry, h.bufferSize) h.subscribers = append(h.subscribers, ch) return ch } @@ -255,11 +256,11 @@ func (h *HTTPSource) requestHandler(ctx *fasthttp.RequestCtx) { }) } -func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { - var entries []LogEntry +func (h *HTTPSource) parseEntries(body []byte) ([]core.LogEntry, error) { + var entries []core.LogEntry // Try to parse as single JSON object first - var single LogEntry + var single core.LogEntry if err := json.Unmarshal(body, &single); err == nil { // Validate required fields if single.Message == "" { @@ -277,7 +278,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { } // Try to parse as JSON array - var array []LogEntry + var array []core.LogEntry if err := json.Unmarshal(body, &array); err == nil { // TODO: Placeholder; For array, divide total size by entry count as approximation approxSizePerEntry := int64(len(body) / len(array)) @@ -304,7 +305,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { continue } - var entry LogEntry + var entry core.LogEntry if err := json.Unmarshal(line, &entry); err != nil { return nil, fmt.Errorf("line %d: %w", i+1, err) } @@ -330,7 +331,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { return entries, nil } -func (h *HTTPSource) publish(entry LogEntry) bool { +func (h *HTTPSource) publish(entry core.LogEntry) bool { h.mu.RLock() defer h.mu.RUnlock() diff --git a/src/internal/source/source.go b/src/internal/source/source.go index d98d0c8..9b788ba 100644 --- a/src/internal/source/source.go +++ b/src/internal/source/source.go @@ -2,24 +2,15 @@ package source import ( - "encoding/json" "time" -) -// LogEntry represents a single log record -type LogEntry struct { - Time time.Time `json:"time"` - Source string `json:"source"` - Level string `json:"level,omitempty"` - Message string `json:"message"` - Fields json.RawMessage `json:"fields,omitempty"` - RawSize int64 `json:"-"` -} + "logwisp/src/internal/core" +) // Source represents an input data stream type Source interface { // Subscribe returns a channel that receives log entries - Subscribe() <-chan LogEntry + Subscribe() <-chan core.LogEntry // Start begins reading from the source Start() error diff --git a/src/internal/source/stdin.go b/src/internal/source/stdin.go index a4dbf8d..97dbe7a 100644 --- a/src/internal/source/stdin.go +++ b/src/internal/source/stdin.go @@ -7,12 +7,14 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/core" + "github.com/lixenwraith/log" ) // StdinSource reads log entries from standard input type StdinSource struct { - subscribers []chan LogEntry + subscribers []chan core.LogEntry done chan struct{} totalEntries atomic.Uint64 droppedEntries atomic.Uint64 @@ -32,8 +34,8 @@ func NewStdinSource(options map[string]any, logger *log.Logger) (*StdinSource, e return s, nil } -func (s *StdinSource) Subscribe() <-chan LogEntry { - ch := make(chan LogEntry, 1000) +func (s *StdinSource) Subscribe() <-chan core.LogEntry { + ch := make(chan core.LogEntry, 1000) s.subscribers = append(s.subscribers, ch) return ch } @@ -77,7 +79,7 @@ func (s *StdinSource) readLoop() { continue } - entry := LogEntry{ + entry := core.LogEntry{ Time: time.Now(), Source: "stdin", Message: line, @@ -96,7 +98,7 @@ func (s *StdinSource) readLoop() { } } -func (s *StdinSource) publish(entry LogEntry) { +func (s *StdinSource) publish(entry core.LogEntry) { s.totalEntries.Add(1) s.lastEntryTime.Store(entry.Time) diff --git a/src/internal/source/tcp.go b/src/internal/source/tcp.go index a157106..288b6cf 100644 --- a/src/internal/source/tcp.go +++ b/src/internal/source/tcp.go @@ -12,7 +12,8 @@ import ( "time" "logwisp/src/internal/config" - "logwisp/src/internal/netlimit" + "logwisp/src/internal/core" + "logwisp/src/internal/limit" "github.com/lixenwraith/log" "github.com/lixenwraith/log/compat" @@ -24,13 +25,13 @@ type TCPSource struct { port int64 bufferSize int64 server *tcpSourceServer - subscribers []chan LogEntry + subscribers []chan core.LogEntry mu sync.RWMutex done chan struct{} engine *gnet.Engine engineMu sync.Mutex wg sync.WaitGroup - netLimiter *netlimit.Limiter + netLimiter *limit.NetLimiter logger *log.Logger // Statistics @@ -86,18 +87,18 @@ func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error cfg.MaxTotalConnections = maxTotal } - t.netLimiter = netlimit.New(cfg, logger) + t.netLimiter = limit.NewNetLimiter(cfg, logger) } } return t, nil } -func (t *TCPSource) Subscribe() <-chan LogEntry { +func (t *TCPSource) Subscribe() <-chan core.LogEntry { t.mu.Lock() defer t.mu.Unlock() - ch := make(chan LogEntry, t.bufferSize) + ch := make(chan core.LogEntry, t.bufferSize) t.subscribers = append(t.subscribers, ch) return ch } @@ -205,7 +206,7 @@ func (t *TCPSource) GetStats() SourceStats { } } -func (t *TCPSource) publish(entry LogEntry) bool { +func (t *TCPSource) publish(entry core.LogEntry) bool { t.mu.RLock() defer t.mu.RUnlock() @@ -360,7 +361,7 @@ func (s *tcpSourceServer) OnTraffic(c gnet.Conn) gnet.Action { rawSize := int64(len(line)) // Parse JSON log entry - var entry LogEntry + var entry core.LogEntry if err := json.Unmarshal(line, &entry); err != nil { s.source.invalidEntries.Add(1) s.source.logger.Debug("msg", "Invalid JSON log entry",