diff --git a/src/internal/config/loader.go b/src/internal/config/loader.go index 851cbbc..2a1fd24 100644 --- a/src/internal/config/loader.go +++ b/src/internal/config/loader.go @@ -113,7 +113,18 @@ func defaults() *Config { Pipelines: []PipelineConfig{ { Name: "default_pipeline", - Flow: &FlowConfig{}, + Flow: &FlowConfig{ + RateLimit: &RateLimitConfig{ + Rate: 5, + Burst: 10, + Policy: "drop", + MaxEntrySizeBytes: 65536, + }, + Format: &FormatConfig{ + Type: "json", + SanitizerPolicy: "json", + }, + }, PluginSources: []PluginSourceConfig{ { ID: "default_source", diff --git a/src/internal/config/validate.go b/src/internal/config/validate.go new file mode 100644 index 0000000..c45a9b0 --- /dev/null +++ b/src/internal/config/validate.go @@ -0,0 +1,63 @@ +package config + +import ( + "fmt" + + lconfig "github.com/lixenwraith/config" +) + +// ValidateConfig validates top-level structure only +// Value range validation is delegated to component constructors +func ValidateConfig(cfg *Config) error { + if cfg == nil { + return fmt.Errorf("config is nil") + } + + if len(cfg.Pipelines) == 0 { + return fmt.Errorf("no pipelines configured") + } + + if err := validateLogConfig(cfg.Logging); err != nil { + return fmt.Errorf("logging: %w", err) + } + + for i, p := range cfg.Pipelines { + if err := lconfig.NonEmpty(p.Name); err != nil { + return fmt.Errorf("pipeline[%d].name: %w", i, err) + } + if len(p.PluginSources) == 0 { + return fmt.Errorf("pipeline[%d]: no sources defined", i) + } + if len(p.PluginSinks) == 0 { + return fmt.Errorf("pipeline[%d]: no sinks defined", i) + } + } + + return nil +} + +// validateLogConfig validates application logging settings +func validateLogConfig(cfg *LogConfig) error { + if cfg == nil { + return nil + } + + validateOutput := lconfig.OneOf("file", "stdout", "stderr", "split", "all", "none") + if err := validateOutput(cfg.Output); err != nil { + return fmt.Errorf("output: %w", err) + } + + validateLevel := lconfig.OneOf("debug", "info", "warn", "error") + if err := validateLevel(cfg.Level); err != nil { + return fmt.Errorf("level: %w", err) + } + + if cfg.Console != nil { + validateTarget := lconfig.OneOf("stdout", "stderr", "split") + if err := validateTarget(cfg.Console.Target); err != nil { + return fmt.Errorf("console.target: %w", err) + } + } + + return nil +} \ No newline at end of file diff --git a/src/internal/config/validation.go b/src/internal/config/validation.go deleted file mode 100644 index 8e1b724..0000000 --- a/src/internal/config/validation.go +++ /dev/null @@ -1,234 +0,0 @@ -package config - -import ( - "fmt" - "path/filepath" - "regexp" - "strings" - - lconfig "github.com/lixenwraith/config" -) - -// ValidateConfig is the centralized validator for the entire configuration structure -func ValidateConfig(cfg *Config) error { - if cfg == nil { - return fmt.Errorf("config is nil") - } - - if len(cfg.Pipelines) == 0 { - return fmt.Errorf("no pipelines configured") - } - - if err := validateLogConfig(cfg.Logging); err != nil { - return fmt.Errorf("logging config: %w", err) - } - - // // Track used ports across all pipelines - // allPorts := make(map[int64]string) - // pipelineNames := make(map[string]bool) - - // for i, pipeline := range cfg.Pipelines { - // if err := validatePipeline(i, &pipeline, pipelineNames, allPorts); err != nil { - // return err - // } - // } - - return nil -} - -// validateLogConfig validates the application's own logging settings -func validateLogConfig(cfg *LogConfig) error { - validOutputs := map[string]bool{ - "file": true, "stdout": true, "stderr": true, - "split": true, "all": true, "none": true, - } - if !validOutputs[cfg.Output] { - return fmt.Errorf("invalid log output mode: %s", cfg.Output) - } - - validLevels := map[string]bool{ - "debug": true, "info": true, "warn": true, "error": true, - } - if !validLevels[cfg.Level] { - return fmt.Errorf("invalid log level: %s", cfg.Level) - } - - if cfg.Console != nil { - validTargets := map[string]bool{ - "stdout": true, "stderr": true, "split": true, - } - if !validTargets[cfg.Console.Target] { - return fmt.Errorf("invalid console target: %s", cfg.Console.Target) - } - } - - return nil -} - -// validateRateLimit validates the pipeline-level rate limit settings -func validateRateLimit(pipelineName string, cfg *RateLimitConfig) error { - if cfg == nil { - return nil - } - - if cfg.Rate < 0 { - return fmt.Errorf("pipeline '%s': rate limit rate cannot be negative", pipelineName) - } - - if cfg.Burst < 0 { - return fmt.Errorf("pipeline '%s': rate limit burst cannot be negative", pipelineName) - } - - if cfg.MaxEntrySizeBytes < 0 { - return fmt.Errorf("pipeline '%s': max entry size bytes cannot be negative", pipelineName) - } - - // Validate policy - switch strings.ToLower(cfg.Policy) { - case "", "pass", "drop": - // Valid policies - default: - return fmt.Errorf("pipeline '%s': invalid rate limit policy '%s' (must be 'pass' or 'drop')", - pipelineName, cfg.Policy) - } - - return nil -} - -// validateFilter validates a single filter's configuration -func validateFilter(pipelineName string, filterIndex int, cfg *FilterConfig) error { - // Validate filter type - switch cfg.Type { - case FilterTypeInclude, FilterTypeExclude, "": - // Valid types - default: - return fmt.Errorf("pipeline '%s' filter[%d]: invalid type '%s' (must be 'include' or 'exclude')", - pipelineName, filterIndex, cfg.Type) - } - - // Validate filter logic - switch cfg.Logic { - case FilterLogicOr, FilterLogicAnd, "": - // Valid logic - default: - return fmt.Errorf("pipeline '%s' filter[%d]: invalid logic '%s' (must be 'or' or 'and')", - pipelineName, filterIndex, cfg.Logic) - } - - // Empty patterns is valid - passes everything - if len(cfg.Patterns) == 0 { - return nil - } - - // Validate regex patterns - for i, pattern := range cfg.Patterns { - if _, err := regexp.Compile(pattern); err != nil { - return fmt.Errorf("pipeline '%s' filter[%d] pattern[%d] '%s': invalid regex: %w", - pipelineName, filterIndex, i, pattern, err) - } - } - - return nil -} - -// validateFileSource validates the settings for a directory source -func validateFileSource(pipelineName string, index int, opts *FileSourceOptions) error { - if err := lconfig.NonEmpty(opts.Directory); err != nil { - return fmt.Errorf("pipeline '%s' source[%d]: directory requires 'path'", pipelineName, index) - } else { - absPath, err := filepath.Abs(opts.Directory) - if err != nil { - return fmt.Errorf("invalid path %s: %w", opts.Directory, err) - } - opts.Directory = absPath - } - - // Check for directory traversal - if strings.Contains(opts.Directory, "..") { - return fmt.Errorf("pipeline '%s' source[%d]: path contains directory traversal", pipelineName, index) - } - - // Validate pattern if provided - if opts.Pattern != "" { - if strings.Count(opts.Pattern, "*") == 0 && strings.Count(opts.Pattern, "?") == 0 { - // If no wildcards, ensure valid filename - if filepath.Base(opts.Pattern) != opts.Pattern { - return fmt.Errorf("pipeline '%s' source[%d]: pattern contains path separators", pipelineName, index) - } - } - } else { - opts.Pattern = "*" - } - - // Validate check interval - if opts.CheckIntervalMS < 10 { - return fmt.Errorf("pipeline '%s' source[%d]: check_interval_ms must be at least 10ms", pipelineName, index) - } - - return nil -} - -// validateConsoleSource validates the settings for a console source -func validateConsoleSource(pipelineName string, index int, opts *ConsoleSourceOptions) error { - if opts.BufferSize < 0 { - return fmt.Errorf("pipeline '%s' source[%d]: buffer_size must be positive", pipelineName, index) - } else if opts.BufferSize == 0 { - opts.BufferSize = 1000 - } - return nil -} - -// validateConsoleSink validates the settings for a console sink -func validateConsoleSink(pipelineName string, index int, opts *ConsoleSinkOptions) error { - if opts.BufferSize < 1 { - return fmt.Errorf("pipeline '%s' sink[%d]: buffer_size must be positive", pipelineName, index) - } - return nil -} - -// validateFileSink validates the settings for a file sink -func validateFileSink(pipelineName string, index int, opts *FileSinkOptions) error { - if err := lconfig.NonEmpty(opts.Directory); err != nil { - return fmt.Errorf("pipeline '%s' sink[%d]: file requires 'directory'", pipelineName, index) - } - - if err := lconfig.NonEmpty(opts.Name); err != nil { - return fmt.Errorf("pipeline '%s' sink[%d]: file requires 'name'", pipelineName, index) - } - - if opts.BufferSize <= 0 { - return fmt.Errorf("pipeline '%s' sink[%d]: max_size_mb must be positive", pipelineName, index) - } - - // Validate sizes - if opts.MaxSizeMB < 0 { - return fmt.Errorf("pipeline '%s' sink[%d]: max_size_mb must be positive", pipelineName, index) - } - - if opts.MaxTotalSizeMB <= 0 { - return fmt.Errorf("pipeline '%s' sink[%d]: max_total_size_mb cannot be negative", pipelineName, index) - } - - if opts.MinDiskFreeMB < 0 { - return fmt.Errorf("pipeline '%s' sink[%d]: min_disk_free_mb must be positive", pipelineName, index) - } - - if opts.RetentionHours <= 0 { - return fmt.Errorf("pipeline '%s' sink[%d]: retention_hours cannot be negative", pipelineName, index) - } - - return nil -} - -// validateHeartbeat validates nested HeartbeatConfig settings -func validateHeartbeat(pipelineName, location string, hb *HeartbeatConfig) error { - if !hb.Enabled { - return nil // Skip validation if disabled - } - - if hb.IntervalMS < 1000 { // At least 1 second - return fmt.Errorf("pipeline '%s' %s: heartbeat interval must be at least 1000ms", pipelineName, location) - } - - return nil -} \ No newline at end of file diff --git a/src/internal/filter/filter.go b/src/internal/filter/filter.go index db6b418..8c9da39 100644 --- a/src/internal/filter/filter.go +++ b/src/internal/filter/filter.go @@ -9,6 +9,7 @@ import ( "logwisp/src/internal/config" "logwisp/src/internal/core" + lconfig "github.com/lixenwraith/config" "github.com/lixenwraith/log" ) @@ -27,6 +28,20 @@ type Filter struct { // NewFilter creates a new filter from a configuration func NewFilter(cfg config.FilterConfig, logger *log.Logger) (*Filter, error) { + // Validate enums before setting defaults + if cfg.Type != "" { + validateType := lconfig.OneOf(config.FilterTypeInclude, config.FilterTypeExclude) + if err := validateType(cfg.Type); err != nil { + return nil, fmt.Errorf("type: %w", err) + } + } + if cfg.Logic != "" { + validateLogic := lconfig.OneOf(config.FilterLogicOr, config.FilterLogicAnd) + if err := validateLogic(cfg.Logic); err != nil { + return nil, fmt.Errorf("logic: %w", err) + } + } + // Set defaults if cfg.Type == "" { cfg.Type = config.FilterTypeInclude @@ -45,7 +60,7 @@ func NewFilter(cfg config.FilterConfig, logger *log.Logger) (*Filter, error) { for i, pattern := range cfg.Patterns { re, err := regexp.Compile(pattern) if err != nil { - return nil, fmt.Errorf("invalid regex pattern[%d] '%s': %w", i, pattern, err) + return nil, fmt.Errorf("pattern[%d] '%s': %w", i, pattern, err) } f.patterns = append(f.patterns, re) } diff --git a/src/internal/flow/flow.go b/src/internal/flow/flow.go index cbee733..b97fa59 100644 --- a/src/internal/flow/flow.go +++ b/src/internal/flow/flow.go @@ -63,9 +63,13 @@ func NewFlow(cfg *config.FlowConfig, logger *log.Logger) (*Flow, error) { } f.formatter = formatter - // Create heartbeat generator with the same formatter - if cfg.Heartbeat != nil && cfg.Heartbeat.Enabled { - f.heartbeat = NewHeartbeatGenerator(cfg.Heartbeat, formatter, logger) + // Create heartbeat generator with the same formatter if configured + if cfg.Heartbeat != nil { + hb, err := NewHeartbeatGenerator(cfg.Heartbeat, formatter, logger) + if err != nil { + return nil, fmt.Errorf("heartbeat: %w", err) + } + f.heartbeat = hb } logger.Info("msg", "Flow processor created", diff --git a/src/internal/flow/heartbeat.go b/src/internal/flow/heartbeat.go index edc54b4..7d34fbf 100644 --- a/src/internal/flow/heartbeat.go +++ b/src/internal/flow/heartbeat.go @@ -3,17 +3,25 @@ package flow import ( "context" "encoding/json" - "logwisp/src/internal/format" + "fmt" "sync/atomic" "time" "logwisp/src/internal/config" "logwisp/src/internal/core" + "logwisp/src/internal/format" + lconfig "github.com/lixenwraith/config" "github.com/lixenwraith/log" "github.com/lixenwraith/log/formatter" ) +const ( + MinHeartbeatIntervalMS = 100 + DefaultHeartbeatIntervalMS = 1000 + DefaultHeartbeatFormat = "txt" +) + // HeartbeatGenerator produces periodic heartbeat events type HeartbeatGenerator struct { config *config.HeartbeatConfig @@ -24,14 +32,35 @@ type HeartbeatGenerator struct { } // NewHeartbeatGenerator creates a new heartbeat generator -func NewHeartbeatGenerator(cfg *config.HeartbeatConfig, formatter format.Formatter, logger *log.Logger) *HeartbeatGenerator { +func NewHeartbeatGenerator(cfg *config.HeartbeatConfig, formatter format.Formatter, logger *log.Logger) (*HeartbeatGenerator, error) { + if cfg == nil || !cfg.Enabled { + return nil, nil + } + + // Validate + if cfg.IntervalMS == 0 { + cfg.IntervalMS = DefaultHeartbeatIntervalMS + } else if cfg.IntervalMS < MinHeartbeatIntervalMS { + return nil, fmt.Errorf("interval_ms: must be >= %d, got %d", MinHeartbeatIntervalMS, cfg.IntervalMS) + } + + validateFormat := lconfig.OneOf("txt", "json", "raw", "") + if err := validateFormat(cfg.Format); err != nil { + return nil, fmt.Errorf("format: %w", err) + } + + // Defaults + if cfg.Format == "" { + cfg.Format = DefaultHeartbeatFormat + } + hg := &HeartbeatGenerator{ config: cfg, formatter: formatter, logger: logger, } hg.lastBeat.Store(time.Time{}) - return hg + return hg, nil } // Start begins generating heartbeat events diff --git a/src/internal/flow/ratelimiter.go b/src/internal/flow/ratelimiter.go index 47f3021..9e56778 100644 --- a/src/internal/flow/ratelimiter.go +++ b/src/internal/flow/ratelimiter.go @@ -1,6 +1,7 @@ package flow import ( + "fmt" "strings" "sync/atomic" @@ -8,6 +9,7 @@ import ( "logwisp/src/internal/core" "logwisp/src/internal/tokenbucket" + lconfig "github.com/lixenwraith/config" "github.com/lixenwraith/log" ) @@ -25,21 +27,36 @@ type RateLimiter struct { // NewRateLimiter creates a new pipeline-level rate limiter from configuration func NewRateLimiter(cfg config.RateLimitConfig, logger *log.Logger) (*RateLimiter, error) { + // Rate <= 0 means disabled if cfg.Rate <= 0 { return nil, nil // No rate limit } + // Validate + if err := lconfig.NonNegative(cfg.Rate); err != nil { + return nil, fmt.Errorf("rate: %w", err) + } + if err := lconfig.NonNegative(cfg.Burst); err != nil { + return nil, fmt.Errorf("burst: %w", err) + } + if err := lconfig.NonNegative(cfg.MaxEntrySizeBytes); err != nil { + return nil, fmt.Errorf("max_entry_size_bytes: %w", err) + } + + // Defaults burst := cfg.Burst if burst <= 0 { - burst = cfg.Rate // Default burst to rate + burst = cfg.Rate } var policy config.RateLimitPolicy switch strings.ToLower(cfg.Policy) { case "drop": policy = config.PolicyDrop - default: + case "pass", "": policy = config.PolicyPass + default: + return nil, fmt.Errorf("policy: must be one of [drop, pass], got %s", cfg.Policy) } l := &RateLimiter{ diff --git a/src/internal/format/adapter.go b/src/internal/format/adapter.go index 48f332b..373c90e 100644 --- a/src/internal/format/adapter.go +++ b/src/internal/format/adapter.go @@ -2,13 +2,20 @@ package format import ( "encoding/json" + "fmt" + "logwisp/src/internal/config" "logwisp/src/internal/core" + lconfig "github.com/lixenwraith/config" "github.com/lixenwraith/log/formatter" "github.com/lixenwraith/log/sanitizer" ) +const ( + DefaultFormatType = "raw" +) + // FormatterAdapter wraps log/formatter for logwisp compatibility type FormatterAdapter struct { formatter *formatter.Formatter @@ -18,6 +25,26 @@ type FormatterAdapter struct { // NewFormatterAdapter creates adapter from config func NewFormatterAdapter(cfg *config.FormatConfig) (*FormatterAdapter, error) { + // Validate + if cfg.Type != "" { + validateType := lconfig.OneOf("json", "txt", "text", "raw") + if err := validateType(cfg.Type); err != nil { + return nil, fmt.Errorf("type: %w", err) + } + } + + if cfg.SanitizerPolicy != "" { + validatePolicy := lconfig.OneOf("raw", "json", "txt", "shell") + if err := validatePolicy(cfg.SanitizerPolicy); err != nil { + return nil, fmt.Errorf("sanitizer_policy: %w", err) + } + } + + // Defaults + if cfg.Type == "" { + cfg.Type = DefaultFormatType + } + // Create sanitizer based on policy var s *sanitizer.Sanitizer if cfg.SanitizerPolicy != "" { @@ -44,7 +71,6 @@ func NewFormatterAdapter(cfg *config.FormatConfig) (*FormatterAdapter, error) { // Build flags from config flags := cfg.Flags if flags == 0 { - // Set default flags based on format type if cfg.Type == "raw" { flags = formatter.FlagRaw } else { diff --git a/src/internal/format/format.go b/src/internal/format/format.go index 956ee9a..6a3ea81 100644 --- a/src/internal/format/format.go +++ b/src/internal/format/format.go @@ -14,17 +14,15 @@ type Formatter interface { Name() string } -// NewFormatter creates a Formatter using the new formatter/sanitizer packages +// NewFormatter creates a Formatter using formatter/sanitizer packages func NewFormatter(cfg *config.FormatConfig) (Formatter, error) { if cfg == nil { - // Default config cfg = &config.FormatConfig{ - Type: "raw", + Type: DefaultFormatType, Flags: 0, SanitizerPolicy: "raw", } } - // Use the new FormatterAdapter that integrates formatter and sanitizer return NewFormatterAdapter(cfg) } \ No newline at end of file diff --git a/src/internal/sink/console/console.go b/src/internal/sink/console/console.go index 97105b6..b03ecba 100644 --- a/src/internal/sink/console/console.go +++ b/src/internal/sink/console/console.go @@ -49,6 +49,12 @@ type ConsoleSink struct { lastProcessed atomic.Value // time.Time } +const ( + // Defaults + DefaultConsoleTarget = "stdout" + DefaultConsoleBufferSize = 1000 +) + // NewConsoleSinkPlugin creates a console sink through plugin factory func NewConsoleSinkPlugin( id string, @@ -56,10 +62,7 @@ func NewConsoleSinkPlugin( logger *log.Logger, proxy *session.Proxy, ) (sink.Sink, error) { - // Create empty config struct with defaults - opts := &config.ConsoleSinkOptions{ - Target: DefaultConsoleTarget, - } + opts := &config.ConsoleSinkOptions{} // Scan config map into struct if err := lconfig.ScanMap(configMap, opts); err != nil { @@ -67,21 +70,28 @@ func NewConsoleSinkPlugin( } // Validate and apply defaults + if opts.Target == "" { + opts.Target = DefaultConsoleTarget + } else { + validateTarget := lconfig.OneOf("stdout", "stderr") + if err := validateTarget(opts.Target); err != nil { + return nil, fmt.Errorf("target: %w", err) + } + } + var output io.Writer switch opts.Target { case "stdout": output = os.Stdout case "stderr": output = os.Stderr - default: - return nil, fmt.Errorf("invalid console target: %s (must be 'stdout' or 'stderr')", opts.Target) } if opts.BufferSize <= 0 { opts.BufferSize = DefaultConsoleBufferSize } - // Step 4: Create and return plugin instance + // Create and return plugin instance cs := &ConsoleSink{ id: id, proxy: proxy, diff --git a/src/internal/sink/console/const.go b/src/internal/sink/console/const.go deleted file mode 100644 index a1e2fbc..0000000 --- a/src/internal/sink/console/const.go +++ /dev/null @@ -1,7 +0,0 @@ -package console - -const ( - // Defaults - DefaultConsoleTarget = "stdout" - DefaultConsoleBufferSize = 1000 -) \ No newline at end of file diff --git a/src/internal/sink/file/const.go b/src/internal/sink/file/const.go deleted file mode 100644 index a6cb39b..0000000 --- a/src/internal/sink/file/const.go +++ /dev/null @@ -1,11 +0,0 @@ -package file - -const ( - // Defaults - DefaultFileMaxSizeMB = 100 - DefaultFileMaxTotalSizeMB = 1000 - DefaultFileMinDiskFreeMB = 100 - DefaultFileRetentionHours = 168 // 7 days - DefaultFileBufferSize = 1000 - DefaultFileFlushIntervalMs = 100 -) \ No newline at end of file diff --git a/src/internal/sink/file/file.go b/src/internal/sink/file/file.go index 0bdef24..088778d 100644 --- a/src/internal/sink/file/file.go +++ b/src/internal/sink/file/file.go @@ -47,6 +47,16 @@ type FileSink struct { lastProcessed atomic.Value // time.Time } +const ( + // Defaults + DefaultFileMaxSizeMB = 100 + DefaultFileMaxTotalSizeMB = 1000 + DefaultFileMinDiskFreeMB = 100 + DefaultFileRetentionHours = 168 // 7 days + DefaultFileBufferSize = 1000 + DefaultFileFlushIntervalMs = 100 +) + // NewFileSinkPlugin creates a file sink through plugin factory func NewFileSinkPlugin( id string, @@ -62,13 +72,15 @@ func NewFileSinkPlugin( return nil, fmt.Errorf("failed to parse config: %w", err) } - // Validate required fields and apply defaults - if opts.Directory == "" { - return nil, fmt.Errorf("directory is required") + // Validate + if err := lconfig.NonEmpty(opts.Directory); err != nil { + return nil, fmt.Errorf("directory: %w", err) } - if opts.Name == "" { - return nil, fmt.Errorf("name is required") + if err := lconfig.NonEmpty(opts.Name); err != nil { + return nil, fmt.Errorf("name: %w", err) } + + // Defaults if opts.MaxSizeMB <= 0 { opts.MaxSizeMB = DefaultFileMaxSizeMB } @@ -88,7 +100,6 @@ func NewFileSinkPlugin( opts.FlushIntervalMs = DefaultFileFlushIntervalMs } - // Step 4: Create and return plugin instance // Create configuration for the internal log writer writerConfig := log.DefaultConfig() writerConfig.Directory = opts.Directory diff --git a/src/internal/sink/http/const.go b/src/internal/sink/http/const.go deleted file mode 100644 index 4eb68e2..0000000 --- a/src/internal/sink/http/const.go +++ /dev/null @@ -1,16 +0,0 @@ -package http - -import "time" - -const ( - // Server lifecycle - HttpServerStartTimeout = 100 * time.Millisecond - HttpServerShutdownTimeout = 2 * time.Second - - // Defaults - DefaultHTTPHost = "0.0.0.0" - DefaultHTTPBufferSize = 1000 - DefaultHTTPStreamPath = "/stream" - DefaultHTTPStatusPath = "/status" - HTTPMaxPort = 65535 -) \ No newline at end of file diff --git a/src/internal/sink/http/http.go b/src/internal/sink/http/http.go index 91b6f74..ba05954 100644 --- a/src/internal/sink/http/http.go +++ b/src/internal/sink/http/http.go @@ -66,6 +66,19 @@ type HTTPSink struct { lastProcessed atomic.Value // time.Time } +const ( + // Server lifecycle + HttpServerStartTimeout = 100 * time.Millisecond + HttpServerShutdownTimeout = 2 * time.Second + + // Defaults + DefaultHTTPHost = "0.0.0.0" + DefaultHTTPBufferSize = 1000 + DefaultHTTPStreamPath = "/stream" + DefaultHTTPStatusPath = "/status" + HTTPMaxPort = 65535 +) + // NewHTTPSinkPlugin creates an HTTP sink through plugin factory func NewHTTPSinkPlugin( id string, @@ -83,10 +96,12 @@ func NewHTTPSinkPlugin( return nil, fmt.Errorf("failed to parse config: %w", err) } - // Validate and apply defaults + // Validate if opts.Port <= 0 || opts.Port > HTTPMaxPort { return nil, fmt.Errorf("port must be between 1 and %d", HTTPMaxPort) } + + // Defaults if opts.BufferSize <= 0 { opts.BufferSize = DefaultHTTPBufferSize } diff --git a/src/internal/sink/tcp/const.go b/src/internal/sink/tcp/const.go deleted file mode 100644 index 4d49712..0000000 --- a/src/internal/sink/tcp/const.go +++ /dev/null @@ -1,21 +0,0 @@ -package tcp - -import ( - "time" -) - -const ( - // Server lifecycle - TCPServerStartTimeout = 100 * time.Millisecond - TCPServerShutdownTimeout = 2 * time.Second - - // Connection management - TCPMaxConsecutiveWriteErrors = 3 - TCPMaxPort = 65535 - - // Defaults - DefaultTCPHost = "0.0.0.0" - DefaultTCPBufferSize = 1000 - DefaultTCPWriteTimeoutMS = 5000 - DefaultTCPKeepAlivePeriod = 30000 -) \ No newline at end of file diff --git a/src/internal/sink/tcp/tcp.go b/src/internal/sink/tcp/tcp.go index c890ea8..9436399 100644 --- a/src/internal/sink/tcp/tcp.go +++ b/src/internal/sink/tcp/tcp.go @@ -61,6 +61,22 @@ type TCPSink struct { errorMu sync.Mutex } +const ( + // Server lifecycle + TCPServerStartTimeout = 100 * time.Millisecond + TCPServerShutdownTimeout = 2 * time.Second + + // Connection management + TCPMaxConsecutiveWriteErrors = 3 + TCPMaxPort = 65535 + + // Defaults + DefaultTCPHost = "0.0.0.0" + DefaultTCPBufferSize = 1000 + DefaultTCPWriteTimeoutMS = 5000 + DefaultTCPKeepAlivePeriod = 30000 +) + // NewTCPSinkPlugin creates a TCP sink through plugin factory func NewTCPSinkPlugin( id string, @@ -80,11 +96,12 @@ func NewTCPSinkPlugin( return nil, fmt.Errorf("failed to parse config: %w", err) } - // Validate required fields - // Validate and apply defaults - if opts.Port <= 0 || opts.Port > TCPMaxPort { - return nil, fmt.Errorf("port must be between 1 and %d", TCPMaxPort) + // Validate + if err := lconfig.Port(opts.Port); err != nil { + return nil, fmt.Errorf("port: %w", err) } + + // Defaults if opts.BufferSize <= 0 { opts.BufferSize = DefaultTCPBufferSize } diff --git a/src/internal/source/console/console.go b/src/internal/source/console/console.go index cc2e659..1a43cd5 100644 --- a/src/internal/source/console/console.go +++ b/src/internal/source/console/console.go @@ -56,6 +56,10 @@ type ConsoleSource struct { lastEntryTime atomic.Value // time.Time } +const ( + DefaultConsoleSourceBufferSize = 1000 +) + // NewConsoleSourcePlugin creates a console source through plugin factory func NewConsoleSourcePlugin( id string, @@ -63,22 +67,19 @@ func NewConsoleSourcePlugin( logger *log.Logger, proxy *session.Proxy, ) (source.Source, error) { - // Step 1: Create empty config struct with defaults - opts := &config.ConsoleSourceOptions{ - BufferSize: 1000, // Default buffer size - } + opts := &config.ConsoleSourceOptions{} - // Step 2: Use lconfig to scan map into struct (overriding defaults) + // Scan config map if err := lconfig.ScanMap(configMap, opts); err != nil { return nil, fmt.Errorf("failed to parse config: %w", err) } - // Step 3: Validate required fields + // Validate and apply defaults if opts.BufferSize <= 0 { - opts.BufferSize = 1000 + opts.BufferSize = DefaultConsoleSourceBufferSize } - // Step 4: Create and return plugin instance + // Create and return plugin instance cs := &ConsoleSource{ id: id, proxy: proxy, @@ -89,7 +90,7 @@ func NewConsoleSourcePlugin( } cs.lastEntryTime.Store(time.Time{}) - // Create session for console + // Create session cs.session = proxy.CreateSession( "console_stdin", map[string]any{ diff --git a/src/internal/source/console/const.go b/src/internal/source/console/const.go deleted file mode 100644 index 178a9d5..0000000 --- a/src/internal/source/console/const.go +++ /dev/null @@ -1 +0,0 @@ -package console \ No newline at end of file diff --git a/src/internal/source/file/file.go b/src/internal/source/file/file.go index c02c7bb..aa5f69f 100644 --- a/src/internal/source/file/file.go +++ b/src/internal/source/file/file.go @@ -57,6 +57,12 @@ type FileSource struct { lastEntryTime atomic.Value // time.Time } +const ( + DefaultFileSourcePattern = "*" + DefaultFileSourceCheckIntervalMS = 100 + MinFileSourceCheckIntervalMS = 10 +) + // NewFileSourcePlugin creates a file source through plugin factory func NewFileSourcePlugin( id string, @@ -64,28 +70,28 @@ func NewFileSourcePlugin( logger *log.Logger, proxy *session.Proxy, ) (source.Source, error) { - // Step 1: Create empty config struct with defaults - opts := &config.FileSourceOptions{ - Directory: "", // Required field - no default - Pattern: "*", // Default pattern - CheckIntervalMS: 100, // Default check interval - } + opts := &config.FileSourceOptions{} - // Step 2: Use lconfig to scan map into struct (overriding defaults) + // Use lconfig to scan map into struct (overriding defaults) if err := lconfig.ScanMap(configMap, opts); err != nil { return nil, fmt.Errorf("failed to parse config: %w", err) } - // Step 3: Validate required fields - if opts.Directory == "" { - return nil, fmt.Errorf("directory is mandatory") + // Validate and apply defaults + if err := lconfig.NonEmpty(opts.Directory); err != nil { + return nil, fmt.Errorf("directory: %w", err) } - if opts.CheckIntervalMS < 10 { - return nil, fmt.Errorf("check_interval_ms must be at least 10ms") + if opts.Pattern == "" { + opts.Pattern = DefaultFileSourcePattern + } + if opts.CheckIntervalMS <= 0 { + opts.CheckIntervalMS = DefaultFileSourceCheckIntervalMS + } else if opts.CheckIntervalMS < MinFileSourceCheckIntervalMS { + return nil, fmt.Errorf("check_interval_ms: must be >= %d", MinFileSourceCheckIntervalMS) } - // Step 4: Create and return plugin instance + // Create and return plugin instance fs := &FileSource{ id: id, proxy: proxy, diff --git a/src/internal/source/random/random.go b/src/internal/source/random/random.go index 594898a..2dd1850 100644 --- a/src/internal/source/random/random.go +++ b/src/internal/source/random/random.go @@ -53,6 +53,12 @@ type RandomSource struct { lastEntryTime atomic.Value // time.Time } +const ( + DefaultRandomSourceIntervalMS = 500 + DefaultRandomSourceFormat = "txt" + DefaultRandomSourceLength = 20 +) + // NewRandomSourcePlugin creates a random source through plugin factory func NewRandomSourcePlugin( id string, @@ -69,26 +75,33 @@ func NewRandomSourcePlugin( Special: false, } - // Step 2: Use lconfig to scan map into struct (overriding defaults) + // Scan config map if err := lconfig.ScanMap(configMap, opts); err != nil { return nil, fmt.Errorf("failed to parse config: %w", err) } - // Step 3: Validate + // Defaults if opts.IntervalMS <= 0 { - return nil, fmt.Errorf("interval_ms must be positive") + opts.IntervalMS = DefaultRandomSourceIntervalMS } + if opts.Format == "" { + opts.Format = DefaultRandomSourceFormat + } + if opts.Length <= 0 { + opts.Length = DefaultRandomSourceLength + } + + // Validate if opts.JitterMS < 0 { return nil, fmt.Errorf("jitter_ms cannot be negative") } if opts.JitterMS > opts.IntervalMS { opts.JitterMS = opts.IntervalMS } - if opts.Length <= 0 { - return nil, fmt.Errorf("length must be positive") - } - if opts.Format != "raw" && opts.Format != "txt" && opts.Format != "json" { - return nil, fmt.Errorf("format must be 'raw', 'txt', or 'json'") + + validateFormat := lconfig.OneOf("raw", "txt", "json") + if err := validateFormat(opts.Format); err != nil { + return nil, fmt.Errorf("format: %w", err) } rs := &RandomSource{