v0.7.0 major configuration and sub-command restructuring, not tested, docs and default config outdated

This commit is contained in:
2025-10-09 09:35:21 -04:00
parent 490fb777ab
commit 89e6a4ea05
61 changed files with 3248 additions and 4571 deletions

View File

@ -18,6 +18,7 @@ import (
// ConsoleSink writes log entries to the console (stdout/stderr) using an dedicated logger instance
type ConsoleSink struct {
config *config.ConsoleSinkOptions
input chan core.LogEntry
writer *log.Logger // Dedicated internal logger instance for console writing
done chan struct{}
@ -31,22 +32,24 @@ type ConsoleSink struct {
}
// Creates a new console sink
func NewConsoleSink(options map[string]any, appLogger *log.Logger, formatter format.Formatter) (*ConsoleSink, error) {
target := "stdout"
if t, ok := options["target"].(string); ok {
target = t
func NewConsoleSink(opts *config.ConsoleSinkOptions, appLogger *log.Logger, formatter format.Formatter) (*ConsoleSink, error) {
if opts == nil {
return nil, fmt.Errorf("console sink options cannot be nil")
}
bufferSize := int64(1000)
if buf, ok := options["buffer_size"].(int64); ok && buf > 0 {
bufferSize = buf
// Set defaults if not configured
if opts.Target == "" {
opts.Target = "stdout"
}
if opts.BufferSize <= 0 {
opts.BufferSize = 1000
}
// Dedicated logger instance as console writer
writer, err := log.NewBuilder().
EnableFile(false).
EnableConsole(true).
ConsoleTarget(target).
ConsoleTarget(opts.Target).
Format("raw"). // Passthrough pre-formatted messages
ShowTimestamp(false). // Disable writer's own timestamp
ShowLevel(false). // Disable writer's own level prefix
@ -57,7 +60,8 @@ func NewConsoleSink(options map[string]any, appLogger *log.Logger, formatter for
}
s := &ConsoleSink{
input: make(chan core.LogEntry, bufferSize),
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
writer: writer,
done: make(chan struct{}),
startTime: time.Now(),
@ -156,8 +160,4 @@ func (s *ConsoleSink) processLoop(ctx context.Context) {
return
}
}
}
func (s *ConsoleSink) SetAuth(auth *config.AuthConfig) {
// Authentication does not apply to the console sink.
}

View File

@ -5,10 +5,10 @@ import (
"bytes"
"context"
"fmt"
"logwisp/src/internal/config"
"sync/atomic"
"time"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
@ -17,6 +17,7 @@ import (
// Writes log entries to files with rotation
type FileSink struct {
config *config.FileSinkOptions
input chan core.LogEntry
writer *log.Logger // Internal logger instance for file writing
done chan struct{}
@ -30,64 +31,27 @@ type FileSink struct {
}
// Creates a new file sink
func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*FileSink, error) {
directory, ok := options["directory"].(string)
if !ok || directory == "" {
directory = "./"
logger.Warn("No directory or invalid directory provided, current directory will be used")
}
name, ok := options["name"].(string)
if !ok || name == "" {
name = "logwisp.output"
logger.Warn(fmt.Sprintf("No filename provided, %s will be used", name))
func NewFileSink(opts *config.FileSinkOptions, logger *log.Logger, formatter format.Formatter) (*FileSink, error) {
if opts == nil {
return nil, fmt.Errorf("file sink options cannot be nil")
}
// Create configuration for the internal log writer
writerConfig := log.DefaultConfig()
writerConfig.Directory = directory
writerConfig.Name = name
writerConfig.Directory = opts.Directory
writerConfig.Name = opts.Name
writerConfig.EnableConsole = false // File only
writerConfig.ShowTimestamp = false // We already have timestamps in entries
writerConfig.ShowLevel = false // We already have levels in entries
// Add optional configurations
if maxSize, ok := options["max_size_mb"].(int64); ok && maxSize > 0 {
writerConfig.MaxSizeKB = maxSize * 1000
}
if maxTotalSize, ok := options["max_total_size_mb"].(int64); ok && maxTotalSize >= 0 {
writerConfig.MaxTotalSizeKB = maxTotalSize * 1000
}
if retention, ok := options["retention_hours"].(int64); ok && retention > 0 {
writerConfig.RetentionPeriodHrs = float64(retention)
}
if minDiskFree, ok := options["min_disk_free_mb"].(int64); ok && minDiskFree > 0 {
writerConfig.MinDiskFreeKB = minDiskFree * 1000
}
// Create internal logger for file writing
writer := log.NewLogger()
if err := writer.ApplyConfig(writerConfig); err != nil {
return nil, fmt.Errorf("failed to initialize file writer: %w", err)
}
// Start the internal file writer
if err := writer.Start(); err != nil {
return nil, fmt.Errorf("failed to start file writer: %w", err)
}
// Buffer size for input channel
// TODO: Centralized constant file in core package
bufferSize := int64(1000)
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
bufferSize = bufSize
}
fs := &FileSink{
input: make(chan core.LogEntry, bufferSize),
input: make(chan core.LogEntry, opts.BufferSize),
writer: writer,
done: make(chan struct{}),
startTime: time.Now(),
@ -104,6 +68,11 @@ func (fs *FileSink) Input() chan<- core.LogEntry {
}
func (fs *FileSink) Start(ctx context.Context) error {
// Start the internal file writer
if err := fs.writer.Start(); err != nil {
return fmt.Errorf("failed to start sink file writer: %w", err)
}
go fs.processLoop(ctx)
fs.logger.Info("msg", "File sink started", "component", "file_sink")
return nil
@ -166,8 +135,4 @@ func (fs *FileSink) processLoop(ctx context.Context) {
return
}
}
}
func (fs *FileSink) SetAuth(auth *config.AuthConfig) {
// Authentication does not apply to file sink
}

View File

@ -26,8 +26,11 @@ import (
// Streams log entries via Server-Sent Events
type HTTPSink struct {
// Configuration reference (NOT a copy)
config *config.HTTPSinkOptions
// Runtime
input chan core.LogEntry
config HTTPConfig
server *fasthttp.Server
activeClients atomic.Int64
mu sync.RWMutex
@ -46,11 +49,7 @@ type HTTPSink struct {
// Security components
authenticator *auth.Authenticator
tlsManager *tls.Manager
authConfig *config.AuthConfig
// Path configuration
streamPath string
statusPath string
authConfig *config.ServerAuthConfig
// Net limiting
netLimiter *limit.NetLimiter
@ -62,151 +61,58 @@ type HTTPSink struct {
authSuccesses atomic.Uint64
}
// Holds HTTP sink configuration
type HTTPConfig struct {
Host string
Port int64
BufferSize int64
StreamPath string
StatusPath string
Heartbeat *config.HeartbeatConfig
TLS *config.TLSConfig
NetLimit *config.NetLimitConfig
}
// Creates a new HTTP streaming sink
func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) {
cfg := HTTPConfig{
Host: "0.0.0.0",
Port: 8080,
BufferSize: 1000,
StreamPath: "/stream",
StatusPath: "/status",
}
// Extract configuration from options
if host, ok := options["host"].(string); ok && host != "" {
cfg.Host = host
}
if port, ok := options["port"].(int64); ok {
cfg.Port = port
}
if bufSize, ok := options["buffer_size"].(int64); ok {
cfg.BufferSize = bufSize
}
if path, ok := options["stream_path"].(string); ok {
cfg.StreamPath = path
}
if path, ok := options["status_path"].(string); ok {
cfg.StatusPath = path
}
// Extract heartbeat config
if hb, ok := options["heartbeat"].(map[string]any); ok {
cfg.Heartbeat = &config.HeartbeatConfig{}
cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool)
if interval, ok := hb["interval_seconds"].(int64); ok {
cfg.Heartbeat.IntervalSeconds = interval
}
cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool)
cfg.Heartbeat.IncludeStats, _ = hb["include_stats"].(bool)
if hbFormat, ok := hb["format"].(string); ok {
cfg.Heartbeat.Format = hbFormat
}
}
// Extract TLS config
if tc, ok := options["tls"].(map[string]any); ok {
cfg.TLS = &config.TLSConfig{}
cfg.TLS.Enabled, _ = tc["enabled"].(bool)
if certFile, ok := tc["cert_file"].(string); ok {
cfg.TLS.CertFile = certFile
}
if keyFile, ok := tc["key_file"].(string); ok {
cfg.TLS.KeyFile = keyFile
}
cfg.TLS.ClientAuth, _ = tc["client_auth"].(bool)
if caFile, ok := tc["client_ca_file"].(string); ok {
cfg.TLS.ClientCAFile = caFile
}
cfg.TLS.VerifyClientCert, _ = tc["verify_client_cert"].(bool)
if minVer, ok := tc["min_version"].(string); ok {
cfg.TLS.MinVersion = minVer
}
if maxVer, ok := tc["max_version"].(string); ok {
cfg.TLS.MaxVersion = maxVer
}
if ciphers, ok := tc["cipher_suites"].(string); ok {
cfg.TLS.CipherSuites = ciphers
}
}
// Extract net limit config
if nl, ok := options["net_limit"].(map[string]any); ok {
cfg.NetLimit = &config.NetLimitConfig{}
cfg.NetLimit.Enabled, _ = nl["enabled"].(bool)
if rps, ok := nl["requests_per_second"].(float64); ok {
cfg.NetLimit.RequestsPerSecond = rps
}
if burst, ok := nl["burst_size"].(int64); ok {
cfg.NetLimit.BurstSize = burst
}
if respCode, ok := nl["response_code"].(int64); ok {
cfg.NetLimit.ResponseCode = respCode
}
if msg, ok := nl["response_message"].(string); ok {
cfg.NetLimit.ResponseMessage = msg
}
if maxPerIP, ok := nl["max_connections_per_ip"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerIP = maxPerIP
}
if maxTotal, ok := nl["max_connections_total"].(int64); ok {
cfg.NetLimit.MaxConnectionsTotal = maxTotal
}
if ipWhitelist, ok := nl["ip_whitelist"].([]any); ok {
cfg.NetLimit.IPWhitelist = make([]string, 0, len(ipWhitelist))
for _, entry := range ipWhitelist {
if str, ok := entry.(string); ok {
cfg.NetLimit.IPWhitelist = append(cfg.NetLimit.IPWhitelist, str)
}
}
}
if ipBlacklist, ok := nl["ip_blacklist"].([]any); ok {
cfg.NetLimit.IPBlacklist = make([]string, 0, len(ipBlacklist))
for _, entry := range ipBlacklist {
if str, ok := entry.(string); ok {
cfg.NetLimit.IPBlacklist = append(cfg.NetLimit.IPBlacklist, str)
}
}
}
func NewHTTPSink(opts *config.HTTPSinkOptions, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) {
if opts == nil {
return nil, fmt.Errorf("HTTP sink options cannot be nil")
}
h := &HTTPSink{
input: make(chan core.LogEntry, cfg.BufferSize),
config: cfg,
startTime: time.Now(),
done: make(chan struct{}),
streamPath: cfg.StreamPath,
statusPath: cfg.StatusPath,
logger: logger,
formatter: formatter,
clients: make(map[uint64]chan core.LogEntry),
unregister: make(chan uint64, 10), // Buffered for non-blocking
config: opts, // Direct reference to config struct
input: make(chan core.LogEntry, opts.BufferSize),
startTime: time.Now(),
done: make(chan struct{}),
logger: logger,
formatter: formatter,
clients: make(map[uint64]chan core.LogEntry),
}
h.lastProcessed.Store(time.Time{})
// Initialize TLS manager
if cfg.TLS != nil && cfg.TLS.Enabled {
tlsManager, err := tls.NewManager(cfg.TLS, logger)
// Initialize TLS manager if configured
if opts.TLS != nil && opts.TLS.Enabled {
tlsManager, err := tls.NewManager(opts.TLS, logger)
if err != nil {
return nil, fmt.Errorf("failed to create TLS manager: %w", err)
}
h.tlsManager = tlsManager
logger.Info("msg", "TLS enabled",
"component", "http_sink")
}
// Initialize net limiter if configured
if cfg.NetLimit != nil && cfg.NetLimit.Enabled {
h.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger)
if opts.NetLimit != nil && (opts.NetLimit.Enabled ||
len(opts.NetLimit.IPWhitelist) > 0 ||
len(opts.NetLimit.IPBlacklist) > 0) {
h.netLimiter = limit.NewNetLimiter(opts.NetLimit, logger)
}
// Initialize authenticator if auth is not "none"
if opts.Auth != nil && opts.Auth.Type != "none" {
// Only "basic" and "token" are valid for HTTP sink
if opts.Auth.Type != "basic" && opts.Auth.Type != "token" {
return nil, fmt.Errorf("invalid auth type '%s' for HTTP sink (valid: none, basic, token)", opts.Auth.Type)
}
authenticator, err := auth.NewAuthenticator(opts.Auth, logger)
if err != nil {
return nil, fmt.Errorf("failed to create authenticator: %w", err)
}
h.authenticator = authenticator
h.authConfig = opts.Auth
logger.Info("msg", "Authentication enabled",
"component", "http_sink",
"type", opts.Auth.Type)
}
return h, nil
@ -230,6 +136,9 @@ func (h *HTTPSink) Start(ctx context.Context) error {
DisableKeepalive: false,
StreamRequestBody: true,
Logger: fasthttpLogger,
// ReadTimeout: time.Duration(h.config.ReadTimeout) * time.Millisecond,
WriteTimeout: time.Duration(h.config.WriteTimeout) * time.Millisecond,
// MaxRequestBodySize: int(h.config.MaxBodySize),
}
// Configure TLS if enabled
@ -250,8 +159,8 @@ func (h *HTTPSink) Start(ctx context.Context) error {
"component", "http_sink",
"host", h.config.Host,
"port", h.config.Port,
"stream_path", h.streamPath,
"status_path", h.statusPath,
"stream_path", h.config.StreamPath,
"status_path", h.config.StatusPath,
"tls_enabled", h.tlsManager != nil)
var err error
@ -296,7 +205,7 @@ func (h *HTTPSink) brokerLoop(ctx context.Context) {
var tickerChan <-chan time.Time
if h.config.Heartbeat != nil && h.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second)
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.Interval) * time.Second)
tickerChan = ticker.C
defer ticker.Stop()
}
@ -441,8 +350,8 @@ func (h *HTTPSink) GetStats() SinkStats {
"port": h.config.Port,
"buffer_size": h.config.BufferSize,
"endpoints": map[string]string{
"stream": h.streamPath,
"status": h.statusPath,
"stream": h.config.StreamPath,
"status": h.config.StatusPath,
},
"net_limit": netLimitStats,
"auth": authStats,
@ -489,7 +398,7 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
path := string(ctx.Path())
// Status endpoint doesn't require auth
if path == h.statusPath {
if path == h.config.StatusPath {
h.handleStatus(ctx)
return
}
@ -509,14 +418,14 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
// Return 401 with WWW-Authenticate header
ctx.SetStatusCode(fasthttp.StatusUnauthorized)
if h.authConfig.Type == "basic" && h.authConfig.BasicAuth != nil {
realm := h.authConfig.BasicAuth.Realm
if h.authConfig.Type == "basic" && h.authConfig.Basic != nil {
realm := h.authConfig.Basic.Realm
if realm == "" {
realm = "Restricted"
}
ctx.Response.Header.Set("WWW-Authenticate", fmt.Sprintf("Basic realm=\"%s\"", realm))
} else if h.authConfig.Type == "bearer" {
ctx.Response.Header.Set("WWW-Authenticate", "Bearer")
} else if h.authConfig.Type == "token" {
ctx.Response.Header.Set("WWW-Authenticate", "Token")
}
ctx.SetContentType("application/json")
@ -538,7 +447,7 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
}
switch path {
case h.streamPath:
case h.config.StreamPath:
h.handleStream(ctx, session)
default:
ctx.SetStatusCode(fasthttp.StatusNotFound)
@ -547,6 +456,15 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
"error": "Not Found",
})
}
// Handle stream endpoint
// if path == h.config.StreamPath {
// h.handleStream(ctx, session)
// return
// }
//
// // Unknown path
// ctx.SetStatusCode(fasthttp.StatusNotFound)
// ctx.SetBody([]byte("Not Found"))
}
func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session) {
@ -611,8 +529,8 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
"client_id": fmt.Sprintf("%d", clientID),
"username": session.Username,
"auth_method": session.Method,
"stream_path": h.streamPath,
"status_path": h.statusPath,
"stream_path": h.config.StreamPath,
"status_path": h.config.StatusPath,
"buffer_size": h.config.BufferSize,
"tls": h.tlsManager != nil,
}
@ -627,7 +545,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
var tickerChan <-chan time.Time
if h.config.Heartbeat != nil && h.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second)
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.Interval) * time.Second)
tickerChan = ticker.C
defer ticker.Stop()
}
@ -716,7 +634,7 @@ func (h *HTTPSink) createHeartbeatEntry() core.LogEntry {
fields := make(map[string]any)
fields["type"] = "heartbeat"
if h.config.Heartbeat.IncludeStats {
if h.config.Heartbeat.Enabled {
fields["active_clients"] = h.activeClients.Load()
fields["uptime_seconds"] = int(time.Since(h.startTime).Seconds())
}
@ -775,13 +693,13 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
"uptime_seconds": int(time.Since(h.startTime).Seconds()),
},
"endpoints": map[string]string{
"transport": h.streamPath,
"status": h.statusPath,
"transport": h.config.StreamPath,
"status": h.config.StatusPath,
},
"features": map[string]any{
"heartbeat": map[string]any{
"enabled": h.config.Heartbeat.Enabled,
"interval": h.config.Heartbeat.IntervalSeconds,
"interval": h.config.Heartbeat.Interval,
"format": h.config.Heartbeat.Format,
},
"tls": tlsStats,
@ -806,37 +724,15 @@ func (h *HTTPSink) GetActiveConnections() int64 {
// Returns the configured transport endpoint path
func (h *HTTPSink) GetStreamPath() string {
return h.streamPath
return h.config.StreamPath
}
// Returns the configured status endpoint path
func (h *HTTPSink) GetStatusPath() string {
return h.statusPath
return h.config.StatusPath
}
// Returns the configured host
func (h *HTTPSink) GetHost() string {
return h.config.Host
}
// Configures http sink auth
func (h *HTTPSink) SetAuth(authCfg *config.AuthConfig) {
if authCfg == nil || authCfg.Type == "none" {
return
}
h.authConfig = authCfg
authenticator, err := auth.NewAuthenticator(authCfg, h.logger)
if err != nil {
h.logger.Error("msg", "Failed to initialize authenticator for HTTP sink",
"component", "http_sink",
"error", err)
// Continue without auth
return
}
h.authenticator = authenticator
h.logger.Info("msg", "Authentication configured for HTTP sink",
"component", "http_sink",
"auth_type", authCfg.Type)
}

View File

@ -8,7 +8,6 @@ import (
"crypto/x509"
"encoding/base64"
"fmt"
"net/url"
"os"
"strings"
"sync"
@ -28,7 +27,7 @@ import (
// Forwards log entries to a remote HTTP endpoint
type HTTPClientSink struct {
input chan core.LogEntry
config HTTPClientConfig
config *config.HTTPClientSinkOptions
client *fasthttp.Client
batch []core.LogEntry
batchMu sync.Mutex
@ -48,195 +47,16 @@ type HTTPClientSink struct {
activeConnections atomic.Int64
}
// Holds HTTP client sink configuration
// TODO: missing toml tags
type HTTPClientConfig struct {
// Config
URL string `toml:"url"`
BufferSize int64 `toml:"buffer_size"`
BatchSize int64 `toml:"batch_size"`
BatchDelay time.Duration `toml:"batch_delay_ms"`
Timeout time.Duration `toml:"timeout_seconds"`
Headers map[string]string `toml:"headers"`
// Retry configuration
MaxRetries int64 `toml:"max_retries"`
RetryDelay time.Duration `toml:"retry_delay"`
RetryBackoff float64 `toml:"retry_backoff"` // Multiplier for exponential backoff
// Security
AuthType string `toml:"auth_type"` // "none", "basic", "bearer", "mtls"
Username string `toml:"username"` // For basic auth
Password string `toml:"password"` // For basic auth
BearerToken string `toml:"bearer_token"` // For bearer auth
// TLS configuration
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
CAFile string `toml:"ca_file"`
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
}
// Creates a new HTTP client sink
func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) {
cfg := HTTPClientConfig{
BufferSize: int64(1000),
BatchSize: int64(100),
BatchDelay: time.Second,
Timeout: 30 * time.Second,
MaxRetries: int64(3),
RetryDelay: time.Second,
RetryBackoff: float64(2.0),
Headers: make(map[string]string),
}
// Extract URL
urlStr, ok := options["url"].(string)
if !ok || urlStr == "" {
return nil, fmt.Errorf("http_client sink requires 'url' option")
}
// Validate URL
parsedURL, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return nil, fmt.Errorf("URL must use http or https scheme")
}
cfg.URL = urlStr
// Extract other options
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
cfg.BufferSize = bufSize
}
if batchSize, ok := options["batch_size"].(int64); ok && batchSize > 0 {
cfg.BatchSize = batchSize
}
if delayMs, ok := options["batch_delay_ms"].(int64); ok && delayMs > 0 {
cfg.BatchDelay = time.Duration(delayMs) * time.Millisecond
}
if timeoutSec, ok := options["timeout_seconds"].(int64); ok && timeoutSec > 0 {
cfg.Timeout = time.Duration(timeoutSec) * time.Second
}
if maxRetries, ok := options["max_retries"].(int64); ok && maxRetries >= 0 {
cfg.MaxRetries = maxRetries
}
if retryDelayMs, ok := options["retry_delay_ms"].(int64); ok && retryDelayMs > 0 {
cfg.RetryDelay = time.Duration(retryDelayMs) * time.Millisecond
}
if backoff, ok := options["retry_backoff"].(float64); ok && backoff >= 1.0 {
cfg.RetryBackoff = backoff
}
if insecure, ok := options["insecure_skip_verify"].(bool); ok {
cfg.InsecureSkipVerify = insecure
}
if authType, ok := options["auth_type"].(string); ok {
switch authType {
case "none", "basic", "bearer", "mtls":
cfg.AuthType = authType
default:
return nil, fmt.Errorf("http_client sink: invalid auth_type '%s'", authType)
}
} else {
cfg.AuthType = "none"
}
if username, ok := options["username"].(string); ok {
cfg.Username = username
}
if password, ok := options["password"].(string); ok {
cfg.Password = password // TODO: change to Argon2 hashed password
}
if token, ok := options["bearer_token"].(string); ok {
cfg.BearerToken = token
}
// Validate auth configuration and TLS enforcement
isHTTPS := strings.HasPrefix(cfg.URL, "https://")
switch cfg.AuthType {
case "basic":
if cfg.Username == "" || cfg.Password == "" {
return nil, fmt.Errorf("http_client sink: username and password required for basic auth")
}
if !isHTTPS {
return nil, fmt.Errorf("http_client sink: basic auth requires HTTPS (security: credentials would be sent in plaintext)")
}
case "bearer":
if cfg.BearerToken == "" {
return nil, fmt.Errorf("http_client sink: bearer_token required for bearer auth")
}
if !isHTTPS {
return nil, fmt.Errorf("http_client sink: bearer auth requires HTTPS (security: token would be sent in plaintext)")
}
case "mtls":
if !isHTTPS {
return nil, fmt.Errorf("http_client sink: mTLS requires HTTPS")
}
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, fmt.Errorf("http_client sink: cert_file and key_file required for mTLS")
}
case "none":
// Clear any credentials if auth is "none"
if cfg.Username != "" || cfg.Password != "" || cfg.BearerToken != "" {
logger.Warn("msg", "Credentials provided but auth_type is 'none', ignoring",
"component", "http_client_sink")
cfg.Username = ""
cfg.Password = ""
cfg.BearerToken = ""
}
}
// Extract headers
if headers, ok := options["headers"].(map[string]any); ok {
for k, v := range headers {
if strVal, ok := v.(string); ok {
cfg.Headers[k] = strVal
}
}
}
// Set default Content-Type if not specified
if _, exists := cfg.Headers["Content-Type"]; !exists {
cfg.Headers["Content-Type"] = "application/json"
}
// Extract TLS options
if caFile, ok := options["ca_file"].(string); ok && caFile != "" {
cfg.CAFile = caFile
}
// Extract client certificate options from TLS config
if tc, ok := options["tls"].(map[string]any); ok {
if enabled, _ := tc["enabled"].(bool); enabled {
// Extract client certificate files for mTLS
if certFile, ok := tc["cert_file"].(string); ok && certFile != "" {
if keyFile, ok := tc["key_file"].(string); ok && keyFile != "" {
// These will be used below when configuring TLS
cfg.CertFile = certFile // Need to add these fields to HTTPClientConfig
cfg.KeyFile = keyFile
}
}
// Extract CA file from TLS config if not already set
if cfg.CAFile == "" {
if caFile, ok := tc["ca_file"].(string); ok {
cfg.CAFile = caFile
}
}
// Extract insecure skip verify from TLS config
if insecure, ok := tc["insecure_skip_verify"].(bool); ok {
cfg.InsecureSkipVerify = insecure
}
}
func NewHTTPClientSink(opts *config.HTTPClientSinkOptions, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) {
if opts == nil {
return nil, fmt.Errorf("HTTP client sink options cannot be nil")
}
h := &HTTPClientSink{
input: make(chan core.LogEntry, cfg.BufferSize),
config: cfg,
batch: make([]core.LogEntry, 0, cfg.BatchSize),
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
batch: make([]core.LogEntry, 0, opts.BatchSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
@ -249,46 +69,48 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for
h.client = &fasthttp.Client{
MaxConnsPerHost: 10,
MaxIdleConnDuration: 10 * time.Second,
ReadTimeout: cfg.Timeout,
WriteTimeout: cfg.Timeout,
ReadTimeout: time.Duration(opts.Timeout) * time.Second,
WriteTimeout: time.Duration(opts.Timeout) * time.Second,
DisableHeaderNamesNormalizing: true,
}
// Configure TLS if using HTTPS
if strings.HasPrefix(cfg.URL, "https://") {
if strings.HasPrefix(opts.URL, "https://") {
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.InsecureSkipVerify,
InsecureSkipVerify: opts.InsecureSkipVerify,
}
// Load custom CA for server verification if provided
if cfg.CAFile != "" {
caCert, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file '%s': %w", cfg.CAFile, err)
// Use TLS config if provided
if opts.TLS != nil {
// Load custom CA for server verification
if opts.TLS.CAFile != "" {
caCert, err := os.ReadFile(opts.TLS.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file '%s': %w", opts.TLS.CAFile, err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate from '%s'", opts.TLS.CAFile)
}
tlsConfig.RootCAs = caCertPool
logger.Debug("msg", "Custom CA loaded for server verification",
"component", "http_client_sink",
"ca_file", opts.TLS.CAFile)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate from '%s'", cfg.CAFile)
// Load client certificate for mTLS if provided
if opts.TLS.CertFile != "" && opts.TLS.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(opts.TLS.CertFile, opts.TLS.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
logger.Info("msg", "Client certificate loaded for mTLS",
"component", "http_client_sink",
"cert_file", opts.TLS.CertFile)
}
tlsConfig.RootCAs = caCertPool
logger.Debug("msg", "Custom CA loaded for server verification",
"component", "http_client_sink",
"ca_file", cfg.CAFile)
}
// Load client certificate for mTLS if provided
if cfg.CertFile != "" && cfg.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
logger.Info("msg", "Client certificate loaded for mTLS",
"component", "http_client_sink",
"cert_file", cfg.CertFile)
}
// Set TLS config directly on the client
h.client.TLSConfig = tlsConfig
}
@ -308,7 +130,7 @@ func (h *HTTPClientSink) Start(ctx context.Context) error {
"component", "http_client_sink",
"url", h.config.URL,
"batch_size", h.config.BatchSize,
"batch_delay", h.config.BatchDelay)
"batch_delay_ms", h.config.BatchDelayMS)
return nil
}
@ -399,7 +221,7 @@ func (h *HTTPClientSink) processLoop(ctx context.Context) {
func (h *HTTPClientSink) batchTimer(ctx context.Context) {
defer h.wg.Done()
ticker := time.NewTicker(h.config.BatchDelay)
ticker := time.NewTicker(time.Duration(h.config.BatchDelayMS) * time.Millisecond)
defer ticker.Stop()
for {
@ -468,7 +290,7 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
// Retry logic
var lastErr error
retryDelay := h.config.RetryDelay
retryDelay := time.Duration(h.config.RetryDelayMS) * time.Millisecond
// TODO: verify retry loop placement is correct or should it be after acquiring resources (req :=....)
for attempt := int64(0); attempt <= h.config.MaxRetries; attempt++ {
@ -480,9 +302,10 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
newDelay := time.Duration(float64(retryDelay) * h.config.RetryBackoff)
// Cap at maximum to prevent integer overflow
if newDelay > h.config.Timeout || newDelay < retryDelay {
timeout := time.Duration(h.config.Timeout) * time.Second
if newDelay > timeout || newDelay < retryDelay {
// Either exceeded max or overflowed (negative/wrapped)
retryDelay = h.config.Timeout
retryDelay = timeout
} else {
retryDelay = newDelay
}
@ -500,14 +323,14 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
req.Header.Set("User-Agent", fmt.Sprintf("LogWisp/%s", version.Short()))
// Add authentication based on auth type
switch h.config.AuthType {
switch h.config.Auth.Type {
case "basic":
creds := h.config.Username + ":" + h.config.Password
creds := h.config.Auth.Username + ":" + h.config.Auth.Password
encodedCreds := base64.StdEncoding.EncodeToString([]byte(creds))
req.Header.Set("Authorization", "Basic "+encodedCreds)
case "bearer":
req.Header.Set("Authorization", "Bearer "+h.config.BearerToken)
case "token":
req.Header.Set("Authorization", "Token "+h.config.Auth.Token)
case "mtls":
// mTLS auth is handled at TLS layer via client certificates
@ -523,7 +346,7 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
}
// Send request
err := h.client.DoTimeout(req, resp, h.config.Timeout)
err := h.client.DoTimeout(req, resp, time.Duration(h.config.Timeout)*time.Second)
// Capture response before releasing
statusCode := resp.StatusCode()
@ -587,10 +410,4 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
"retries", h.config.MaxRetries,
"last_error", lastErr)
h.failedBatches.Add(1)
}
// Not applicable, Clients authenticate to remote servers using Username/Password in config
func (h *HTTPClientSink) SetAuth(authCfg *config.AuthConfig) {
// No-op: client sinks don't validate incoming connections
// They authenticate to remote servers using Username/Password fields
}

View File

@ -5,7 +5,6 @@ import (
"context"
"time"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
)
@ -22,9 +21,6 @@ type Sink interface {
// Returns sink statistics
GetStats() SinkStats
// Configure authentication
SetAuth(auth *config.AuthConfig)
}
// Contains statistics about a sink

View File

@ -7,7 +7,6 @@ import (
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
@ -25,26 +24,22 @@ import (
// Streams log entries via TCP
type TCPSink struct {
// C
input chan core.LogEntry
config TCPConfig
server *tcpServer
done chan struct{}
activeConns atomic.Int64
startTime time.Time
engine *gnet.Engine
engineMu sync.Mutex
wg sync.WaitGroup
netLimiter *limit.NetLimiter
logger *log.Logger
formatter format.Formatter
authenticator *auth.Authenticator
input chan core.LogEntry
config *config.TCPSinkOptions
server *tcpServer
done chan struct{}
activeConns atomic.Int64
startTime time.Time
engine *gnet.Engine
engineMu sync.Mutex
wg sync.WaitGroup
netLimiter *limit.NetLimiter
logger *log.Logger
formatter format.Formatter
// Statistics
totalProcessed atomic.Uint64
lastProcessed atomic.Value // time.Time
authFailures atomic.Uint64
authSuccesses atomic.Uint64
// Write error tracking
writeErrors atomic.Uint64
@ -62,87 +57,14 @@ type TCPConfig struct {
}
// Creates a new TCP streaming sink
func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) {
cfg := TCPConfig{
Host: "0.0.0.0",
Port: int64(9090),
BufferSize: int64(1000),
}
// Extract configuration from options
if host, ok := options["host"].(string); ok && host != "" {
cfg.Host = host
}
if port, ok := options["port"].(int64); ok {
cfg.Port = port
}
if bufSize, ok := options["buffer_size"].(int64); ok {
cfg.BufferSize = bufSize
}
// Extract heartbeat config
if hb, ok := options["heartbeat"].(map[string]any); ok {
cfg.Heartbeat = &config.HeartbeatConfig{}
cfg.Heartbeat.Enabled, _ = hb["enabled"].(bool)
if interval, ok := hb["interval_seconds"].(int64); ok {
cfg.Heartbeat.IntervalSeconds = interval
}
cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool)
cfg.Heartbeat.IncludeStats, _ = hb["include_stats"].(bool)
if hbFormat, ok := hb["format"].(string); ok {
cfg.Heartbeat.Format = hbFormat
}
}
// Extract net limit config
if nl, ok := options["net_limit"].(map[string]any); ok {
cfg.NetLimit = &config.NetLimitConfig{}
cfg.NetLimit.Enabled, _ = nl["enabled"].(bool)
if rps, ok := nl["requests_per_second"].(float64); ok {
cfg.NetLimit.RequestsPerSecond = rps
}
if burst, ok := nl["burst_size"].(int64); ok {
cfg.NetLimit.BurstSize = burst
}
if respCode, ok := nl["response_code"].(int64); ok {
cfg.NetLimit.ResponseCode = respCode
}
if msg, ok := nl["response_message"].(string); ok {
cfg.NetLimit.ResponseMessage = msg
}
if maxPerIP, ok := nl["max_connections_per_ip"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerIP = maxPerIP
}
if maxPerUser, ok := nl["max_connections_per_user"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerUser = maxPerUser
}
if maxPerToken, ok := nl["max_connections_per_token"].(int64); ok {
cfg.NetLimit.MaxConnectionsPerToken = maxPerToken
}
if maxTotal, ok := nl["max_connections_total"].(int64); ok {
cfg.NetLimit.MaxConnectionsTotal = maxTotal
}
if ipWhitelist, ok := nl["ip_whitelist"].([]any); ok {
cfg.NetLimit.IPWhitelist = make([]string, 0, len(ipWhitelist))
for _, entry := range ipWhitelist {
if str, ok := entry.(string); ok {
cfg.NetLimit.IPWhitelist = append(cfg.NetLimit.IPWhitelist, str)
}
}
}
if ipBlacklist, ok := nl["ip_blacklist"].([]any); ok {
cfg.NetLimit.IPBlacklist = make([]string, 0, len(ipBlacklist))
for _, entry := range ipBlacklist {
if str, ok := entry.(string); ok {
cfg.NetLimit.IPBlacklist = append(cfg.NetLimit.IPBlacklist, str)
}
}
}
func NewTCPSink(opts *config.TCPSinkOptions, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) {
if opts == nil {
return nil, fmt.Errorf("TCP sink options cannot be nil")
}
t := &TCPSink{
input: make(chan core.LogEntry, cfg.BufferSize),
config: cfg,
config: opts, // Direct reference to config
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
@ -150,9 +72,11 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For
}
t.lastProcessed.Store(time.Time{})
// Initialize net limiter
if cfg.NetLimit != nil && cfg.NetLimit.Enabled {
t.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger)
// Initialize net limiter with pointer
if opts.NetLimit != nil && (opts.NetLimit.Enabled ||
len(opts.NetLimit.IPWhitelist) > 0 ||
len(opts.NetLimit.IPBlacklist) > 0) {
t.netLimiter = limit.NewNetLimiter(opts.NetLimit, logger)
}
return t, nil
@ -193,8 +117,7 @@ func (t *TCPSink) Start(ctx context.Context) error {
go func() {
t.logger.Info("msg", "Starting TCP server",
"component", "tcp_sink",
"port", t.config.Port,
"auth", t.authenticator != nil)
"port", t.config.Port)
err := gnet.Run(t.server, addr, opts...)
if err != nil {
@ -282,7 +205,7 @@ func (t *TCPSink) broadcastLoop(ctx context.Context) {
var tickerChan <-chan time.Time
if t.config.Heartbeat != nil && t.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(t.config.Heartbeat.IntervalSeconds) * time.Second)
ticker = time.NewTicker(time.Duration(t.config.Heartbeat.Interval) * time.Second)
tickerChan = ticker.C
defer ticker.Stop()
}
@ -329,21 +252,19 @@ func (t *TCPSink) broadcastData(data []byte) {
t.server.mu.RLock()
defer t.server.mu.RUnlock()
for conn, client := range t.server.clients {
if client.authenticated {
conn.AsyncWrite(data, func(c gnet.Conn, err error) error {
if err != nil {
t.writeErrors.Add(1)
t.handleWriteError(c, err)
} else {
// Reset consecutive error count on success
t.errorMu.Lock()
delete(t.consecutiveWriteErrors, c)
t.errorMu.Unlock()
}
return nil
})
}
for conn, _ := range t.server.clients {
conn.AsyncWrite(data, func(c gnet.Conn, err error) error {
if err != nil {
t.writeErrors.Add(1)
t.handleWriteError(c, err)
} else {
// Reset consecutive error count on success
t.errorMu.Lock()
delete(t.consecutiveWriteErrors, c)
t.errorMu.Unlock()
}
return nil
})
}
}
@ -408,11 +329,10 @@ func (t *TCPSink) GetActiveConnections() int64 {
// Represents a connected TCP client with auth state
type tcpClient struct {
conn gnet.Conn
buffer bytes.Buffer
authenticated bool
authTimeout time.Time
session *auth.Session
conn gnet.Conn
buffer bytes.Buffer
authTimeout time.Time
session *auth.Session
}
// Handles gnet events with authentication
@ -439,7 +359,7 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
remoteAddr := c.RemoteAddr()
s.sink.logger.Debug("msg", "TCP connection attempt", "remote_addr", remoteAddr)
// Reject IPv6 connections immediately
// Reject IPv6 connections
if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok {
if tcpAddr.IP.To4() == nil {
return []byte("IPv4-only (IPv6 not supported)\n"), gnet.Close
@ -467,14 +387,10 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
s.sink.netLimiter.AddConnection(remoteStr)
}
// Create client state without auth timeout initially
// TCP Sink accepts all connections without authentication
client := &tcpClient{
conn: c,
authenticated: s.sink.authenticator == nil,
}
if s.sink.authenticator != nil {
client.authTimeout = time.Now().Add(30 * time.Second)
conn: c,
buffer: bytes.Buffer{},
}
s.mu.Lock()
@ -484,13 +400,7 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
newCount := s.sink.activeConns.Add(1)
s.sink.logger.Debug("msg", "TCP connection opened",
"remote_addr", remoteAddr,
"active_connections", newCount,
"auth_enabled", s.sink.authenticator != nil)
// Send auth prompt if authentication is required
if s.sink.authenticator != nil {
return []byte("AUTH_REQUIRED\n"), gnet.None
}
"active_connections", newCount)
return nil, gnet.None
}
@ -522,96 +432,7 @@ func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
}
func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action {
s.mu.RLock()
client, exists := s.clients[c]
s.mu.RUnlock()
if !exists {
return gnet.Close
}
// Authentication phase
if !client.authenticated {
// Check auth timeout
if time.Now().After(client.authTimeout) {
s.sink.logger.Warn("msg", "Authentication timeout",
"component", "tcp_sink",
"remote_addr", c.RemoteAddr().String())
return gnet.Close
}
// Read auth data
data, _ := c.Next(-1)
if len(data) == 0 {
return gnet.None
}
client.buffer.Write(data)
// Look for complete auth line
if idx := bytes.IndexByte(client.buffer.Bytes(), '\n'); idx >= 0 {
line := client.buffer.Bytes()[:idx]
client.buffer.Next(idx + 1)
// Parse AUTH command: AUTH <method> <credentials>
parts := strings.SplitN(string(line), " ", 3)
if len(parts) != 3 || parts[0] != "AUTH" {
c.AsyncWrite([]byte("AUTH_FAIL\n"), nil)
return gnet.Close
}
// Authenticate
session, err := s.sink.authenticator.AuthenticateTCP(parts[1], parts[2], c.RemoteAddr().String())
if err != nil {
s.sink.authFailures.Add(1)
s.sink.logger.Warn("msg", "TCP authentication failed",
"remote_addr", c.RemoteAddr().String(),
"method", parts[1],
"error", err)
c.AsyncWrite([]byte("AUTH_FAIL\n"), nil)
return gnet.Close
}
// Authentication successful
s.sink.authSuccesses.Add(1)
s.mu.Lock()
client.authenticated = true
client.session = session
s.mu.Unlock()
s.sink.logger.Info("msg", "TCP client authenticated",
"component", "tcp_sink",
"remote_addr", c.RemoteAddr().String(),
"username", session.Username,
"method", session.Method)
c.AsyncWrite([]byte("AUTH_OK\n"), nil)
client.buffer.Reset()
}
return gnet.None
}
// Clients shouldn't send data, just discard
// TCP Sink doesn't expect any data from clients, discard all
c.Discard(-1)
return gnet.None
}
// Configures tcp sink auth
func (t *TCPSink) SetAuth(authCfg *config.AuthConfig) {
if authCfg == nil || authCfg.Type == "none" {
return
}
authenticator, err := auth.NewAuthenticator(authCfg, t.logger)
if err != nil {
t.logger.Error("msg", "Failed to initialize authenticator for TCP sink",
"component", "tcp_sink",
"error", err)
return
}
t.authenticator = authenticator
t.logger.Info("msg", "Authentication configured for TCP sink",
"component", "tcp_sink",
"auth_type", authCfg.Type)
}

View File

@ -7,7 +7,9 @@ import (
"encoding/json"
"errors"
"fmt"
"logwisp/src/internal/auth"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
@ -16,7 +18,6 @@ import (
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/scram"
"github.com/lixenwraith/log"
)
@ -24,7 +25,8 @@ import (
// Forwards log entries to a remote TCP endpoint
type TCPClientSink struct {
input chan core.LogEntry
config TCPClientConfig
config *config.TCPClientSinkOptions
address string
conn net.Conn
connMu sync.RWMutex
done chan struct{}
@ -46,101 +48,17 @@ type TCPClientSink struct {
connectionUptime atomic.Value // time.Duration
}
// Holds TCP client sink configuration
type TCPClientConfig struct {
Address string `toml:"address"`
BufferSize int64 `toml:"buffer_size"`
DialTimeout time.Duration `toml:"dial_timeout_seconds"`
WriteTimeout time.Duration `toml:"write_timeout_seconds"`
ReadTimeout time.Duration `toml:"read_timeout_seconds"`
KeepAlive time.Duration `toml:"keep_alive_seconds"`
// Security
AuthType string `toml:"auth_type"`
Username string `toml:"username"`
Password string `toml:"password"`
// Reconnection settings
ReconnectDelay time.Duration `toml:"reconnect_delay_ms"`
MaxReconnectDelay time.Duration `toml:"max_reconnect_delay_seconds"`
ReconnectBackoff float64 `toml:"reconnect_backoff"`
}
// Creates a new TCP client sink
func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) {
cfg := TCPClientConfig{
BufferSize: int64(1000),
DialTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
ReadTimeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
ReconnectDelay: time.Second,
MaxReconnectDelay: 30 * time.Second,
ReconnectBackoff: float64(1.5),
}
// Extract address
address, ok := options["address"].(string)
if !ok || address == "" {
return nil, fmt.Errorf("tcp_client sink requires 'address' option")
}
// Validate address format
_, _, err := net.SplitHostPort(address)
if err != nil {
return nil, fmt.Errorf("invalid address format (expected host:port): %w", err)
}
cfg.Address = address
// Extract other options
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
cfg.BufferSize = bufSize
}
if dialTimeout, ok := options["dial_timeout_seconds"].(int64); ok && dialTimeout > 0 {
cfg.DialTimeout = time.Duration(dialTimeout) * time.Second
}
if writeTimeout, ok := options["write_timeout_seconds"].(int64); ok && writeTimeout > 0 {
cfg.WriteTimeout = time.Duration(writeTimeout) * time.Second
}
if readTimeout, ok := options["read_timeout_seconds"].(int64); ok && readTimeout > 0 {
cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
}
if keepAlive, ok := options["keep_alive_seconds"].(int64); ok && keepAlive > 0 {
cfg.KeepAlive = time.Duration(keepAlive) * time.Second
}
if reconnectDelay, ok := options["reconnect_delay_ms"].(int64); ok && reconnectDelay > 0 {
cfg.ReconnectDelay = time.Duration(reconnectDelay) * time.Millisecond
}
if maxReconnectDelay, ok := options["max_reconnect_delay_seconds"].(int64); ok && maxReconnectDelay > 0 {
cfg.MaxReconnectDelay = time.Duration(maxReconnectDelay) * time.Second
}
if backoff, ok := options["reconnect_backoff"].(float64); ok && backoff >= 1.0 {
cfg.ReconnectBackoff = backoff
}
if authType, ok := options["auth_type"].(string); ok {
switch authType {
case "none":
cfg.AuthType = authType
case "scram":
cfg.AuthType = authType
if username, ok := options["username"].(string); ok && username != "" {
cfg.Username = username
} else {
return nil, fmt.Errorf("invalid scram username")
}
if password, ok := options["password"].(string); ok && password != "" {
cfg.Password = password
} else {
return nil, fmt.Errorf("invalid scram password")
}
default:
return nil, fmt.Errorf("tcp_client sink: invalid auth_type '%s' (must be 'none' or 'scram')", authType)
}
func NewTCPClientSink(opts *config.TCPClientSinkOptions, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) {
// Validation and defaults are handled in config package
if opts == nil {
return nil, fmt.Errorf("TCP client sink options cannot be nil")
}
t := &TCPClientSink{
input: make(chan core.LogEntry, cfg.BufferSize),
config: cfg,
config: opts,
address: opts.Host + ":" + strconv.Itoa(int(opts.Port)),
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
@ -167,7 +85,8 @@ func (t *TCPClientSink) Start(ctx context.Context) error {
t.logger.Info("msg", "TCP client sink started",
"component", "tcp_client_sink",
"address", t.config.Address)
"host", t.config.Host,
"port", t.config.Port)
return nil
}
@ -209,7 +128,7 @@ func (t *TCPClientSink) GetStats() SinkStats {
StartTime: t.startTime,
LastProcessed: lastProc,
Details: map[string]any{
"address": t.config.Address,
"address": t.address,
"connected": connected,
"reconnecting": t.reconnecting.Load(),
"total_failed": t.totalFailed.Load(),
@ -223,7 +142,7 @@ func (t *TCPClientSink) GetStats() SinkStats {
func (t *TCPClientSink) connectionManager(ctx context.Context) {
defer t.wg.Done()
reconnectDelay := t.config.ReconnectDelay
reconnectDelay := time.Duration(t.config.ReconnectDelayMS) * time.Millisecond
for {
select {
@ -243,9 +162,9 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.lastConnectErr = err
t.logger.Warn("msg", "Failed to connect to TCP server",
"component", "tcp_client_sink",
"address", t.config.Address,
"address", t.address,
"error", err,
"retry_delay", reconnectDelay)
"retry_delay_ms", reconnectDelay)
// Wait before retry
select {
@ -258,15 +177,15 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
// Exponential backoff
reconnectDelay = time.Duration(float64(reconnectDelay) * t.config.ReconnectBackoff)
if reconnectDelay > t.config.MaxReconnectDelay {
reconnectDelay = t.config.MaxReconnectDelay
if reconnectDelay > time.Duration(t.config.MaxReconnectDelayMS)*time.Millisecond {
reconnectDelay = time.Duration(t.config.MaxReconnectDelayMS)
}
continue
}
// Connection successful
t.lastConnectErr = nil
reconnectDelay = t.config.ReconnectDelay // Reset backoff
reconnectDelay = time.Duration(t.config.ReconnectDelayMS) * time.Millisecond // Reset backoff
t.connectTime = time.Now()
t.totalReconnects.Add(1)
@ -276,7 +195,7 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.logger.Info("msg", "Connected to TCP server",
"component", "tcp_client_sink",
"address", t.config.Address,
"address", t.address,
"local_addr", conn.LocalAddr())
// Monitor connection
@ -293,18 +212,18 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.logger.Warn("msg", "Lost connection to TCP server",
"component", "tcp_client_sink",
"address", t.config.Address,
"address", t.address,
"uptime", uptime)
}
}
func (t *TCPClientSink) connect() (net.Conn, error) {
dialer := &net.Dialer{
Timeout: t.config.DialTimeout,
KeepAlive: t.config.KeepAlive,
Timeout: time.Duration(t.config.DialTimeout) * time.Second,
KeepAlive: time.Duration(t.config.KeepAlive) * time.Second,
}
conn, err := dialer.Dial("tcp", t.config.Address)
conn, err := dialer.Dial("tcp", t.address)
if err != nil {
return nil, err
}
@ -312,18 +231,18 @@ func (t *TCPClientSink) connect() (net.Conn, error) {
// Set TCP keep-alive
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(t.config.KeepAlive)
tcpConn.SetKeepAlivePeriod(time.Duration(t.config.KeepAlive) * time.Second)
}
// SCRAM authentication if credentials configured
if t.config.AuthType == "scram" {
if t.config.Auth != nil && t.config.Auth.Type == "scram" {
if err := t.performSCRAMAuth(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("SCRAM authentication failed: %w", err)
}
t.logger.Debug("msg", "SCRAM authentication completed",
"component", "tcp_client_sink",
"address", t.config.Address)
"address", t.address)
}
return conn, nil
@ -333,7 +252,17 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
reader := bufio.NewReader(conn)
// Create SCRAM client
scramClient := scram.NewClient(t.config.Username, t.config.Password)
scramClient := auth.NewScramClient(t.config.Auth.Username, t.config.Auth.Password)
// Wait for AUTH_REQUIRED from server
authPrompt, err := reader.ReadString('\n')
if err != nil {
return fmt.Errorf("failed to read auth prompt: %w", err)
}
if strings.TrimSpace(authPrompt) != "AUTH_REQUIRED" {
return fmt.Errorf("unexpected server greeting: %s", authPrompt)
}
// Step 1: Send ClientFirst
clientFirst, err := scramClient.StartAuthentication()
@ -341,8 +270,10 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
return fmt.Errorf("failed to start SCRAM: %w", err)
}
clientFirstJSON, _ := json.Marshal(clientFirst)
msg := fmt.Sprintf("SCRAM-FIRST %s\n", clientFirstJSON)
msg, err := auth.FormatSCRAMRequest("SCRAM-FIRST", clientFirst)
if err != nil {
return err
}
if _, err := conn.Write([]byte(msg)); err != nil {
return fmt.Errorf("failed to send SCRAM-FIRST: %w", err)
@ -354,13 +285,17 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
return fmt.Errorf("failed to read SCRAM challenge: %w", err)
}
parts := strings.Fields(strings.TrimSpace(response))
if len(parts) != 2 || parts[0] != "SCRAM-CHALLENGE" {
return fmt.Errorf("unexpected server response: %s", response)
command, data, err := auth.ParseSCRAMResponse(response)
if err != nil {
return err
}
var serverFirst scram.ServerFirst
if err := json.Unmarshal([]byte(parts[1]), &serverFirst); err != nil {
if command != "SCRAM-CHALLENGE" {
return fmt.Errorf("unexpected server response: %s", command)
}
var serverFirst auth.ServerFirst
if err := json.Unmarshal([]byte(data), &serverFirst); err != nil {
return fmt.Errorf("failed to parse server challenge: %w", err)
}
@ -370,8 +305,10 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
return fmt.Errorf("failed to process challenge: %w", err)
}
clientFinalJSON, _ := json.Marshal(clientFinal)
msg = fmt.Sprintf("SCRAM-PROOF %s\n", clientFinalJSON)
msg, err = auth.FormatSCRAMRequest("SCRAM-PROOF", clientFinal)
if err != nil {
return err
}
if _, err := conn.Write([]byte(msg)); err != nil {
return fmt.Errorf("failed to send SCRAM-PROOF: %w", err)
@ -383,19 +320,15 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
return fmt.Errorf("failed to read SCRAM result: %w", err)
}
parts = strings.Fields(strings.TrimSpace(response))
if len(parts) < 1 {
return fmt.Errorf("empty server response")
command, data, err = auth.ParseSCRAMResponse(response)
if err != nil {
return err
}
switch parts[0] {
switch command {
case "SCRAM-OK":
if len(parts) != 2 {
return fmt.Errorf("invalid SCRAM-OK response")
}
var serverFinal scram.ServerFinal
if err := json.Unmarshal([]byte(parts[1]), &serverFinal); err != nil {
var serverFinal auth.ServerFinal
if err := json.Unmarshal([]byte(data), &serverFinal); err != nil {
return fmt.Errorf("failed to parse server signature: %w", err)
}
@ -406,21 +339,21 @@ func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
t.logger.Info("msg", "SCRAM authentication successful",
"component", "tcp_client_sink",
"address", t.config.Address,
"username", t.config.Username,
"address", t.address,
"username", t.config.Auth.Username,
"session_id", serverFinal.SessionID)
return nil
case "SCRAM-FAIL":
reason := "unknown"
if len(parts) > 1 {
reason = strings.Join(parts[1:], " ")
reason := data
if reason == "" {
reason = "unknown"
}
return fmt.Errorf("authentication failed: %s", reason)
default:
return fmt.Errorf("unexpected response: %s", response)
return fmt.Errorf("unexpected response: %s", command)
}
}
@ -436,7 +369,7 @@ func (t *TCPClientSink) monitorConnection(conn net.Conn) {
return
case <-ticker.C:
// Set read deadline
if err := conn.SetReadDeadline(time.Now().Add(t.config.ReadTimeout)); err != nil {
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(t.config.ReadTimeout) * time.Second)); err != nil {
t.logger.Debug("msg", "Failed to set read deadline", "error", err)
return
}
@ -502,7 +435,7 @@ func (t *TCPClientSink) sendEntry(entry core.LogEntry) error {
}
// Set write deadline
if err := conn.SetWriteDeadline(time.Now().Add(t.config.WriteTimeout)); err != nil {
if err := conn.SetWriteDeadline(time.Now().Add(time.Duration(t.config.WriteTimeout) * time.Second)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
@ -518,10 +451,4 @@ func (t *TCPClientSink) sendEntry(entry core.LogEntry) error {
}
return nil
}
// Not applicable, Clients authenticate to remote servers using Username/Password in config
func (h *TCPClientSink) SetAuth(authCfg *config.AuthConfig) {
// No-op: client sinks don't validate incoming connections
// They authenticate to remote servers using Username/Password fields
}