v0.3.6 number types refactored to 64-bit matching config types, bundled config samples updated

This commit is contained in:
2025-07-19 02:23:56 -04:00
parent e88812bb09
commit 7c221b426b
29 changed files with 400 additions and 484 deletions

View File

@ -1,221 +1,244 @@
# LogWisp Default Configuration and Guide # LogWisp Configuration Reference
# Default path: ~/.config/logwisp.toml # Default location: ~/.config/logwisp/logwisp.toml
# Override: logwisp --config /path/to/config.toml # Override: logwisp --config /path/to/config.toml
#
# All values shown are defaults unless marked (required)
# ============================================================================
# GLOBAL OPTIONS
# ============================================================================
# router = false # Enable router mode (multi-pipeline HTTP routing)
# background = false # Run as background daemon
# quiet = false # Suppress all output
# disable_status_reporter = false # Disable periodic status logging
# ============================================================================ # ============================================================================
# LOGGING (LogWisp's operational logs) # LOGGING (LogWisp's operational logs)
# ============================================================================ # ============================================================================
[logging] [logging]
# Output mode: file, stdout, stderr, both, none output = "stderr" # file, stdout, stderr, both, none
output = "stderr" level = "info" # debug, info, warn, error
# Log level: debug, info, warn, error
level = "info"
# File output settings (when output includes "file")
[logging.file] [logging.file]
directory = "./logs" directory = "./logs" # Log file directory
name = "logwisp" name = "logwisp" # Base filename
max_size_mb = 100 max_size_mb = 100 # Rotate after size
max_total_size_mb = 1000 max_total_size_mb = 1000 # Total size limit for all logs
retention_hours = 168.0 # 7 days retention_hours = 168.0 # Delete logs older than (0 = disabled)
# Console output settings
[logging.console] [logging.console]
target = "stderr" # stdout, stderr, split target = "stderr" # stdout, stderr, split (split: info→stdout, error→stderr)
format = "txt" # txt, json format = "txt" # txt, json
# ============================================================================ # ============================================================================
# PIPELINE CONFIGURATION # PIPELINES
# ============================================================================ # ============================================================================
# Each [[pipelines]] defines an independent log processing pipeline # Define one or more [[pipelines]] blocks
# Structure: sources → filters → sinks # Each pipeline: sources → [rate_limit] → [filters] → [format] → sinks
[[pipelines]] [[pipelines]]
# Unique pipeline identifier (used in router paths) name = "default" # (required) Unique identifier
name = "default"
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# SOURCES - Input data sources # PIPELINE RATE LIMITING (optional)
# ----------------------------------------------------------------------------
# [pipelines.rate_limit]
# rate = 1000.0 # Entries per second (0 = unlimited)
# burst = 1000.0 # Max burst size (defaults to rate)
# policy = "drop" # drop, pass
# max_entry_size_bytes = 0 # Max size per entry (0 = unlimited)
# ----------------------------------------------------------------------------
# SOURCES
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
[[pipelines.sources]] [[pipelines.sources]]
# Source type: directory, file, stdin type = "directory" # directory, file, stdin, http, tcp
type = "directory"
# Type-specific options # Directory source options
options = { [pipelines.sources.options]
path = "./", path = "./" # (required) Directory path
pattern = "*.log", pattern = "*.log" # Glob pattern
check_interval_ms = 100 # How often to check for new entries (10-60000) check_interval_ms = 100 # Scan interval (min: 10)
}
# Additional source examples: # File source options (alternative)
# [[pipelines.sources]]
# type = "file" # type = "file"
# options = { path = "/var/log/app.log" } # [pipelines.sources.options]
# # path = "/var/log/app.log" # (required) File path
# [[pipelines.sources]]
# type = "stdin" # HTTP source options (alternative)
# options = {} # type = "http"
# [pipelines.sources.options]
# port = 8081 # (required) Listen port
# ingest_path = "/ingest" # POST endpoint
# buffer_size = 1000 # Entry buffer size
# net_limit = { # Rate limiting
# enabled = true,
# requests_per_second = 100.0,
# burst_size = 200,
# limit_by = "ip" # ip, global
# }
# TCP source options (alternative)
# type = "tcp"
# [pipelines.sources.options]
# port = 9091 # (required) Listen port
# buffer_size = 1000 # Entry buffer size
# net_limit = { ... } # Same as HTTP
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# FILTERS - Log entry filtering (optional) # FILTERS (optional)
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# Multiple filters are applied sequentially - all must pass
# [[pipelines.filters]] # [[pipelines.filters]]
# type = "include" # include (whitelist) or exclude (blacklist) # type = "include" # include (whitelist), exclude (blacklist)
# logic = "or" # or (match any) or and (match all) # logic = "or" # or (any match), and (all match)
# patterns = [ # patterns = [ # Regular expressions
# "ERROR", # "ERROR",
# "(?i)warn", # Case-insensitive # "(?i)warn", # Case-insensitive
# "\\bfatal\\b" # Word boundary # "\\bfatal\\b" # Word boundary
# ] # ]
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# SINKS - Output destinations # FORMAT (optional)
# ----------------------------------------------------------------------------
# format = "raw" # raw, json, text
# [pipelines.format_options]
# # JSON formatter options
# pretty = false # Pretty print JSON
# timestamp_field = "timestamp" # Field name for timestamp
# level_field = "level" # Field name for log level
# message_field = "message" # Field name for message
# source_field = "source" # Field name for source
#
# # Text formatter options
# template = "[{{.Timestamp | FmtTime}}] [{{.Level | ToUpper}}] {{.Source}} - {{.Message}}"
# timestamp_format = "2006-01-02T15:04:05Z07:00" # Go time format
# ----------------------------------------------------------------------------
# SINKS
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
[[pipelines.sinks]] [[pipelines.sinks]]
# Sink type: http, tcp, file, stdout, stderr type = "http" # http, tcp, http_client, tcp_client, file, stdout, stderr
type = "http"
# Type-specific options # HTTP sink options (streaming server)
options = { [pipelines.sinks.options]
port = 8080, port = 8080 # (required) Listen port
buffer_size = 1000, buffer_size = 1000 # Entry buffer size
stream_path = "/stream", stream_path = "/stream" # SSE endpoint
status_path = "/status", status_path = "/status" # Status endpoint
# Heartbeat configuration [pipelines.sinks.options.heartbeat]
heartbeat = { enabled = true # Send periodic heartbeats
enabled = true, interval_seconds = 30 # Heartbeat interval
interval_seconds = 30, format = "comment" # comment, json
format = "comment", # comment or json include_timestamp = true # Include timestamp in heartbeat
include_timestamp = true, include_stats = false # Include statistics
include_stats = false
},
# Rate limiting (optional) [pipelines.sinks.options.net_limit]
rate_limit = { enabled = false # Enable rate limiting
enabled = false, requests_per_second = 10.0 # Request rate limit
requests_per_second = 10.0, burst_size = 20 # Token bucket burst
burst_size = 20, limit_by = "ip" # ip, global
limit_by = "ip", # ip or global max_connections_per_ip = 5 # Per-IP connection limit
max_connections_per_ip = 5, max_total_connections = 100 # Total connection limit
max_total_connections = 100, response_code = 429 # HTTP response code
response_code = 429, response_message = "Rate limit exceeded"
response_message = "Rate limit exceeded"
}
# SSL/TLS (planned) # TCP sink options (alternative)
# ssl = {
# enabled = false,
# cert_file = "/path/to/cert.pem",
# key_file = "/path/to/key.pem"
# }
}
# Additional sink examples:
# [[pipelines.sinks]]
# type = "tcp" # type = "tcp"
# options = { # [pipelines.sinks.options]
# port = 9090, # port = 9090 # (required) Listen port
# buffer_size = 5000, # buffer_size = 1000
# heartbeat = { enabled = true, interval_seconds = 60 } # heartbeat = { ... } # Same as HTTP
# net_limit = { ... } # Same as HTTP
# HTTP client sink options (forward to remote)
# type = "http_client"
# [pipelines.sinks.options]
# url = "https://logs.example.com/ingest" # (required) Target URL
# batch_size = 100 # Entries per batch
# batch_delay_ms = 1000 # Batch timeout
# timeout_seconds = 30 # Request timeout
# max_retries = 3 # Retry attempts
# retry_delay_ms = 1000 # Initial retry delay
# retry_backoff = 2.0 # Exponential backoff multiplier
# insecure_skip_verify = false # Skip TLS verification
# headers = { # Custom headers
# "Authorization" = "Bearer token",
# "X-Custom" = "value"
# } # }
# [[pipelines.sinks]] # TCP client sink options (forward to remote)
# type = "tcp_client"
# [pipelines.sinks.options]
# address = "logs.example.com:9090" # (required) host:port
# buffer_size = 1000
# dial_timeout_seconds = 10 # Connection timeout
# write_timeout_seconds = 30 # Write timeout
# keep_alive_seconds = 30 # TCP keepalive
# reconnect_delay_ms = 1000 # Initial reconnect delay
# max_reconnect_delay_seconds = 30 # Max reconnect delay
# reconnect_backoff = 1.5 # Exponential backoff
# File sink options
# type = "file" # type = "file"
# options = { # [pipelines.sinks.options]
# directory = "/var/log/logwisp", # directory = "/var/log/logwisp" # (required) Output directory
# name = "app", # name = "app" # (required) Base filename
# max_size_mb = 100, # max_size_mb = 100 # Rotate after size
# retention_hours = 168.0 # max_total_size_mb = 0 # Total size limit (0 = unlimited)
# } # retention_hours = 0.0 # Delete old files (0 = disabled)
# min_disk_free_mb = 1000 # Maintain free disk space
# [[pipelines.sinks]] # Console sink options
# type = "stdout" # type = "stdout" # or "stderr"
# options = { buffer_size = 500 } # [pipelines.sinks.options]
# buffer_size = 1000
# target = "stdout" # Override for split mode
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# AUTHENTICATION (optional, applies to network sinks) # AUTHENTICATION (optional, for network sinks)
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# [pipelines.auth] # [pipelines.auth]
# type = "none" # none, basic, bearer # type = "none" # none, basic, bearer
# ip_whitelist = [] # Allowed IPs (empty = all)
# ip_blacklist = [] # Blocked IPs
# #
# [pipelines.auth.basic_auth] # [pipelines.auth.basic_auth]
# realm = "LogWisp" # realm = "LogWisp" # WWW-Authenticate realm
# users = [ # users_file = "" # External users file
# { username = "admin", password_hash = "$2a$10$..." } # [[pipelines.auth.basic_auth.users]]
# ] # username = "admin"
# ip_whitelist = ["192.168.1.0/24"] # password_hash = "$2a$10$..." # bcrypt hash
# ============================================================================
# COMPLETE EXAMPLES
# ============================================================================
# Example: Production logs with filtering and multiple outputs
# [[pipelines]]
# name = "production"
# #
# [[pipelines.sources]] # [pipelines.auth.bearer_auth]
# type = "directory" # tokens = ["token1", "token2"] # Static tokens
# options = { path = "/var/log/app", pattern = "*.log", check_interval_ms = 50 } # [pipelines.auth.bearer_auth.jwt]
# # jwks_url = "" # JWKS endpoint
# [[pipelines.filters]] # signing_key = "" # Static key (if not using JWKS)
# type = "include" # issuer = "" # Expected issuer
# patterns = ["ERROR", "WARN", "CRITICAL"] # audience = "" # Expected audience
#
# [[pipelines.filters]]
# type = "exclude"
# patterns = ["/health", "/metrics"]
#
# [[pipelines.sinks]]
# type = "http"
# options = {
# port = 8080,
# rate_limit = { enabled = true, requests_per_second = 25.0 }
# }
#
# [[pipelines.sinks]]
# type = "file"
# options = { directory = "/var/log/archive", name = "errors" }
# Example: Multi-source aggregation
# [[pipelines]]
# name = "aggregated"
#
# [[pipelines.sources]]
# type = "directory"
# options = { path = "/var/log/nginx", pattern = "*.log" }
#
# [[pipelines.sources]]
# type = "directory"
# options = { path = "/var/log/app", pattern = "*.log" }
#
# [[pipelines.sinks]]
# type = "tcp"
# options = { port = 9090 }
# ============================================================================ # ============================================================================
# ROUTER MODE # ROUTER MODE
# ============================================================================ # ============================================================================
# Run with: logwisp --router # Enable with: logwisp --router or router = true
# Allows multiple pipelines to share HTTP ports via path-based routing # Combines multiple pipeline HTTP sinks on shared ports
# Access: http://localhost:8080/{pipeline_name}/stream # Access pattern: http://localhost:8080/{pipeline_name}/stream
# Global status: http://localhost:8080/status # Global status: http://localhost:8080/status
# ============================================================================ # ============================================================================
# QUICK REFERENCE # CLI FLAGS
# ============================================================================ # ============================================================================
# Source types: directory, file, stdin # --config, -c PATH # Config file path
# Sink types: http, tcp, file, stdout, stderr # --router, -r # Enable router mode
# Filter types: include, exclude # --background, -b # Run as daemon
# Filter logic: or, and # --quiet, -q # Suppress output
# # --version, -v # Show version
# Common patterns:
# "(?i)error" - Case-insensitive # ============================================================================
# "\\berror\\b" - Word boundary # ENVIRONMENT VARIABLES
# "^ERROR" - Start of line # ============================================================================
# "status=[4-5]\\d{2}" - HTTP errors # LOGWISP_CONFIG_FILE # Config filename
# LOGWISP_CONFIG_DIR # Config directory
# LOGWISP_CONSOLE_TARGET # Override console target
# Any config value: LOGWISP_<SECTION>_<KEY> (uppercase, dots → underscores)

View File

@ -1,5 +1,5 @@
# LogWisp Minimal Configuration # LogWisp Minimal Configuration
# Save as: ~/.config/logwisp.toml # Save as: ~/.config/logwisp/logwisp.toml
# Basic pipeline monitoring application logs # Basic pipeline monitoring application logs
[[pipelines]] [[pipelines]]
@ -20,27 +20,23 @@ options = {
status_path = "/status" status_path = "/status"
} }
# Optional additions: # Optional: Filter for errors only
# 1. Filter for errors only:
# [[pipelines.filters]] # [[pipelines.filters]]
# type = "include" # type = "include"
# patterns = ["ERROR", "WARN", "CRITICAL", "FATAL"] # patterns = ["ERROR", "WARN", "CRITICAL"]
# 2. Enable rate limiting: # Optional: Add rate limiting to HTTP sink
# Modify the sink options above: # [[pipelines.sinks]]
# type = "http"
# options = { # options = {
# port = 8080, # port = 8080,
# buffer_size = 1000, # buffer_size = 1000,
# rate_limit = { enabled = true, requests_per_second = 10.0, burst_size = 20 } # stream_path = "/stream",
# status_path = "/status",
# net_limit = { enabled = true, requests_per_second = 10.0, burst_size = 20 }
# } # }
# 3. Add file output: # Optional: Add file output
# [[pipelines.sinks]] # [[pipelines.sinks]]
# type = "file" # type = "file"
# options = { directory = "/var/log/logwisp", name = "app" } # options = { directory = "/var/log/logwisp", name = "app" }
# 4. Change LogWisp's own logging:
# [logging]
# output = "file"
# level = "info"

6
go.mod
View File

@ -3,10 +3,10 @@ module logwisp
go 1.24.5 go 1.24.5
require ( require (
github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 github.com/lixenwraith/config v0.0.0-20250719015120-e02ee494d440
github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac github.com/lixenwraith/log v0.0.0-20250719031926-25f1c8eb54fa
github.com/panjf2000/gnet/v2 v2.9.1 github.com/panjf2000/gnet/v2 v2.9.1
github.com/valyala/fasthttp v1.63.0 github.com/valyala/fasthttp v1.64.0
) )
require ( require (

16
go.sum
View File

@ -6,14 +6,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 h1:ixTIdJSd945n/IhMRwGwQVmQnQ1nUr5z1wn31jXq9FU= github.com/lixenwraith/config v0.0.0-20250719015120-e02ee494d440 h1:O6nHnpeDfIYQ1WxCtA2gkm8upQ4RW21DUMlQz5DKJCU=
github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497/go.mod h1:y7kgDrWIFROWJJ6ASM/SPTRRAj27FjRGWh2SDLcdQ68= github.com/lixenwraith/config v0.0.0-20250719015120-e02ee494d440/go.mod h1:y7kgDrWIFROWJJ6ASM/SPTRRAj27FjRGWh2SDLcdQ68=
github.com/lixenwraith/log v0.0.0-20250713210809-0ac292ae5dc1 h1:kcZRASUvPdqnvgMxqxx/FZCWzCwz4bA7ArT8L3djZtk= github.com/lixenwraith/log v0.0.0-20250719031926-25f1c8eb54fa h1:Y2AYESKfvDVR1JxRU5aijrGPbbY/cDA28iUDDoSqb2M=
github.com/lixenwraith/log v0.0.0-20250713210809-0ac292ae5dc1/go.mod h1:BLWEFFryXtvvdUQkD+atik4uTyukO7gRubWpSNdW210= github.com/lixenwraith/log v0.0.0-20250719031926-25f1c8eb54fa/go.mod h1:PkY5HFyCZZs2NSeACKbl26ibKqQ6bYyEitMjowgHe/s=
github.com/lixenwraith/log v0.0.0-20250714221910-15e54d455464 h1:94riru1LpECWoIca4mnVW/9O1a9jUOB2HaeMSbKmDJQ=
github.com/lixenwraith/log v0.0.0-20250714221910-15e54d455464/go.mod h1:egVvySkgFmQXAlekEpeBqGVmopd09tP6BZB58JQJEfM=
github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac h1:PfbHbKeCHQnzRlSOLhzd5OJofx2EJKzZX7yc0/xuw3w=
github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac/go.mod h1:egVvySkgFmQXAlekEpeBqGVmopd09tP6BZB58JQJEfM=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg=
@ -26,8 +22,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.63.0 h1:DisIL8OjB7ul2d7cBaMRcKTQDYnrGy56R4FCiuDP0Ns= github.com/valyala/fasthttp v1.64.0 h1:QBygLLQmiAyiXuRhthf0tuRkqAFcrC42dckN2S+N3og=
github.com/valyala/fasthttp v1.63.0/go.mod h1:REc4IeW+cAEyLrRPa5A81MIjvz0QE1laoTX2EaPHKJM= github.com/valyala/fasthttp v1.64.0/go.mod h1:dGmFxwkWXSK0NbOSJuF7AMVzU+lkHz0wQVvVITv2UQA=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

View File

@ -68,17 +68,14 @@ func bootstrapService(ctx context.Context, cfg *config.Config) (*service.Service
// initializeLogger sets up the logger based on configuration // initializeLogger sets up the logger based on configuration
func initializeLogger(cfg *config.Config) error { func initializeLogger(cfg *config.Config) error {
logger = log.NewLogger() logger = log.NewLogger()
logCfg := log.DefaultConfig()
var configArgs []string
if cfg.Quiet { if cfg.Quiet {
// In quiet mode, disable ALL logging output // In quiet mode, disable ALL logging output
configArgs = append(configArgs, logCfg.Level = 255 // A level that disables all output
"disable_file=true", logCfg.DisableFile = true
"enable_stdout=false", logCfg.EnableStdout = false
"level=255") return logger.ApplyConfig(logCfg)
return logger.InitWithDefaults(configArgs...)
} }
// Determine log level // Determine log level
@ -86,89 +83,75 @@ func initializeLogger(cfg *config.Config) error {
if err != nil { if err != nil {
return fmt.Errorf("invalid log level: %w", err) return fmt.Errorf("invalid log level: %w", err)
} }
configArgs = append(configArgs, fmt.Sprintf("level=%d", levelValue)) logCfg.Level = levelValue
// Configure based on output mode // Configure based on output mode
switch cfg.Logging.Output { switch cfg.Logging.Output {
case "none": case "none":
configArgs = append(configArgs, "disable_file=true", "enable_stdout=false") logCfg.DisableFile = true
logCfg.EnableStdout = false
case "stdout": case "stdout":
configArgs = append(configArgs, logCfg.DisableFile = true
"disable_file=true", logCfg.EnableStdout = true
"enable_stdout=true", logCfg.StdoutTarget = "stdout"
"stdout_target=stdout")
case "stderr": case "stderr":
configArgs = append(configArgs, logCfg.DisableFile = true
"disable_file=true", logCfg.EnableStdout = true
"enable_stdout=true", logCfg.StdoutTarget = "stderr"
"stdout_target=stderr")
case "file": case "file":
configArgs = append(configArgs, "enable_stdout=false") logCfg.EnableStdout = false
configureFileLogging(&configArgs, cfg) configureFileLogging(logCfg, cfg)
case "both": case "both":
configArgs = append(configArgs, "enable_stdout=true") logCfg.EnableStdout = true
configureFileLogging(&configArgs, cfg) configureFileLogging(logCfg, cfg)
configureConsoleTarget(&configArgs, cfg) configureConsoleTarget(logCfg, cfg)
default: default:
return fmt.Errorf("invalid log output mode: %s", cfg.Logging.Output) return fmt.Errorf("invalid log output mode: %s", cfg.Logging.Output)
} }
// Apply format if specified // Apply format if specified
if cfg.Logging.Console != nil && cfg.Logging.Console.Format != "" { if cfg.Logging.Console != nil && cfg.Logging.Console.Format != "" {
configArgs = append(configArgs, fmt.Sprintf("format=%s", cfg.Logging.Console.Format)) logCfg.Format = cfg.Logging.Console.Format
} }
return logger.InitWithDefaults(configArgs...) return logger.ApplyConfig(logCfg)
} }
// configureFileLogging sets up file-based logging parameters // configureFileLogging sets up file-based logging parameters
func configureFileLogging(configArgs *[]string, cfg *config.Config) { func configureFileLogging(logCfg *log.Config, cfg *config.Config) {
if cfg.Logging.File != nil { if cfg.Logging.File != nil {
*configArgs = append(*configArgs, logCfg.Directory = cfg.Logging.File.Directory
fmt.Sprintf("directory=%s", cfg.Logging.File.Directory), logCfg.Name = cfg.Logging.File.Name
fmt.Sprintf("name=%s", cfg.Logging.File.Name), logCfg.MaxSizeMB = cfg.Logging.File.MaxSizeMB
fmt.Sprintf("max_size_mb=%d", cfg.Logging.File.MaxSizeMB), logCfg.MaxTotalSizeMB = cfg.Logging.File.MaxTotalSizeMB
fmt.Sprintf("max_total_size_mb=%d", cfg.Logging.File.MaxTotalSizeMB))
if cfg.Logging.File.RetentionHours > 0 { if cfg.Logging.File.RetentionHours > 0 {
*configArgs = append(*configArgs, logCfg.RetentionPeriodHrs = cfg.Logging.File.RetentionHours
fmt.Sprintf("retention_period_hrs=%.1f", cfg.Logging.File.RetentionHours))
} }
} }
} }
// configureConsoleTarget sets up console output parameters // configureConsoleTarget sets up console output parameters
func configureConsoleTarget(configArgs *[]string, cfg *config.Config) { func configureConsoleTarget(logCfg *log.Config, cfg *config.Config) {
target := "stderr" // default target := "stderr" // default
if cfg.Logging.Console != nil && cfg.Logging.Console.Target != "" { if cfg.Logging.Console != nil && cfg.Logging.Console.Target != "" {
target = cfg.Logging.Console.Target target = cfg.Logging.Console.Target
} }
// Split mode by configuring log package with level-based routing // Set the target, which can be "stdout", "stderr", or "split"
if target == "split" { logCfg.StdoutTarget = target
*configArgs = append(*configArgs, "stdout_split_mode=true")
*configArgs = append(*configArgs, "stdout_target=split")
} else {
*configArgs = append(*configArgs, fmt.Sprintf("stdout_target=%s", target))
}
} }
func parseLogLevel(level string) (int, error) { func parseLogLevel(level string) (int64, error) {
switch strings.ToLower(level) { switch strings.ToLower(level) {
case "debug": case "debug":
return int(log.LevelDebug), nil return log.LevelDebug, nil
case "info": case "info":
return int(log.LevelInfo), nil return log.LevelInfo, nil
case "warn", "warning": case "warn", "warning":
return int(log.LevelWarn), nil return log.LevelWarn, nil
case "error": case "error":
return int(log.LevelError), nil return log.LevelError, nil
default: default:
return 0, fmt.Errorf("unknown log level: %s", level) return 0, fmt.Errorf("unknown log level: %s", level)
} }

View File

@ -79,17 +79,17 @@ func logPipelineStatus(name string, stats map[string]any) {
// Add sink statistics // Add sink statistics
if sinks, ok := stats["sinks"].([]map[string]any); ok { if sinks, ok := stats["sinks"].([]map[string]any); ok {
tcpConns := 0 tcpConns := int64(0)
httpConns := 0 httpConns := int64(0)
for _, sink := range sinks { for _, sink := range sinks {
sinkType := sink["type"].(string) sinkType := sink["type"].(string)
if activeConns, ok := sink["active_connections"].(int32); ok { if activeConns, ok := sink["active_connections"].(int64); ok {
switch sinkType { switch sinkType {
case "tcp": case "tcp":
tcpConns += int(activeConns) tcpConns += activeConns
case "http": case "http":
httpConns += int(activeConns) httpConns += activeConns
} }
} }
} }
@ -111,7 +111,7 @@ func displayPipelineEndpoints(cfg config.PipelineConfig, routerMode bool) {
for i, sinkCfg := range cfg.Sinks { for i, sinkCfg := range cfg.Sinks {
switch sinkCfg.Type { switch sinkCfg.Type {
case "tcp": case "tcp":
if port, ok := toInt(sinkCfg.Options["port"]); ok { if port, ok := sinkCfg.Options["port"].(int64); ok {
logger.Info("msg", "TCP endpoint configured", logger.Info("msg", "TCP endpoint configured",
"component", "main", "component", "main",
"pipeline", cfg.Name, "pipeline", cfg.Name,
@ -131,7 +131,7 @@ func displayPipelineEndpoints(cfg config.PipelineConfig, routerMode bool) {
} }
case "http": case "http":
if port, ok := toInt(sinkCfg.Options["port"]); ok { if port, ok := sinkCfg.Options["port"].(int64); ok {
streamPath := "/transport" streamPath := "/transport"
statusPath := "/status" statusPath := "/status"
if path, ok := sinkCfg.Options["stream_path"].(string); ok { if path, ok := sinkCfg.Options["stream_path"].(string); ok {
@ -200,17 +200,3 @@ func displayPipelineEndpoints(cfg config.PipelineConfig, routerMode bool) {
"filter_count", len(cfg.Filters)) "filter_count", len(cfg.Filters))
} }
} }
// Helper function for type conversion
func toInt(v any) (int, bool) {
switch val := v.(type) {
case int:
return val, true
case int64:
return int(val), true
case float64:
return int(val), true
default:
return 0, false
}
}

View File

@ -21,30 +21,3 @@ type Config struct {
Logging *LogConfig `toml:"logging"` Logging *LogConfig `toml:"logging"`
Pipelines []PipelineConfig `toml:"pipelines"` Pipelines []PipelineConfig `toml:"pipelines"`
} }
// Helper functions to handle type conversions from any
func toInt(v any) (int, bool) {
switch val := v.(type) {
case int:
return val, true
case int64:
return int(val), true
case float64:
return int(val), true
default:
return 0, false
}
}
func toFloat(v any) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int:
return float64(val), true
case int64:
return float64(val), true
default:
return 0, false
}
}

View File

@ -40,7 +40,7 @@ func defaults() *Config {
Options: map[string]any{ Options: map[string]any{
"path": "./", "path": "./",
"pattern": "*.log", "pattern": "*.log",
"check_interval_ms": 100, "check_interval_ms": int64(100),
}, },
}, },
}, },
@ -48,13 +48,13 @@ func defaults() *Config {
{ {
Type: "http", Type: "http",
Options: map[string]any{ Options: map[string]any{
"port": 8080, "port": int64(8080),
"buffer_size": 1000, "buffer_size": int64(1000),
"stream_path": "/stream", "stream_path": "/stream",
"status_path": "/status", "status_path": "/status",
"heartbeat": map[string]any{ "heartbeat": map[string]any{
"enabled": true, "enabled": true,
"interval_seconds": 30, "interval_seconds": int64(30),
"include_timestamp": true, "include_timestamp": true,
"include_stats": false, "include_stats": false,
"format": "comment", "format": "comment",

View File

@ -86,7 +86,7 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
// Validate check interval if provided // Validate check interval if provided
if interval, ok := cfg.Options["check_interval_ms"]; ok { if interval, ok := cfg.Options["check_interval_ms"]; ok {
if intVal, ok := toInt(interval); ok { if intVal, ok := interval.(int64); ok {
if intVal < 10 { if intVal < 10 {
return fmt.Errorf("pipeline '%s' source[%d]: check interval too small: %d ms (min: 10ms)", return fmt.Errorf("pipeline '%s' source[%d]: check interval too small: %d ms (min: 10ms)",
pipelineName, sourceIndex, intVal) pipelineName, sourceIndex, intVal)
@ -102,7 +102,7 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
case "http": case "http":
// Validate HTTP source options // Validate HTTP source options
port, ok := toInt(cfg.Options["port"]) port, ok := cfg.Options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing HTTP port", return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing HTTP port",
pipelineName, sourceIndex) pipelineName, sourceIndex)
@ -125,7 +125,7 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
case "tcp": case "tcp":
// Validate TCP source options // Validate TCP source options
port, ok := toInt(cfg.Options["port"]) port, ok := cfg.Options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing TCP port", return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing TCP port",
pipelineName, sourceIndex) pipelineName, sourceIndex)
@ -146,7 +146,7 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
return nil return nil
} }
func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts map[int]string) error { func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts map[int64]string) error {
if cfg.Type == "" { if cfg.Type == "" {
return fmt.Errorf("pipeline '%s' sink[%d]: missing type", pipelineName, sinkIndex) return fmt.Errorf("pipeline '%s' sink[%d]: missing type", pipelineName, sinkIndex)
} }
@ -154,7 +154,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
switch cfg.Type { switch cfg.Type {
case "http": case "http":
// Extract and validate HTTP configuration // Extract and validate HTTP configuration
port, ok := toInt(cfg.Options["port"]) port, ok := cfg.Options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return fmt.Errorf("pipeline '%s' sink[%d]: invalid or missing HTTP port", return fmt.Errorf("pipeline '%s' sink[%d]: invalid or missing HTTP port",
pipelineName, sinkIndex) pipelineName, sinkIndex)
@ -168,7 +168,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
allPorts[port] = fmt.Sprintf("%s-http[%d]", pipelineName, sinkIndex) allPorts[port] = fmt.Sprintf("%s-http[%d]", pipelineName, sinkIndex)
// Validate buffer size // Validate buffer size
if bufSize, ok := toInt(cfg.Options["buffer_size"]); ok { if bufSize, ok := cfg.Options["buffer_size"].(int64); ok {
if bufSize < 1 { if bufSize < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: HTTP buffer size must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: HTTP buffer size must be positive: %d",
pipelineName, sinkIndex, bufSize) pipelineName, sinkIndex, bufSize)
@ -213,7 +213,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
case "tcp": case "tcp":
// Extract and validate TCP configuration // Extract and validate TCP configuration
port, ok := toInt(cfg.Options["port"]) port, ok := cfg.Options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return fmt.Errorf("pipeline '%s' sink[%d]: invalid or missing TCP port", return fmt.Errorf("pipeline '%s' sink[%d]: invalid or missing TCP port",
pipelineName, sinkIndex) pipelineName, sinkIndex)
@ -227,7 +227,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
allPorts[port] = fmt.Sprintf("%s-tcp[%d]", pipelineName, sinkIndex) allPorts[port] = fmt.Sprintf("%s-tcp[%d]", pipelineName, sinkIndex)
// Validate buffer size // Validate buffer size
if bufSize, ok := toInt(cfg.Options["buffer_size"]); ok { if bufSize, ok := cfg.Options["buffer_size"].(int64); ok {
if bufSize < 1 { if bufSize < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: TCP buffer size must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: TCP buffer size must be positive: %d",
pipelineName, sinkIndex, bufSize) pipelineName, sinkIndex, bufSize)
@ -275,7 +275,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
} }
// Validate batch size // Validate batch size
if batchSize, ok := toInt(cfg.Options["batch_size"]); ok { if batchSize, ok := cfg.Options["batch_size"].(int64); ok {
if batchSize < 1 { if batchSize < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: batch_size must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: batch_size must be positive: %d",
pipelineName, sinkIndex, batchSize) pipelineName, sinkIndex, batchSize)
@ -283,7 +283,7 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
} }
// Validate timeout // Validate timeout
if timeout, ok := toInt(cfg.Options["timeout_seconds"]); ok { if timeout, ok := cfg.Options["timeout_seconds"].(int64); ok {
if timeout < 1 { if timeout < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: timeout_seconds must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: timeout_seconds must be positive: %d",
pipelineName, sinkIndex, timeout) pipelineName, sinkIndex, timeout)
@ -307,14 +307,14 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
} }
// Validate timeouts // Validate timeouts
if dialTimeout, ok := toInt(cfg.Options["dial_timeout_seconds"]); ok { if dialTimeout, ok := cfg.Options["dial_timeout_seconds"].(int64); ok {
if dialTimeout < 1 { if dialTimeout < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: dial_timeout_seconds must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: dial_timeout_seconds must be positive: %d",
pipelineName, sinkIndex, dialTimeout) pipelineName, sinkIndex, dialTimeout)
} }
} }
if writeTimeout, ok := toInt(cfg.Options["write_timeout_seconds"]); ok { if writeTimeout, ok := cfg.Options["write_timeout_seconds"].(int64); ok {
if writeTimeout < 1 { if writeTimeout < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: write_timeout_seconds must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: write_timeout_seconds must be positive: %d",
pipelineName, sinkIndex, writeTimeout) pipelineName, sinkIndex, writeTimeout)
@ -336,21 +336,21 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
} }
// Validate numeric options // Validate numeric options
if maxSize, ok := toInt(cfg.Options["max_size_mb"]); ok { if maxSize, ok := cfg.Options["max_size_mb"].(int64); ok {
if maxSize < 1 { if maxSize < 1 {
return fmt.Errorf("pipeline '%s' sink[%d]: max_size_mb must be positive: %d", return fmt.Errorf("pipeline '%s' sink[%d]: max_size_mb must be positive: %d",
pipelineName, sinkIndex, maxSize) pipelineName, sinkIndex, maxSize)
} }
} }
if maxTotalSize, ok := toInt(cfg.Options["max_total_size_mb"]); ok { if maxTotalSize, ok := cfg.Options["max_total_size_mb"].(int64); ok {
if maxTotalSize < 0 { if maxTotalSize < 0 {
return fmt.Errorf("pipeline '%s' sink[%d]: max_total_size_mb cannot be negative: %d", return fmt.Errorf("pipeline '%s' sink[%d]: max_total_size_mb cannot be negative: %d",
pipelineName, sinkIndex, maxTotalSize) pipelineName, sinkIndex, maxTotalSize)
} }
} }
if retention, ok := toFloat(cfg.Options["retention_hours"]); ok { if retention, ok := cfg.Options["retention_hours"].(float64); ok {
if retention < 0 { if retention < 0 {
return fmt.Errorf("pipeline '%s' sink[%d]: retention_hours cannot be negative: %f", return fmt.Errorf("pipeline '%s' sink[%d]: retention_hours cannot be negative: %f",
pipelineName, sinkIndex, retention) pipelineName, sinkIndex, retention)

View File

@ -25,7 +25,7 @@ type RateLimitConfig struct {
// Policy defines the action to take when the limit is exceeded. "pass" or "drop". // Policy defines the action to take when the limit is exceeded. "pass" or "drop".
Policy string `toml:"policy"` Policy string `toml:"policy"`
// MaxEntrySizeBytes is the maximum allowed size for a single log entry. 0 = no limit. // MaxEntrySizeBytes is the maximum allowed size for a single log entry. 0 = no limit.
MaxEntrySizeBytes int `toml:"max_entry_size_bytes"` MaxEntrySizeBytes int64 `toml:"max_entry_size_bytes"`
} }
func validateRateLimit(pipelineName string, cfg *RateLimitConfig) error { func validateRateLimit(pipelineName string, cfg *RateLimitConfig) error {

View File

@ -5,8 +5,8 @@ import "fmt"
type TCPConfig struct { type TCPConfig struct {
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
Port int `toml:"port"` Port int64 `toml:"port"`
BufferSize int `toml:"buffer_size"` BufferSize int64 `toml:"buffer_size"`
// SSL/TLS Configuration // SSL/TLS Configuration
SSL *SSLConfig `toml:"ssl"` SSL *SSLConfig `toml:"ssl"`
@ -20,8 +20,8 @@ type TCPConfig struct {
type HTTPConfig struct { type HTTPConfig struct {
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
Port int `toml:"port"` Port int64 `toml:"port"`
BufferSize int `toml:"buffer_size"` BufferSize int64 `toml:"buffer_size"`
// Endpoint paths // Endpoint paths
StreamPath string `toml:"stream_path"` StreamPath string `toml:"stream_path"`
@ -39,10 +39,10 @@ type HTTPConfig struct {
type HeartbeatConfig struct { type HeartbeatConfig struct {
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
IntervalSeconds int `toml:"interval_seconds"` IntervalSeconds int64 `toml:"interval_seconds"`
IncludeTimestamp bool `toml:"include_timestamp"` IncludeTimestamp bool `toml:"include_timestamp"`
IncludeStats bool `toml:"include_stats"` IncludeStats bool `toml:"include_stats"`
Format string `toml:"format"` // "comment" or "json" Format string `toml:"format"`
} }
type NetLimitConfig struct { type NetLimitConfig struct {
@ -53,23 +53,23 @@ type NetLimitConfig struct {
RequestsPerSecond float64 `toml:"requests_per_second"` RequestsPerSecond float64 `toml:"requests_per_second"`
// Burst size (token bucket) // Burst size (token bucket)
BurstSize int `toml:"burst_size"` BurstSize int64 `toml:"burst_size"`
// Net limit by: "ip", "user", "token", "global" // Net limit by: "ip", "user", "token", "global"
LimitBy string `toml:"limit_by"` LimitBy string `toml:"limit_by"`
// Response when net limited // Response when net limited
ResponseCode int `toml:"response_code"` // Default: 429 ResponseCode int64 `toml:"response_code"` // Default: 429
ResponseMessage string `toml:"response_message"` // Default: "Net limit exceeded" ResponseMessage string `toml:"response_message"` // Default: "Net limit exceeded"
// Connection limits // Connection limits
MaxConnectionsPerIP int `toml:"max_connections_per_ip"` MaxConnectionsPerIP int64 `toml:"max_connections_per_ip"`
MaxTotalConnections int `toml:"max_total_connections"` MaxTotalConnections int64 `toml:"max_total_connections"`
} }
func validateHeartbeatOptions(serverType, pipelineName string, sinkIndex int, hb map[string]any) error { func validateHeartbeatOptions(serverType, pipelineName string, sinkIndex int, hb map[string]any) error {
if enabled, ok := hb["enabled"].(bool); ok && enabled { if enabled, ok := hb["enabled"].(bool); ok && enabled {
interval, ok := toInt(hb["interval_seconds"]) interval, ok := hb["interval_seconds"].(int64)
if !ok || interval < 1 { if !ok || interval < 1 {
return fmt.Errorf("pipeline '%s' sink[%d] %s: heartbeat interval must be positive", return fmt.Errorf("pipeline '%s' sink[%d] %s: heartbeat interval must be positive",
pipelineName, sinkIndex, serverType) pipelineName, sinkIndex, serverType)
@ -91,14 +91,14 @@ func validateNetLimitOptions(serverType, pipelineName string, sinkIndex int, rl
} }
// Validate requests per second // Validate requests per second
rps, ok := toFloat(rl["requests_per_second"]) rps, ok := rl["requests_per_second"].(float64)
if !ok || rps <= 0 { if !ok || rps <= 0 {
return fmt.Errorf("pipeline '%s' sink[%d] %s: requests_per_second must be positive", return fmt.Errorf("pipeline '%s' sink[%d] %s: requests_per_second must be positive",
pipelineName, sinkIndex, serverType) pipelineName, sinkIndex, serverType)
} }
// Validate burst size // Validate burst size
burst, ok := toInt(rl["burst_size"]) burst, ok := rl["burst_size"].(int64)
if !ok || burst < 1 { if !ok || burst < 1 {
return fmt.Errorf("pipeline '%s' sink[%d] %s: burst_size must be at least 1", return fmt.Errorf("pipeline '%s' sink[%d] %s: burst_size must be at least 1",
pipelineName, sinkIndex, serverType) pipelineName, sinkIndex, serverType)
@ -114,7 +114,7 @@ func validateNetLimitOptions(serverType, pipelineName string, sinkIndex int, rl
} }
// Validate response code // Validate response code
if respCode, ok := toInt(rl["response_code"]); ok { if respCode, ok := rl["response_code"].(int64); ok {
if respCode > 0 && (respCode < 400 || respCode >= 600) { if respCode > 0 && (respCode < 400 || respCode >= 600) {
return fmt.Errorf("pipeline '%s' sink[%d] %s: response_code must be 4xx or 5xx: %d", return fmt.Errorf("pipeline '%s' sink[%d] %s: response_code must be 4xx or 5xx: %d",
pipelineName, sinkIndex, serverType, respCode) pipelineName, sinkIndex, serverType, respCode)
@ -122,8 +122,8 @@ func validateNetLimitOptions(serverType, pipelineName string, sinkIndex int, rl
} }
// Validate connection limits // Validate connection limits
maxPerIP, perIPOk := toInt(rl["max_connections_per_ip"]) maxPerIP, perIPOk := rl["max_connections_per_ip"].(int64)
maxTotal, totalOk := toInt(rl["max_total_connections"]) maxTotal, totalOk := rl["max_total_connections"].(int64)
if perIPOk && totalOk && maxPerIP > 0 && maxTotal > 0 { if perIPOk && totalOk && maxPerIP > 0 && maxTotal > 0 {
if maxPerIP > maxTotal { if maxPerIP > maxTotal {

View File

@ -23,7 +23,7 @@ func (c *Config) validate() error {
} }
// Track used ports across all pipelines // Track used ports across all pipelines
allPorts := make(map[int]string) allPorts := make(map[int64]string)
pipelineNames := make(map[string]bool) pipelineNames := make(map[string]bool)
for i, pipeline := range c.Pipelines { for i, pipeline := range c.Pipelines {

View File

@ -28,7 +28,7 @@ type Limiter struct {
globalLimiter *limiter.TokenBucket globalLimiter *limiter.TokenBucket
// Connection tracking // Connection tracking
ipConnections map[string]*atomic.Int32 ipConnections map[string]*atomic.Int64
connMu sync.RWMutex connMu sync.RWMutex
// Statistics // Statistics
@ -49,7 +49,7 @@ type Limiter struct {
type ipLimiter struct { type ipLimiter struct {
bucket *limiter.TokenBucket bucket *limiter.TokenBucket
lastSeen time.Time lastSeen time.Time
connections atomic.Int32 connections atomic.Int64
} }
// Creates a new net limiter // Creates a new net limiter
@ -67,7 +67,7 @@ func New(cfg config.NetLimitConfig, logger *log.Logger) *Limiter {
l := &Limiter{ l := &Limiter{
config: cfg, config: cfg,
ipLimiters: make(map[string]*ipLimiter), ipLimiters: make(map[string]*ipLimiter),
ipConnections: make(map[string]*atomic.Int32), ipConnections: make(map[string]*atomic.Int64),
lastCleanup: time.Now(), lastCleanup: time.Now(),
logger: logger, logger: logger,
ctx: ctx, ctx: ctx,
@ -115,7 +115,7 @@ func (l *Limiter) Shutdown() {
} }
// Checks if an HTTP request should be allowed // Checks if an HTTP request should be allowed
func (l *Limiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int, message string) { func (l *Limiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int64, message string) {
if l == nil { if l == nil {
return true, 0, "" return true, 0, ""
} }
@ -148,7 +148,7 @@ func (l *Limiter) CheckHTTP(remoteAddr string) (allowed bool, statusCode int, me
counter, exists := l.ipConnections[ip] counter, exists := l.ipConnections[ip]
l.connMu.RUnlock() l.connMu.RUnlock()
if exists && counter.Load() >= int32(l.config.MaxConnectionsPerIP) { if exists && counter.Load() >= l.config.MaxConnectionsPerIP {
l.blockedRequests.Add(1) l.blockedRequests.Add(1)
statusCode = l.config.ResponseCode statusCode = l.config.ResponseCode
if statusCode == 0 { if statusCode == 0 {
@ -242,7 +242,7 @@ func (l *Limiter) AddConnection(remoteAddr string) {
l.connMu.Lock() l.connMu.Lock()
counter, exists := l.ipConnections[ip] counter, exists := l.ipConnections[ip]
if !exists { if !exists {
counter = &atomic.Int32{} counter = &atomic.Int64{}
l.ipConnections[ip] = counter l.ipConnections[ip] = counter
} }
l.connMu.Unlock() l.connMu.Unlock()
@ -362,7 +362,7 @@ func (l *Limiter) checkLimit(ip string) bool {
counter, exists := l.ipConnections[ip] counter, exists := l.ipConnections[ip]
l.connMu.RUnlock() l.connMu.RUnlock()
if exists && counter.Load() >= int32(l.config.MaxConnectionsPerIP) { if exists && counter.Load() >= l.config.MaxConnectionsPerIP {
return false return false
} }
} }

View File

@ -19,7 +19,7 @@ type Limiter struct {
logger *log.Logger logger *log.Logger
// Statistics // Statistics
maxEntrySizeBytes int maxEntrySizeBytes int64
droppedBySizeCount atomic.Uint64 droppedBySizeCount atomic.Uint64
droppedCount atomic.Uint64 droppedCount atomic.Uint64
} }

View File

@ -17,7 +17,7 @@ import (
// HTTPRouter manages HTTP routing for multiple pipelines // HTTPRouter manages HTTP routing for multiple pipelines
type HTTPRouter struct { type HTTPRouter struct {
service *Service service *Service
servers map[int]*routerServer // port -> server servers map[int64]*routerServer // port -> server
mu sync.RWMutex mu sync.RWMutex
logger *log.Logger logger *log.Logger
@ -32,7 +32,7 @@ type HTTPRouter struct {
func NewHTTPRouter(service *Service, logger *log.Logger) *HTTPRouter { func NewHTTPRouter(service *Service, logger *log.Logger) *HTTPRouter {
return &HTTPRouter{ return &HTTPRouter{
service: service, service: service,
servers: make(map[int]*routerServer), servers: make(map[int64]*routerServer),
startTime: time.Now(), startTime: time.Now(),
logger: logger, logger: logger,
} }
@ -54,7 +54,7 @@ func (r *HTTPRouter) registerHTTPSink(pipelineName string, httpSink *sink.HTTPSi
// Get port from sink configuration // Get port from sink configuration
stats := httpSink.GetStats() stats := httpSink.GetStats()
details := stats.Details details := stats.Details
port := details["port"].(int) port := details["port"].(int64)
r.mu.Lock() r.mu.Lock()
rs, exists := r.servers[port] rs, exists := r.servers[port]
@ -179,7 +179,7 @@ func (r *HTTPRouter) Shutdown() {
var wg sync.WaitGroup var wg sync.WaitGroup
for port, rs := range r.servers { for port, rs := range r.servers {
wg.Add(1) wg.Add(1)
go func(p int, s *routerServer) { go func(p int64, s *routerServer) {
defer wg.Done() defer wg.Done()
r.logger.Info("msg", "Shutting down server", r.logger.Info("msg", "Shutting down server",
"component", "http_router", "component", "http_router",
@ -202,7 +202,7 @@ func (r *HTTPRouter) GetStats() map[string]any {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
serverStats := make(map[int]any) serverStats := make(map[int64]any)
totalRoutes := 0 totalRoutes := 0
for port, rs := range r.servers { for port, rs := range r.servers {
@ -222,7 +222,7 @@ func (r *HTTPRouter) GetStats() map[string]any {
} }
return map[string]any{ return map[string]any{
"uptime_seconds": int(time.Since(r.startTime).Seconds()), "uptime_seconds": int64(time.Since(r.startTime).Seconds()),
"total_requests": r.totalRequests.Load(), "total_requests": r.totalRequests.Load(),
"routed_requests": r.routedRequests.Load(), "routed_requests": r.routedRequests.Load(),
"failed_requests": r.failedRequests.Load(), "failed_requests": r.failedRequests.Load(),

View File

@ -24,7 +24,7 @@ type routedSink struct {
// routerServer handles HTTP requests for a specific port // routerServer handles HTTP requests for a specific port
type routerServer struct { type routerServer struct {
port int port int64
server *fasthttp.Server server *fasthttp.Server
logger *log.Logger logger *log.Logger
routes map[string]*routedSink // path prefix -> sink routes map[string]*routedSink // path prefix -> sink

View File

@ -18,7 +18,7 @@ import (
// ConsoleConfig holds common configuration for console sinks // ConsoleConfig holds common configuration for console sinks
type ConsoleConfig struct { type ConsoleConfig struct {
Target string // "stdout", "stderr", or "split" Target string // "stdout", "stderr", or "split"
BufferSize int BufferSize int64
} }
// StdoutSink writes log entries to stdout // StdoutSink writes log entries to stdout
@ -48,7 +48,7 @@ func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format.
config.Target = target config.Target = target
} }
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
config.BufferSize = bufSize config.BufferSize = bufSize
} }
@ -161,7 +161,7 @@ func NewStderrSink(options map[string]any, logger *log.Logger, formatter format.
config.Target = target config.Target = target
} }
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
config.BufferSize = bufSize config.BufferSize = bufSize
} }

View File

@ -50,31 +50,31 @@ func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Fo
) )
// Add optional configurations // Add optional configurations
if maxSize, ok := toInt(options["max_size_mb"]); ok && maxSize > 0 { if maxSize, ok := options["max_size_mb"].(int64); ok && maxSize > 0 {
configArgs = append(configArgs, fmt.Sprintf("max_size_mb=%d", maxSize)) configArgs = append(configArgs, fmt.Sprintf("max_size_mb=%d", maxSize))
} }
if maxTotalSize, ok := toInt(options["max_total_size_mb"]); ok && maxTotalSize >= 0 { if maxTotalSize, ok := options["max_total_size_mb"].(int64); ok && maxTotalSize >= 0 {
configArgs = append(configArgs, fmt.Sprintf("max_total_size_mb=%d", maxTotalSize)) configArgs = append(configArgs, fmt.Sprintf("max_total_size_mb=%d", maxTotalSize))
} }
if retention, ok := toFloat(options["retention_hours"]); ok && retention > 0 { if retention, ok := options["retention_hours"].(int64); ok && retention > 0 {
configArgs = append(configArgs, fmt.Sprintf("retention_period_hrs=%.1f", retention)) configArgs = append(configArgs, fmt.Sprintf("retention_period_hrs=%.1f", retention))
} }
if minDiskFree, ok := toInt(options["min_disk_free_mb"]); ok && minDiskFree > 0 { if minDiskFree, ok := options["min_disk_free_mb"].(int64); ok && minDiskFree > 0 {
configArgs = append(configArgs, fmt.Sprintf("min_disk_free_mb=%d", minDiskFree)) configArgs = append(configArgs, fmt.Sprintf("min_disk_free_mb=%d", minDiskFree))
} }
// Create internal logger for file writing // Create internal logger for file writing
writer := log.NewLogger() writer := log.NewLogger()
if err := writer.InitWithDefaults(configArgs...); err != nil { if err := writer.ApplyOverride(configArgs...); err != nil {
return nil, fmt.Errorf("failed to initialize file writer: %w", err) return nil, fmt.Errorf("failed to initialize file writer: %w", err)
} }
// Buffer size for input channel // Buffer size for input channel
bufferSize := 1000 bufferSize := int64(1000)
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
bufferSize = bufSize bufferSize = bufSize
} }

View File

@ -27,7 +27,7 @@ type HTTPSink struct {
input chan source.LogEntry input chan source.LogEntry
config HTTPConfig config HTTPConfig
server *fasthttp.Server server *fasthttp.Server
activeClients atomic.Int32 activeClients atomic.Int64
mu sync.RWMutex mu sync.RWMutex
startTime time.Time startTime time.Time
done chan struct{} done chan struct{}
@ -52,8 +52,8 @@ type HTTPSink struct {
// HTTPConfig holds HTTP sink configuration // HTTPConfig holds HTTP sink configuration
type HTTPConfig struct { type HTTPConfig struct {
Port int Port int64
BufferSize int BufferSize int64
StreamPath string StreamPath string
StatusPath string StatusPath string
Heartbeat *config.HeartbeatConfig Heartbeat *config.HeartbeatConfig
@ -71,10 +71,10 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
} }
// Extract configuration from options // Extract configuration from options
if port, ok := toInt(options["port"]); ok { if port, ok := options["port"].(int64); ok {
cfg.Port = port cfg.Port = port
} }
if bufSize, ok := toInt(options["buffer_size"]); ok { if bufSize, ok := options["buffer_size"].(int64); ok {
cfg.BufferSize = bufSize cfg.BufferSize = bufSize
} }
if path, ok := options["stream_path"].(string); ok { if path, ok := options["stream_path"].(string); ok {
@ -88,7 +88,7 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
if hb, ok := options["heartbeat"].(map[string]any); ok { if hb, ok := options["heartbeat"].(map[string]any); ok {
cfg.Heartbeat = &config.HeartbeatConfig{} cfg.Heartbeat = &config.HeartbeatConfig{}
cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool) cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool)
if interval, ok := toInt(hb["interval_seconds"]); ok { if interval, ok := hb["interval_seconds"].(int64); ok {
cfg.Heartbeat.IntervalSeconds = interval cfg.Heartbeat.IntervalSeconds = interval
} }
cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool) cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool)
@ -102,25 +102,25 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
if rl, ok := options["net_limit"].(map[string]any); ok { if rl, ok := options["net_limit"].(map[string]any); ok {
cfg.NetLimit = &config.NetLimitConfig{} cfg.NetLimit = &config.NetLimitConfig{}
cfg.NetLimit.Enabled, _ = rl["enabled"].(bool) cfg.NetLimit.Enabled, _ = rl["enabled"].(bool)
if rps, ok := toFloat(rl["requests_per_second"]); ok { if rps, ok := rl["requests_per_second"].(float64); ok {
cfg.NetLimit.RequestsPerSecond = rps cfg.NetLimit.RequestsPerSecond = rps
} }
if burst, ok := toInt(rl["burst_size"]); ok { if burst, ok := rl["burst_size"].(int64); ok {
cfg.NetLimit.BurstSize = burst cfg.NetLimit.BurstSize = burst
} }
if limitBy, ok := rl["limit_by"].(string); ok { if limitBy, ok := rl["limit_by"].(string); ok {
cfg.NetLimit.LimitBy = limitBy cfg.NetLimit.LimitBy = limitBy
} }
if respCode, ok := toInt(rl["response_code"]); ok { if respCode, ok := rl["response_code"].(int64); ok {
cfg.NetLimit.ResponseCode = respCode cfg.NetLimit.ResponseCode = respCode
} }
if msg, ok := rl["response_message"].(string); ok { if msg, ok := rl["response_message"].(string); ok {
cfg.NetLimit.ResponseMessage = msg cfg.NetLimit.ResponseMessage = msg
} }
if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { if maxPerIP, ok := rl["max_connections_per_ip"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerIP = maxPerIP cfg.NetLimit.MaxConnectionsPerIP = maxPerIP
} }
if maxTotal, ok := toInt(rl["max_total_connections"]); ok { if maxTotal, ok := rl["max_total_connections"].(int64); ok {
cfg.NetLimit.MaxTotalConnections = maxTotal cfg.NetLimit.MaxTotalConnections = maxTotal
} }
} }
@ -256,7 +256,7 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
// Check net limit first // Check net limit first
remoteAddr := ctx.RemoteAddr().String() remoteAddr := ctx.RemoteAddr().String()
if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddr); !allowed { if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddr); !allowed {
ctx.SetStatusCode(statusCode) ctx.SetStatusCode(int(statusCode))
ctx.SetContentType("application/json") ctx.SetContentType("application/json")
json.NewEncoder(ctx).Encode(map[string]any{ json.NewEncoder(ctx).Encode(map[string]any{
"error": message, "error": message,
@ -502,7 +502,7 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
} }
// GetActiveConnections returns the current number of active clients // GetActiveConnections returns the current number of active clients
func (h *HTTPSink) GetActiveConnections() int32 { func (h *HTTPSink) GetActiveConnections() int64 {
return h.activeClients.Load() return h.activeClients.Load()
} }

View File

@ -36,20 +36,20 @@ type HTTPClientSink struct {
failedBatches atomic.Uint64 failedBatches atomic.Uint64
lastProcessed atomic.Value // time.Time lastProcessed atomic.Value // time.Time
lastBatchSent atomic.Value // time.Time lastBatchSent atomic.Value // time.Time
activeConnections atomic.Int32 activeConnections atomic.Int64
} }
// HTTPClientConfig holds HTTP client sink configuration // HTTPClientConfig holds HTTP client sink configuration
type HTTPClientConfig struct { type HTTPClientConfig struct {
URL string URL string
BufferSize int BufferSize int64
BatchSize int BatchSize int64
BatchDelay time.Duration BatchDelay time.Duration
Timeout time.Duration Timeout time.Duration
Headers map[string]string Headers map[string]string
// Retry configuration // Retry configuration
MaxRetries int MaxRetries int64
RetryDelay time.Duration RetryDelay time.Duration
RetryBackoff float64 // Multiplier for exponential backoff RetryBackoff float64 // Multiplier for exponential backoff
@ -60,13 +60,13 @@ type HTTPClientConfig struct {
// NewHTTPClientSink creates a new HTTP client sink // NewHTTPClientSink creates a new HTTP client sink
func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) { func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) {
cfg := HTTPClientConfig{ cfg := HTTPClientConfig{
BufferSize: 1000, BufferSize: int64(1000),
BatchSize: 100, BatchSize: int64(100),
BatchDelay: time.Second, BatchDelay: time.Second,
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
MaxRetries: 3, MaxRetries: int64(3),
RetryDelay: time.Second, RetryDelay: time.Second,
RetryBackoff: 2.0, RetryBackoff: float64(2.0),
Headers: make(map[string]string), Headers: make(map[string]string),
} }
@ -87,25 +87,25 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for
cfg.URL = urlStr cfg.URL = urlStr
// Extract other options // Extract other options
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
cfg.BufferSize = bufSize cfg.BufferSize = bufSize
} }
if batchSize, ok := toInt(options["batch_size"]); ok && batchSize > 0 { if batchSize, ok := options["batch_size"].(int64); ok && batchSize > 0 {
cfg.BatchSize = batchSize cfg.BatchSize = batchSize
} }
if delayMs, ok := toInt(options["batch_delay_ms"]); ok && delayMs > 0 { if delayMs, ok := options["batch_delay_ms"].(int64); ok && delayMs > 0 {
cfg.BatchDelay = time.Duration(delayMs) * time.Millisecond cfg.BatchDelay = time.Duration(delayMs) * time.Millisecond
} }
if timeoutSec, ok := toInt(options["timeout_seconds"]); ok && timeoutSec > 0 { if timeoutSec, ok := options["timeout_seconds"].(int64); ok && timeoutSec > 0 {
cfg.Timeout = time.Duration(timeoutSec) * time.Second cfg.Timeout = time.Duration(timeoutSec) * time.Second
} }
if maxRetries, ok := toInt(options["max_retries"]); ok && maxRetries >= 0 { if maxRetries, ok := options["max_retries"].(int64); ok && maxRetries >= 0 {
cfg.MaxRetries = maxRetries cfg.MaxRetries = maxRetries
} }
if retryDelayMs, ok := toInt(options["retry_delay_ms"]); ok && retryDelayMs > 0 { if retryDelayMs, ok := options["retry_delay_ms"].(int64); ok && retryDelayMs > 0 {
cfg.RetryDelay = time.Duration(retryDelayMs) * time.Millisecond cfg.RetryDelay = time.Duration(retryDelayMs) * time.Millisecond
} }
if backoff, ok := toFloat(options["retry_backoff"]); ok && backoff >= 1.0 { if backoff, ok := options["retry_backoff"].(float64); ok && backoff >= 1.0 {
cfg.RetryBackoff = backoff cfg.RetryBackoff = backoff
} }
if insecure, ok := options["insecure_skip_verify"].(bool); ok { if insecure, ok := options["insecure_skip_verify"].(bool); ok {
@ -244,7 +244,7 @@ func (h *HTTPClientSink) processLoop(ctx context.Context) {
h.batch = append(h.batch, entry) h.batch = append(h.batch, entry)
// Check if batch is full // Check if batch is full
if len(h.batch) >= h.config.BatchSize { if int64(len(h.batch)) >= h.config.BatchSize {
batch := h.batch batch := h.batch
h.batch = make([]source.LogEntry, 0, h.config.BatchSize) h.batch = make([]source.LogEntry, 0, h.config.BatchSize)
h.batchMu.Unlock() h.batchMu.Unlock()
@ -337,7 +337,7 @@ func (h *HTTPClientSink) sendBatch(batch []source.LogEntry) {
var lastErr error var lastErr error
retryDelay := h.config.RetryDelay retryDelay := h.config.RetryDelay
for attempt := 0; attempt <= h.config.MaxRetries; attempt++ { for attempt := int64(0); attempt <= h.config.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
// Wait before retry // Wait before retry
time.Sleep(retryDelay) time.Sleep(retryDelay)

View File

@ -27,35 +27,8 @@ type Sink interface {
type SinkStats struct { type SinkStats struct {
Type string Type string
TotalProcessed uint64 TotalProcessed uint64
ActiveConnections int32 ActiveConnections int64
StartTime time.Time StartTime time.Time
LastProcessed time.Time LastProcessed time.Time
Details map[string]any Details map[string]any
} }
// Helper functions for type conversion
func toInt(v any) (int, bool) {
switch val := v.(type) {
case int:
return val, true
case int64:
return int(val), true
case float64:
return int(val), true
default:
return 0, false
}
}
func toFloat(v any) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int:
return float64(val), true
case int64:
return float64(val), true
default:
return 0, false
}
}

View File

@ -25,7 +25,7 @@ type TCPSink struct {
config TCPConfig config TCPConfig
server *tcpServer server *tcpServer
done chan struct{} done chan struct{}
activeConns atomic.Int32 activeConns atomic.Int64
startTime time.Time startTime time.Time
engine *gnet.Engine engine *gnet.Engine
engineMu sync.Mutex engineMu sync.Mutex
@ -41,8 +41,8 @@ type TCPSink struct {
// TCPConfig holds TCP sink configuration // TCPConfig holds TCP sink configuration
type TCPConfig struct { type TCPConfig struct {
Port int Port int64
BufferSize int BufferSize int64
Heartbeat *config.HeartbeatConfig Heartbeat *config.HeartbeatConfig
SSL *config.SSLConfig SSL *config.SSLConfig
NetLimit *config.NetLimitConfig NetLimit *config.NetLimitConfig
@ -51,15 +51,15 @@ type TCPConfig struct {
// NewTCPSink creates a new TCP streaming sink // NewTCPSink creates a new TCP streaming sink
func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) { func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) {
cfg := TCPConfig{ cfg := TCPConfig{
Port: 9090, Port: int64(9090),
BufferSize: 1000, BufferSize: int64(1000),
} }
// Extract configuration from options // Extract configuration from options
if port, ok := toInt(options["port"]); ok { if port, ok := options["port"].(int64); ok {
cfg.Port = port cfg.Port = port
} }
if bufSize, ok := toInt(options["buffer_size"]); ok { if bufSize, ok := options["buffer_size"].(int64); ok {
cfg.BufferSize = bufSize cfg.BufferSize = bufSize
} }
@ -67,7 +67,7 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For
if hb, ok := options["heartbeat"].(map[string]any); ok { if hb, ok := options["heartbeat"].(map[string]any); ok {
cfg.Heartbeat = &config.HeartbeatConfig{} cfg.Heartbeat = &config.HeartbeatConfig{}
cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool) cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool)
if interval, ok := toInt(hb["interval_seconds"]); ok { if interval, ok := hb["interval_seconds"].(int64); ok {
cfg.Heartbeat.IntervalSeconds = interval cfg.Heartbeat.IntervalSeconds = interval
} }
cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool) cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool)
@ -81,25 +81,25 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For
if rl, ok := options["net_limit"].(map[string]any); ok { if rl, ok := options["net_limit"].(map[string]any); ok {
cfg.NetLimit = &config.NetLimitConfig{} cfg.NetLimit = &config.NetLimitConfig{}
cfg.NetLimit.Enabled, _ = rl["enabled"].(bool) cfg.NetLimit.Enabled, _ = rl["enabled"].(bool)
if rps, ok := toFloat(rl["requests_per_second"]); ok { if rps, ok := rl["requests_per_second"].(float64); ok {
cfg.NetLimit.RequestsPerSecond = rps cfg.NetLimit.RequestsPerSecond = rps
} }
if burst, ok := toInt(rl["burst_size"]); ok { if burst, ok := rl["burst_size"].(int64); ok {
cfg.NetLimit.BurstSize = burst cfg.NetLimit.BurstSize = burst
} }
if limitBy, ok := rl["limit_by"].(string); ok { if limitBy, ok := rl["limit_by"].(string); ok {
cfg.NetLimit.LimitBy = limitBy cfg.NetLimit.LimitBy = limitBy
} }
if respCode, ok := toInt(rl["response_code"]); ok { if respCode, ok := rl["response_code"].(int64); ok {
cfg.NetLimit.ResponseCode = respCode cfg.NetLimit.ResponseCode = respCode
} }
if msg, ok := rl["response_message"].(string); ok { if msg, ok := rl["response_message"].(string); ok {
cfg.NetLimit.ResponseMessage = msg cfg.NetLimit.ResponseMessage = msg
} }
if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { if maxPerIP, ok := rl["max_connections_per_ip"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerIP = maxPerIP cfg.NetLimit.MaxConnectionsPerIP = maxPerIP
} }
if maxTotal, ok := toInt(rl["max_total_connections"]); ok { if maxTotal, ok := rl["max_total_connections"].(int64); ok {
cfg.NetLimit.MaxTotalConnections = maxTotal cfg.NetLimit.MaxTotalConnections = maxTotal
} }
} }
@ -283,7 +283,7 @@ func (t *TCPSink) createHeartbeatEntry() source.LogEntry {
if t.config.Heartbeat.IncludeStats { if t.config.Heartbeat.IncludeStats {
fields["active_connections"] = t.activeConns.Load() fields["active_connections"] = t.activeConns.Load()
fields["uptime_seconds"] = int(time.Since(t.startTime).Seconds()) fields["uptime_seconds"] = int64(time.Since(t.startTime).Seconds())
} }
fieldsJSON, _ := json.Marshal(fields) fieldsJSON, _ := json.Marshal(fields)
@ -298,7 +298,7 @@ func (t *TCPSink) createHeartbeatEntry() source.LogEntry {
} }
// GetActiveConnections returns the current number of connections // GetActiveConnections returns the current number of connections
func (t *TCPSink) GetActiveConnections() int32 { func (t *TCPSink) GetActiveConnections() int64 {
return t.activeConns.Load() return t.activeConns.Load()
} }

View File

@ -44,7 +44,7 @@ type TCPClientSink struct {
// TCPClientConfig holds TCP client sink configuration // TCPClientConfig holds TCP client sink configuration
type TCPClientConfig struct { type TCPClientConfig struct {
Address string Address string
BufferSize int BufferSize int64
DialTimeout time.Duration DialTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
KeepAlive time.Duration KeepAlive time.Duration
@ -58,13 +58,13 @@ type TCPClientConfig struct {
// NewTCPClientSink creates a new TCP client sink // NewTCPClientSink creates a new TCP client sink
func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) { func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) {
cfg := TCPClientConfig{ cfg := TCPClientConfig{
BufferSize: 1000, BufferSize: int64(1000),
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
KeepAlive: 30 * time.Second, KeepAlive: 30 * time.Second,
ReconnectDelay: time.Second, ReconnectDelay: time.Second,
MaxReconnectDelay: 30 * time.Second, MaxReconnectDelay: 30 * time.Second,
ReconnectBackoff: 1.5, ReconnectBackoff: float64(1.5),
} }
// Extract address // Extract address
@ -81,25 +81,25 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
cfg.Address = address cfg.Address = address
// Extract other options // Extract other options
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
cfg.BufferSize = bufSize cfg.BufferSize = bufSize
} }
if dialTimeout, ok := toInt(options["dial_timeout_seconds"]); ok && dialTimeout > 0 { if dialTimeout, ok := options["dial_timeout_seconds"].(int64); ok && dialTimeout > 0 {
cfg.DialTimeout = time.Duration(dialTimeout) * time.Second cfg.DialTimeout = time.Duration(dialTimeout) * time.Second
} }
if writeTimeout, ok := toInt(options["write_timeout_seconds"]); ok && writeTimeout > 0 { if writeTimeout, ok := options["write_timeout_seconds"].(int64); ok && writeTimeout > 0 {
cfg.WriteTimeout = time.Duration(writeTimeout) * time.Second cfg.WriteTimeout = time.Duration(writeTimeout) * time.Second
} }
if keepAlive, ok := toInt(options["keep_alive_seconds"]); ok && keepAlive > 0 { if keepAlive, ok := options["keep_alive_seconds"].(int64); ok && keepAlive > 0 {
cfg.KeepAlive = time.Duration(keepAlive) * time.Second cfg.KeepAlive = time.Duration(keepAlive) * time.Second
} }
if reconnectDelay, ok := toInt(options["reconnect_delay_ms"]); ok && reconnectDelay > 0 { if reconnectDelay, ok := options["reconnect_delay_ms"].(int64); ok && reconnectDelay > 0 {
cfg.ReconnectDelay = time.Duration(reconnectDelay) * time.Millisecond cfg.ReconnectDelay = time.Duration(reconnectDelay) * time.Millisecond
} }
if maxReconnectDelay, ok := toInt(options["max_reconnect_delay_seconds"]); ok && maxReconnectDelay > 0 { if maxReconnectDelay, ok := options["max_reconnect_delay_seconds"].(int64); ok && maxReconnectDelay > 0 {
cfg.MaxReconnectDelay = time.Duration(maxReconnectDelay) * time.Second cfg.MaxReconnectDelay = time.Duration(maxReconnectDelay) * time.Second
} }
if backoff, ok := toFloat(options["reconnect_backoff"]); ok && backoff >= 1.0 { if backoff, ok := options["reconnect_backoff"].(float64); ok && backoff >= 1.0 {
cfg.ReconnectBackoff = backoff cfg.ReconnectBackoff = backoff
} }
@ -162,7 +162,7 @@ func (t *TCPClientSink) GetStats() SinkStats {
connected := t.conn != nil connected := t.conn != nil
t.connMu.RUnlock() t.connMu.RUnlock()
activeConns := int32(0) activeConns := int64(0)
if connected { if connected {
activeConns = 1 activeConns = 1
} }

View File

@ -47,7 +47,7 @@ func NewDirectorySource(options map[string]any, logger *log.Logger) (*DirectoryS
} }
checkInterval := 100 * time.Millisecond checkInterval := 100 * time.Millisecond
if ms, ok := toInt(options["check_interval_ms"]); ok && ms > 0 { if ms, ok := options["check_interval_ms"].(int64); ok && ms > 0 {
checkInterval = time.Duration(ms) * time.Millisecond checkInterval = time.Duration(ms) * time.Millisecond
} }
@ -115,7 +115,7 @@ func (ds *DirectorySource) GetStats() SourceStats {
lastEntry, _ := ds.lastEntryTime.Load().(time.Time) lastEntry, _ := ds.lastEntryTime.Load().(time.Time)
ds.mu.RLock() ds.mu.RLock()
watcherCount := len(ds.watchers) watcherCount := int64(len(ds.watchers))
details := make(map[string]any) details := make(map[string]any)
// Add watcher details // Add watcher details

View File

@ -26,7 +26,7 @@ type WatcherInfo struct {
ModTime time.Time ModTime time.Time
EntriesRead uint64 EntriesRead uint64
LastReadTime time.Time LastReadTime time.Time
Rotations int Rotations int64
} }
type fileWatcher struct { type fileWatcher struct {
@ -38,7 +38,7 @@ type fileWatcher struct {
modTime time.Time modTime time.Time
mu sync.Mutex mu sync.Mutex
stopped bool stopped bool
rotationSeq int rotationSeq int64
entriesRead atomic.Uint64 entriesRead atomic.Uint64
lastReadTime atomic.Value // time.Time lastReadTime atomic.Value // time.Time
logger *log.Logger logger *log.Logger
@ -258,7 +258,7 @@ func (w *fileWatcher) checkFile() error {
continue continue
} }
rawSize := len(line) rawSize := int64(len(line))
entry := w.parseLine(line) entry := w.parseLine(line)
entry.RawSize = rawSize entry.RawSize = rawSize

View File

@ -17,9 +17,9 @@ import (
// HTTPSource receives log entries via HTTP POST requests // HTTPSource receives log entries via HTTP POST requests
type HTTPSource struct { type HTTPSource struct {
port int port int64
ingestPath string ingestPath string
bufferSize int bufferSize int64
server *fasthttp.Server server *fasthttp.Server
subscribers []chan LogEntry subscribers []chan LogEntry
mu sync.RWMutex mu sync.RWMutex
@ -38,7 +38,7 @@ type HTTPSource struct {
// NewHTTPSource creates a new HTTP server source // NewHTTPSource creates a new HTTP server source
func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, error) { func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, error) {
port, ok := toInt(options["port"]) port, ok := options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return nil, fmt.Errorf("http source requires valid 'port' option") return nil, fmt.Errorf("http source requires valid 'port' option")
} }
@ -48,8 +48,8 @@ func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, err
ingestPath = path ingestPath = path
} }
bufferSize := 1000 bufferSize := int64(1000)
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
bufferSize = bufSize bufferSize = bufSize
} }
@ -73,19 +73,19 @@ func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, err
if rps, ok := toFloat(rl["requests_per_second"]); ok { if rps, ok := toFloat(rl["requests_per_second"]); ok {
cfg.RequestsPerSecond = rps cfg.RequestsPerSecond = rps
} }
if burst, ok := toInt(rl["burst_size"]); ok { if burst, ok := rl["burst_size"].(int64); ok {
cfg.BurstSize = burst cfg.BurstSize = burst
} }
if limitBy, ok := rl["limit_by"].(string); ok { if limitBy, ok := rl["limit_by"].(string); ok {
cfg.LimitBy = limitBy cfg.LimitBy = limitBy
} }
if respCode, ok := toInt(rl["response_code"]); ok { if respCode, ok := rl["response_code"].(int64); ok {
cfg.ResponseCode = respCode cfg.ResponseCode = respCode
} }
if msg, ok := rl["response_message"].(string); ok { if msg, ok := rl["response_message"].(string); ok {
cfg.ResponseMessage = msg cfg.ResponseMessage = msg
} }
if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { if maxPerIP, ok := rl["max_connections_per_ip"].(int64); ok {
cfg.MaxConnectionsPerIP = maxPerIP cfg.MaxConnectionsPerIP = maxPerIP
} }
@ -205,7 +205,7 @@ func (h *HTTPSource) requestHandler(ctx *fasthttp.RequestCtx) {
remoteAddr := ctx.RemoteAddr().String() remoteAddr := ctx.RemoteAddr().String()
if h.netLimiter != nil { if h.netLimiter != nil {
if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddr); !allowed { if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddr); !allowed {
ctx.SetStatusCode(statusCode) ctx.SetStatusCode(int(statusCode))
ctx.SetContentType("application/json") ctx.SetContentType("application/json")
json.NewEncoder(ctx).Encode(map[string]any{ json.NewEncoder(ctx).Encode(map[string]any{
"error": message, "error": message,
@ -271,7 +271,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) {
if single.Source == "" { if single.Source == "" {
single.Source = "http" single.Source = "http"
} }
single.RawSize = len(body) single.RawSize = int64(len(body))
entries = append(entries, single) entries = append(entries, single)
return entries, nil return entries, nil
} }
@ -280,7 +280,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) {
var array []LogEntry var array []LogEntry
if err := json.Unmarshal(body, &array); err == nil { if err := json.Unmarshal(body, &array); err == nil {
// TODO: Placeholder; For array, divide total size by entry count as approximation // TODO: Placeholder; For array, divide total size by entry count as approximation
approxSizePerEntry := len(body) / len(array) approxSizePerEntry := int64(len(body) / len(array))
for i, entry := range array { for i, entry := range array {
if entry.Message == "" { if entry.Message == "" {
return nil, fmt.Errorf("entry %d missing required field: message", i) return nil, fmt.Errorf("entry %d missing required field: message", i)
@ -318,7 +318,7 @@ func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) {
if entry.Source == "" { if entry.Source == "" {
entry.Source = "http" entry.Source = "http"
} }
entry.RawSize = len(line) entry.RawSize = int64(len(line))
entries = append(entries, entry) entries = append(entries, entry)
} }

View File

@ -13,7 +13,7 @@ type LogEntry struct {
Level string `json:"level,omitempty"` Level string `json:"level,omitempty"`
Message string `json:"message"` Message string `json:"message"`
Fields json.RawMessage `json:"fields,omitempty"` Fields json.RawMessage `json:"fields,omitempty"`
RawSize int `json:"-"` RawSize int64 `json:"-"`
} }
// Source represents an input data stream // Source represents an input data stream
@ -40,17 +40,3 @@ type SourceStats struct {
LastEntryTime time.Time LastEntryTime time.Time
Details map[string]any Details map[string]any
} }
// Helper function for type conversion
func toInt(v any) (int, bool) {
switch val := v.(type) {
case int:
return val, true
case int64:
return int(val), true
case float64:
return int(val), true
default:
return 0, false
}
}

View File

@ -82,7 +82,7 @@ func (s *StdinSource) readLoop() {
Source: "stdin", Source: "stdin",
Message: line, Message: line,
Level: extractLogLevel(line), Level: extractLogLevel(line),
RawSize: len(line), RawSize: int64(len(line)),
} }
s.publish(entry) s.publish(entry)

View File

@ -20,8 +20,8 @@ import (
// TCPSource receives log entries via TCP connections // TCPSource receives log entries via TCP connections
type TCPSource struct { type TCPSource struct {
port int port int64
bufferSize int bufferSize int64
server *tcpSourceServer server *tcpSourceServer
subscribers []chan LogEntry subscribers []chan LogEntry
mu sync.RWMutex mu sync.RWMutex
@ -36,20 +36,20 @@ type TCPSource struct {
totalEntries atomic.Uint64 totalEntries atomic.Uint64
droppedEntries atomic.Uint64 droppedEntries atomic.Uint64
invalidEntries atomic.Uint64 invalidEntries atomic.Uint64
activeConns atomic.Int32 activeConns atomic.Int64
startTime time.Time startTime time.Time
lastEntryTime atomic.Value // time.Time lastEntryTime atomic.Value // time.Time
} }
// NewTCPSource creates a new TCP server source // NewTCPSource creates a new TCP server source
func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error) { func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error) {
port, ok := toInt(options["port"]) port, ok := options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return nil, fmt.Errorf("tcp source requires valid 'port' option") return nil, fmt.Errorf("tcp source requires valid 'port' option")
} }
bufferSize := 1000 bufferSize := int64(1000)
if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
bufferSize = bufSize bufferSize = bufSize
} }
@ -72,16 +72,16 @@ func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error
if rps, ok := toFloat(rl["requests_per_second"]); ok { if rps, ok := toFloat(rl["requests_per_second"]); ok {
cfg.RequestsPerSecond = rps cfg.RequestsPerSecond = rps
} }
if burst, ok := toInt(rl["burst_size"]); ok { if burst, ok := rl["burst_size"].(int64); ok {
cfg.BurstSize = burst cfg.BurstSize = burst
} }
if limitBy, ok := rl["limit_by"].(string); ok { if limitBy, ok := rl["limit_by"].(string); ok {
cfg.LimitBy = limitBy cfg.LimitBy = limitBy
} }
if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { if maxPerIP, ok := rl["max_connections_per_ip"].(int64); ok {
cfg.MaxConnectionsPerIP = maxPerIP cfg.MaxConnectionsPerIP = maxPerIP
} }
if maxTotal, ok := toInt(rl["max_total_connections"]); ok { if maxTotal, ok := rl["max_total_connections"].(int64); ok {
cfg.MaxTotalConnections = maxTotal cfg.MaxTotalConnections = maxTotal
} }
@ -342,7 +342,7 @@ func (s *tcpSourceServer) OnTraffic(c gnet.Conn) gnet.Action {
} }
// Capture raw line size before parsing // Capture raw line size before parsing
rawSize := len(line) rawSize := int64(len(line))
// Parse JSON log entry // Parse JSON log entry
var entry LogEntry var entry LogEntry