v0.12.1 config cleanup

This commit is contained in:
2026-01-05 02:20:38 -05:00
parent 15f484f98b
commit 36e7f8a6a9
12 changed files with 111 additions and 59 deletions

View File

@ -1,6 +1,6 @@
# LogWisp
A high-performance, pipeline-based log transport and processing system built in Go. LogWisp provides flexible log collection, filtering, formatting, and distribution with security and reliability features.
A pipeline-based log transport and processing system built in Go. LogWisp provides flexible log collection, filtering, formatting, and distribution with security and reliability features.
## Features

View File

@ -198,7 +198,6 @@ type FileSourceOptions struct {
Directory string `toml:"directory"`
Pattern string `toml:"pattern"` // glob pattern
CheckIntervalMS int64 `toml:"check_interval_ms"`
Recursive bool `toml:"recursive"` // TODO: implement logic
}
// ConsoleSourceOptions defines settings for a stdin-based source

View File

@ -56,19 +56,17 @@ func NewConsoleSinkPlugin(
logger *log.Logger,
proxy *session.Proxy,
) (sink.Sink, error) {
// Step 1: Create empty config struct with defaults
// Create empty config struct with defaults
opts := &config.ConsoleSinkOptions{
Target: "stdout", // Default target
BufferSize: 1000, // Default buffer size
Target: DefaultConsoleTarget,
}
// Step 2: Use lconfig to scan map into struct (overriding defaults)
// Scan config map into struct
if err := lconfig.ScanMap(configMap, opts); err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Step 3: Validate required fields
// Target validation
// Validate and apply defaults
var output io.Writer
switch opts.Target {
case "stdout":
@ -80,7 +78,7 @@ func NewConsoleSinkPlugin(
}
if opts.BufferSize <= 0 {
opts.BufferSize = 1000
opts.BufferSize = DefaultConsoleBufferSize
}
// Step 4: Create and return plugin instance

View File

@ -0,0 +1,7 @@
package console
const (
// Defaults
DefaultConsoleTarget = "stdout"
DefaultConsoleBufferSize = 1000
)

View File

@ -0,0 +1,11 @@
package file
const (
// Defaults
DefaultFileMaxSizeMB = 100
DefaultFileMaxTotalSizeMB = 1000
DefaultFileMinDiskFreeMB = 100
DefaultFileRetentionHours = 168 // 7 days
DefaultFileBufferSize = 1000
DefaultFileFlushIntervalMs = 100
)

View File

@ -54,49 +54,38 @@ func NewFileSinkPlugin(
logger *log.Logger,
proxy *session.Proxy,
) (sink.Sink, error) {
// Step 1: Create empty config struct with defaults
opts := &config.FileSinkOptions{
Directory: "", // Required field - no default
Name: "", // Required field - no default
MaxSizeMB: 100, // Default max file size
MaxTotalSizeMB: 1000, // Default max total size
MinDiskFreeMB: 100, // Default min disk free
RetentionHours: 168, // Default retention (7 days)
BufferSize: 1000, // Default buffer size
FlushIntervalMs: 100, // Default flush interval
}
// Create empty config struct
opts := &config.FileSinkOptions{}
// Step 2: Use lconfig to scan map into struct (overriding defaults)
// Scan config map into struct
if err := lconfig.ScanMap(configMap, opts); err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Step 3: Validate required fields
// Validate required fields and apply defaults
if opts.Directory == "" {
return nil, fmt.Errorf("directory is mandatory")
return nil, fmt.Errorf("directory is required")
}
if opts.Name == "" {
return nil, fmt.Errorf("name is mandatory")
return nil, fmt.Errorf("name is required")
}
// Validate sizes
if opts.MaxSizeMB <= 0 {
return nil, fmt.Errorf("max_size_mb must be positive")
opts.MaxSizeMB = DefaultFileMaxSizeMB
}
if opts.MaxTotalSizeMB <= 0 {
return nil, fmt.Errorf("max_total_size_mb must be positive")
opts.MaxTotalSizeMB = DefaultFileMaxTotalSizeMB
}
if opts.MinDiskFreeMB < 0 {
return nil, fmt.Errorf("min_disk_free_mb cannot be negative")
opts.MinDiskFreeMB = DefaultFileMinDiskFreeMB
}
if opts.RetentionHours <= 0 {
return nil, fmt.Errorf("retention_hours must be positive")
opts.RetentionHours = DefaultFileRetentionHours
}
if opts.BufferSize <= 0 {
return nil, fmt.Errorf("buffer_size must be positive")
opts.BufferSize = DefaultFileBufferSize
}
if opts.FlushIntervalMs <= 0 {
return nil, fmt.Errorf("flush_interval_ms must be positive")
opts.FlushIntervalMs = DefaultFileFlushIntervalMs
}
// Step 4: Create and return plugin instance
@ -209,7 +198,7 @@ func (fs *FileSink) Stop() {
}
// Shutdown the writer with timeout
if err := fs.writer.Shutdown(2 * time.Second); err != nil {
if err := fs.writer.Shutdown(core.LoggerShutdownTimeout); err != nil {
fs.logger.Error("msg", "Error shutting down file writer",
"component", "file_sink",
"error", err)

View File

@ -3,6 +3,14 @@ package http
import "time"
const (
// Server lifecycle
HttpServerStartTimeout = 100 * time.Millisecond
HttpServerShutdownTimeout = 2 * time.Second
// Defaults
DefaultHTTPHost = "0.0.0.0"
DefaultHTTPBufferSize = 1000
DefaultHTTPStreamPath = "/stream"
DefaultHTTPStatusPath = "/status"
HTTPMaxPort = 65535
)

View File

@ -74,11 +74,8 @@ func NewHTTPSinkPlugin(
proxy *session.Proxy,
) (sink.Sink, error) {
opts := &config.HTTPSinkOptions{
Host: "0.0.0.0",
Host: DefaultHTTPHost,
Port: 0,
StreamPath: "/stream",
StatusPath: "/status",
BufferSize: 1000,
WriteTimeout: 0, // SSE indefinite streaming
}
@ -86,17 +83,18 @@ func NewHTTPSinkPlugin(
return nil, fmt.Errorf("failed to parse config: %w", err)
}
if opts.Port <= 0 || opts.Port > 65535 {
return nil, fmt.Errorf("port must be between 1 and 65535")
// Validate and apply defaults
if opts.Port <= 0 || opts.Port > HTTPMaxPort {
return nil, fmt.Errorf("port must be between 1 and %d", HTTPMaxPort)
}
if opts.BufferSize <= 0 {
opts.BufferSize = 1000
opts.BufferSize = DefaultHTTPBufferSize
}
if opts.StreamPath == "" {
opts.StreamPath = "/stream"
opts.StreamPath = DefaultHTTPStreamPath
}
if opts.StatusPath == "" {
opts.StatusPath = "/status"
opts.StatusPath = DefaultHTTPStatusPath
}
h := &HTTPSink{

View File

@ -0,0 +1,21 @@
package tcp
import (
"time"
)
const (
// Server lifecycle
TCPServerStartTimeout = 100 * time.Millisecond
TCPServerShutdownTimeout = 2 * time.Second
// Connection management
TCPMaxConsecutiveWriteErrors = 3
TCPMaxPort = 65535
// Defaults
DefaultTCPHost = "0.0.0.0"
DefaultTCPBufferSize = 1000
DefaultTCPWriteTimeoutMS = 5000
DefaultTCPKeepAlivePeriod = 30000
)

View File

@ -70,12 +70,9 @@ func NewTCPSinkPlugin(
) (sink.Sink, error) {
// Create config struct with defaults
opts := &config.TCPSinkOptions{
Host: "0.0.0.0",
Port: 0, // Required
BufferSize: 1000,
WriteTimeout: 5000,
KeepAlive: true,
KeepAlivePeriod: 30000,
Host: DefaultTCPHost,
Port: 0,
KeepAlive: true,
}
// Parse config map into struct
@ -84,11 +81,18 @@ func NewTCPSinkPlugin(
}
// Validate required fields
if opts.Port <= 0 || opts.Port > 65535 {
return nil, fmt.Errorf("port must be between 1 and 65535")
// Validate and apply defaults
if opts.Port <= 0 || opts.Port > TCPMaxPort {
return nil, fmt.Errorf("port must be between 1 and %d", TCPMaxPort)
}
if opts.BufferSize <= 0 {
opts.BufferSize = 1000
opts.BufferSize = DefaultTCPBufferSize
}
if opts.WriteTimeout <= 0 {
opts.WriteTimeout = DefaultTCPWriteTimeoutMS
}
if opts.KeepAlivePeriod <= 0 {
opts.KeepAlivePeriod = DefaultTCPKeepAlivePeriod
}
t := &TCPSink{
@ -150,6 +154,13 @@ func (t *TCPSink) Start(ctx context.Context) error {
gnet.WithReusePort(true),
}
// Apply TCP keep-alive settings from config
if t.config.KeepAlive {
opts = append(opts,
gnet.WithTCPKeepAlive(time.Duration(t.config.KeepAlivePeriod)*time.Millisecond),
)
}
// Start gnet server
errChan := make(chan error, 1)
go func() {
@ -185,7 +196,7 @@ func (t *TCPSink) Start(ctx context.Context) error {
close(t.done)
t.wg.Wait()
return err
case <-time.After(100 * time.Millisecond):
case <-time.After(TCPServerStartTimeout):
t.logger.Info("msg", "TCP server started",
"component", "tcp_sink",
"instance_id", t.id,
@ -208,7 +219,7 @@ func (t *TCPSink) Stop() {
t.engineMu.Unlock()
if engine != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), TCPServerShutdownTimeout)
defer cancel()
(*engine).Stop(ctx)
}
@ -306,6 +317,11 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
}
}
// Apply write timeout from config
if s.sink.config.WriteTimeout > 0 {
c.SetWriteDeadline(time.Now().Add(time.Duration(s.sink.config.WriteTimeout) * time.Millisecond))
}
// Create session via proxy
sess := s.sink.proxy.CreateSession(remoteAddrStr, map[string]any{
"type": "tcp_client",
@ -392,6 +408,11 @@ func (t *TCPSink) broadcastData(data []byte) {
t.proxy.UpdateActivity(client.sessionID)
}
// Refresh write deadline on each write if configured
if t.config.WriteTimeout > 0 {
conn.SetWriteDeadline(time.Now().Add(time.Duration(t.config.WriteTimeout) * time.Millisecond))
}
conn.AsyncWrite(data, func(c gnet.Conn, err error) error {
if err != nil {
t.writeErrors.Add(1)
@ -422,8 +443,8 @@ func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
"error", err,
"consecutive_errors", errorCount)
// Close connection after 3 consecutive write errors
if errorCount >= 3 {
// Close connection max consecutive write errors
if errorCount >= TCPMaxConsecutiveWriteErrors {
t.logger.Warn("msg", "Closing connection due to repeated write errors",
"component", "tcp_sink",
"remote_addr", remoteAddrStr,

View File

@ -0,0 +1 @@
package console

View File

@ -66,10 +66,9 @@ func NewFileSourcePlugin(
) (source.Source, error) {
// Step 1: Create empty config struct with defaults
opts := &config.FileSourceOptions{
Directory: "", // Required field - no default
Pattern: "*", // Default pattern
CheckIntervalMS: 100, // Default check interval
Recursive: false, // Default recursive
Directory: "", // Required field - no default
Pattern: "*", // Default pattern
CheckIntervalMS: 100, // Default check interval
}
// Step 2: Use lconfig to scan map into struct (overriding defaults)