v0.6.0 auth restructuring, scram auth added, more tests added
This commit is contained in:
@ -2,9 +2,9 @@
|
||||
package sink
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -16,20 +16,13 @@ import (
|
||||
"github.com/lixenwraith/log"
|
||||
)
|
||||
|
||||
// Holds common configuration for console sinks
|
||||
type ConsoleConfig struct {
|
||||
Target string // "stdout", "stderr", or "split"
|
||||
BufferSize int64
|
||||
}
|
||||
|
||||
// Writes log entries to stdout
|
||||
type StdoutSink struct {
|
||||
// ConsoleSink writes log entries to the console (stdout/stderr) using an dedicated logger instance
|
||||
type ConsoleSink struct {
|
||||
input chan core.LogEntry
|
||||
config ConsoleConfig
|
||||
output io.Writer
|
||||
writer *log.Logger // Dedicated internal logger instance for console writing
|
||||
done chan struct{}
|
||||
startTime time.Time
|
||||
logger *log.Logger
|
||||
logger *log.Logger // Application logger for app logs
|
||||
formatter format.Formatter
|
||||
|
||||
// Statistics
|
||||
@ -37,29 +30,38 @@ type StdoutSink struct {
|
||||
lastProcessed atomic.Value // time.Time
|
||||
}
|
||||
|
||||
// Creates a new stdout sink
|
||||
func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*StdoutSink, error) {
|
||||
config := ConsoleConfig{
|
||||
Target: "stdout",
|
||||
BufferSize: 1000,
|
||||
// Creates a new console sink
|
||||
func NewConsoleSink(options map[string]any, appLogger *log.Logger, formatter format.Formatter) (*ConsoleSink, error) {
|
||||
target := "stdout"
|
||||
if t, ok := options["target"].(string); ok {
|
||||
target = t
|
||||
}
|
||||
|
||||
// Check for split mode configuration
|
||||
if target, ok := options["target"].(string); ok {
|
||||
config.Target = target
|
||||
bufferSize := int64(1000)
|
||||
if buf, ok := options["buffer_size"].(int64); ok && buf > 0 {
|
||||
bufferSize = buf
|
||||
}
|
||||
|
||||
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
|
||||
config.BufferSize = bufSize
|
||||
// Dedicated logger instance as console writer
|
||||
writer, err := log.NewBuilder().
|
||||
EnableFile(false).
|
||||
EnableConsole(true).
|
||||
ConsoleTarget(target).
|
||||
Format("raw"). // Passthrough pre-formatted messages
|
||||
ShowTimestamp(false). // Disable writer's own timestamp
|
||||
ShowLevel(false). // Disable writer's own level prefix
|
||||
Build()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create console writer: %w", err)
|
||||
}
|
||||
|
||||
s := &StdoutSink{
|
||||
input: make(chan core.LogEntry, config.BufferSize),
|
||||
config: config,
|
||||
output: os.Stdout,
|
||||
s := &ConsoleSink{
|
||||
input: make(chan core.LogEntry, bufferSize),
|
||||
writer: writer,
|
||||
done: make(chan struct{}),
|
||||
startTime: time.Now(),
|
||||
logger: logger,
|
||||
logger: appLogger,
|
||||
formatter: formatter,
|
||||
}
|
||||
s.lastProcessed.Store(time.Time{})
|
||||
@ -67,39 +69,52 @@ func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format.
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Input() chan<- core.LogEntry {
|
||||
func (s *ConsoleSink) Input() chan<- core.LogEntry {
|
||||
return s.input
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Start(ctx context.Context) error {
|
||||
func (s *ConsoleSink) Start(ctx context.Context) error {
|
||||
// Start the internal writer's processing goroutine.
|
||||
if err := s.writer.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start console writer: %w", err)
|
||||
}
|
||||
go s.processLoop(ctx)
|
||||
s.logger.Info("msg", "Stdout sink started",
|
||||
"component", "stdout_sink",
|
||||
"target", s.config.Target)
|
||||
s.logger.Info("msg", "Console sink started",
|
||||
"component", "console_sink",
|
||||
"target", s.writer.GetConfig().ConsoleTarget)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Stop() {
|
||||
s.logger.Info("msg", "Stopping stdout sink")
|
||||
func (s *ConsoleSink) Stop() {
|
||||
target := s.writer.GetConfig().ConsoleTarget
|
||||
s.logger.Info("msg", "Stopping console sink", "target", target)
|
||||
close(s.done)
|
||||
s.logger.Info("msg", "Stdout sink stopped")
|
||||
|
||||
// Shutdown the internal writer with a timeout.
|
||||
if err := s.writer.Shutdown(2 * time.Second); err != nil {
|
||||
s.logger.Error("msg", "Error shutting down console writer",
|
||||
"component", "console_sink",
|
||||
"error", err)
|
||||
}
|
||||
s.logger.Info("msg", "Console sink stopped", "target", target)
|
||||
}
|
||||
|
||||
func (s *StdoutSink) GetStats() SinkStats {
|
||||
func (s *ConsoleSink) GetStats() SinkStats {
|
||||
lastProc, _ := s.lastProcessed.Load().(time.Time)
|
||||
|
||||
return SinkStats{
|
||||
Type: "stdout",
|
||||
Type: "console",
|
||||
TotalProcessed: s.totalProcessed.Load(),
|
||||
StartTime: s.startTime,
|
||||
LastProcessed: lastProc,
|
||||
Details: map[string]any{
|
||||
"target": s.config.Target,
|
||||
"target": s.writer.GetConfig().ConsoleTarget,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StdoutSink) processLoop(ctx context.Context) {
|
||||
// processLoop reads entries, formats them, and passes them to the internal writer.
|
||||
func (s *ConsoleSink) processLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case entry, ok := <-s.input:
|
||||
@ -110,24 +125,30 @@ func (s *StdoutSink) processLoop(ctx context.Context) {
|
||||
s.totalProcessed.Add(1)
|
||||
s.lastProcessed.Store(time.Now())
|
||||
|
||||
// Handle split mode - only process INFO/DEBUG for stdout
|
||||
if s.config.Target == "split" {
|
||||
upperLevel := strings.ToUpper(entry.Level)
|
||||
if upperLevel == "ERROR" || upperLevel == "WARN" || upperLevel == "WARNING" {
|
||||
// Skip ERROR/WARN levels in stdout when in split mode
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Format and write
|
||||
// Format the entry using the pipeline's configured formatter.
|
||||
formatted, err := s.formatter.Format(entry)
|
||||
if err != nil {
|
||||
s.logger.Error("msg", "Failed to format log entry for stdout",
|
||||
"component", "stdout_sink",
|
||||
s.logger.Error("msg", "Failed to format log entry for console",
|
||||
"component", "console_sink",
|
||||
"error", err)
|
||||
continue
|
||||
}
|
||||
s.output.Write(formatted)
|
||||
|
||||
// Convert to string to prevent hex encoding of []byte by log package
|
||||
// Strip new line, writer adds it
|
||||
message := string(bytes.TrimSuffix(formatted, []byte{'\n'}))
|
||||
switch strings.ToUpper(entry.Level) {
|
||||
case "DEBUG":
|
||||
s.writer.Debug(message)
|
||||
case "INFO":
|
||||
s.writer.Info(message)
|
||||
case "WARN", "WARNING":
|
||||
s.writer.Warn(message)
|
||||
case "ERROR", "FATAL":
|
||||
s.writer.Error(message)
|
||||
default:
|
||||
s.writer.Message(message)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@ -137,125 +158,6 @@ func (s *StdoutSink) processLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// Writes log entries to stderr
|
||||
type StderrSink struct {
|
||||
input chan core.LogEntry
|
||||
config ConsoleConfig
|
||||
output io.Writer
|
||||
done chan struct{}
|
||||
startTime time.Time
|
||||
logger *log.Logger
|
||||
formatter format.Formatter
|
||||
|
||||
// Statistics
|
||||
totalProcessed atomic.Uint64
|
||||
lastProcessed atomic.Value // time.Time
|
||||
}
|
||||
|
||||
// Creates a new stderr sink
|
||||
func NewStderrSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*StderrSink, error) {
|
||||
config := ConsoleConfig{
|
||||
Target: "stderr",
|
||||
BufferSize: 1000,
|
||||
}
|
||||
|
||||
// Check for split mode configuration
|
||||
if target, ok := options["target"].(string); ok {
|
||||
config.Target = target
|
||||
}
|
||||
|
||||
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
|
||||
config.BufferSize = bufSize
|
||||
}
|
||||
|
||||
s := &StderrSink{
|
||||
input: make(chan core.LogEntry, config.BufferSize),
|
||||
config: config,
|
||||
output: os.Stderr,
|
||||
done: make(chan struct{}),
|
||||
startTime: time.Now(),
|
||||
logger: logger,
|
||||
formatter: formatter,
|
||||
}
|
||||
s.lastProcessed.Store(time.Time{})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *StderrSink) Input() chan<- core.LogEntry {
|
||||
return s.input
|
||||
}
|
||||
|
||||
func (s *StderrSink) Start(ctx context.Context) error {
|
||||
go s.processLoop(ctx)
|
||||
s.logger.Info("msg", "Stderr sink started",
|
||||
"component", "stderr_sink",
|
||||
"target", s.config.Target)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StderrSink) Stop() {
|
||||
s.logger.Info("msg", "Stopping stderr sink")
|
||||
close(s.done)
|
||||
s.logger.Info("msg", "Stderr sink stopped")
|
||||
}
|
||||
|
||||
func (s *StderrSink) GetStats() SinkStats {
|
||||
lastProc, _ := s.lastProcessed.Load().(time.Time)
|
||||
|
||||
return SinkStats{
|
||||
Type: "stderr",
|
||||
TotalProcessed: s.totalProcessed.Load(),
|
||||
StartTime: s.startTime,
|
||||
LastProcessed: lastProc,
|
||||
Details: map[string]any{
|
||||
"target": s.config.Target,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StderrSink) processLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case entry, ok := <-s.input:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
s.totalProcessed.Add(1)
|
||||
s.lastProcessed.Store(time.Now())
|
||||
|
||||
// Handle split mode - only process ERROR/WARN for stderr
|
||||
if s.config.Target == "split" {
|
||||
upperLevel := strings.ToUpper(entry.Level)
|
||||
if upperLevel != "ERROR" && upperLevel != "WARN" && upperLevel != "WARNING" {
|
||||
// Skip non-ERROR/WARN levels in stderr when in split mode
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Format and write
|
||||
formatted, err := s.formatter.Format(entry)
|
||||
if err != nil {
|
||||
s.logger.Error("msg", "Failed to format log entry for stderr",
|
||||
"component", "stderr_sink",
|
||||
"error", err)
|
||||
continue
|
||||
}
|
||||
s.output.Write(formatted)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StdoutSink) SetAuth(auth *config.AuthConfig) {
|
||||
// Authentication does not apply to stdout sink
|
||||
}
|
||||
|
||||
func (s *StderrSink) SetAuth(auth *config.AuthConfig) {
|
||||
// Authentication does not apply to stderr sink
|
||||
func (s *ConsoleSink) SetAuth(auth *config.AuthConfig) {
|
||||
// Authentication does not apply to the console sink.
|
||||
}
|
||||
@ -2,6 +2,7 @@
|
||||
package sink
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"logwisp/src/internal/config"
|
||||
@ -32,12 +33,14 @@ type FileSink struct {
|
||||
func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*FileSink, error) {
|
||||
directory, ok := options["directory"].(string)
|
||||
if !ok || directory == "" {
|
||||
return nil, fmt.Errorf("file sink requires 'directory' option")
|
||||
directory = "./"
|
||||
logger.Warn("No directory or invalid directory provided, current directory will be used")
|
||||
}
|
||||
|
||||
name, ok := options["name"].(string)
|
||||
if !ok || name == "" {
|
||||
return nil, fmt.Errorf("file sink requires 'name' option")
|
||||
name = "logwisp.output"
|
||||
logger.Warn(fmt.Sprintf("No filename provided, %s will be used", name))
|
||||
}
|
||||
|
||||
// Create configuration for the internal log writer
|
||||
@ -77,7 +80,7 @@ func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Fo
|
||||
}
|
||||
|
||||
// Buffer size for input channel
|
||||
// TODO: Make this configurable
|
||||
// TODO: Centralized constant file in core package
|
||||
bufferSize := int64(1000)
|
||||
if bufSize, ok := options["buffer_size"].(int64); ok && bufSize > 0 {
|
||||
bufferSize = bufSize
|
||||
@ -152,11 +155,9 @@ func (fs *FileSink) processLoop(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Write formatted bytes (strip newline as writer adds it)
|
||||
message := string(formatted)
|
||||
if len(message) > 0 && message[len(message)-1] == '\n' {
|
||||
message = message[:len(message)-1]
|
||||
}
|
||||
// Convert to string to prevent hex encoding of []byte by log package
|
||||
// Strip new line, writer adds it
|
||||
message := string(bytes.TrimSuffix(formatted, []byte{'\n'}))
|
||||
fs.writer.Message(message)
|
||||
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -471,6 +471,21 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
|
||||
}
|
||||
}
|
||||
|
||||
// Enforce TLS for authentication
|
||||
if h.authenticator != nil && h.authConfig.Type != "none" {
|
||||
isTLS := ctx.IsTLS() || h.tlsManager != nil
|
||||
|
||||
if !isTLS {
|
||||
ctx.SetStatusCode(fasthttp.StatusForbidden)
|
||||
ctx.SetContentType("application/json")
|
||||
json.NewEncoder(ctx).Encode(map[string]string{
|
||||
"error": "TLS required for authentication",
|
||||
"hint": "Use HTTPS for authenticated connections",
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
path := string(ctx.Path())
|
||||
|
||||
// Status endpoint doesn't require auth
|
||||
@ -811,7 +826,7 @@ func (h *HTTPSink) SetAuth(authCfg *config.AuthConfig) {
|
||||
}
|
||||
|
||||
h.authConfig = authCfg
|
||||
authenticator, err := auth.New(authCfg, h.logger)
|
||||
authenticator, err := auth.NewAuthenticator(authCfg, h.logger)
|
||||
if err != nil {
|
||||
h.logger.Error("msg", "Failed to initialize authenticator for HTTP sink",
|
||||
"component", "http_sink",
|
||||
|
||||
@ -52,27 +52,29 @@ type HTTPClientSink struct {
|
||||
// TODO: missing toml tags
|
||||
type HTTPClientConfig struct {
|
||||
// Config
|
||||
URL string
|
||||
BufferSize int64
|
||||
BatchSize int64
|
||||
BatchDelay time.Duration
|
||||
Timeout time.Duration
|
||||
Headers map[string]string
|
||||
URL string `toml:"url"`
|
||||
BufferSize int64 `toml:"buffer_size"`
|
||||
BatchSize int64 `toml:"batch_size"`
|
||||
BatchDelay time.Duration `toml:"batch_delay_ms"`
|
||||
Timeout time.Duration `toml:"timeout_seconds"`
|
||||
Headers map[string]string `toml:"headers"`
|
||||
|
||||
// Retry configuration
|
||||
MaxRetries int64
|
||||
RetryDelay time.Duration
|
||||
RetryBackoff float64 // Multiplier for exponential backoff
|
||||
MaxRetries int64 `toml:"max_retries"`
|
||||
RetryDelay time.Duration `toml:"retry_delay"`
|
||||
RetryBackoff float64 `toml:"retry_backoff"` // Multiplier for exponential backoff
|
||||
|
||||
// Security
|
||||
Username string
|
||||
Password string
|
||||
AuthType string `toml:"auth_type"` // "none", "basic", "bearer", "mtls"
|
||||
Username string `toml:"username"` // For basic auth
|
||||
Password string `toml:"password"` // For basic auth
|
||||
BearerToken string `toml:"bearer_token"` // For bearer auth
|
||||
|
||||
// TLS configuration
|
||||
InsecureSkipVerify bool
|
||||
CAFile string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
|
||||
CAFile string `toml:"ca_file"`
|
||||
CertFile string `toml:"cert_file"`
|
||||
KeyFile string `toml:"key_file"`
|
||||
}
|
||||
|
||||
// Creates a new HTTP client sink
|
||||
@ -129,12 +131,64 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for
|
||||
if insecure, ok := options["insecure_skip_verify"].(bool); ok {
|
||||
cfg.InsecureSkipVerify = insecure
|
||||
}
|
||||
if authType, ok := options["auth_type"].(string); ok {
|
||||
switch authType {
|
||||
case "none", "basic", "bearer", "mtls":
|
||||
cfg.AuthType = authType
|
||||
default:
|
||||
return nil, fmt.Errorf("http_client sink: invalid auth_type '%s'", authType)
|
||||
}
|
||||
} else {
|
||||
cfg.AuthType = "none"
|
||||
}
|
||||
if username, ok := options["username"].(string); ok {
|
||||
cfg.Username = username
|
||||
}
|
||||
if password, ok := options["password"].(string); ok {
|
||||
cfg.Password = password // TODO: change to Argon2 hashed password
|
||||
}
|
||||
if token, ok := options["bearer_token"].(string); ok {
|
||||
cfg.BearerToken = token
|
||||
}
|
||||
|
||||
// Validate auth configuration and TLS enforcement
|
||||
isHTTPS := strings.HasPrefix(cfg.URL, "https://")
|
||||
|
||||
switch cfg.AuthType {
|
||||
case "basic":
|
||||
if cfg.Username == "" || cfg.Password == "" {
|
||||
return nil, fmt.Errorf("http_client sink: username and password required for basic auth")
|
||||
}
|
||||
if !isHTTPS {
|
||||
return nil, fmt.Errorf("http_client sink: basic auth requires HTTPS (security: credentials would be sent in plaintext)")
|
||||
}
|
||||
|
||||
case "bearer":
|
||||
if cfg.BearerToken == "" {
|
||||
return nil, fmt.Errorf("http_client sink: bearer_token required for bearer auth")
|
||||
}
|
||||
if !isHTTPS {
|
||||
return nil, fmt.Errorf("http_client sink: bearer auth requires HTTPS (security: token would be sent in plaintext)")
|
||||
}
|
||||
|
||||
case "mtls":
|
||||
if !isHTTPS {
|
||||
return nil, fmt.Errorf("http_client sink: mTLS requires HTTPS")
|
||||
}
|
||||
if cfg.CertFile == "" || cfg.KeyFile == "" {
|
||||
return nil, fmt.Errorf("http_client sink: cert_file and key_file required for mTLS")
|
||||
}
|
||||
|
||||
case "none":
|
||||
// Clear any credentials if auth is "none"
|
||||
if cfg.Username != "" || cfg.Password != "" || cfg.BearerToken != "" {
|
||||
logger.Warn("msg", "Credentials provided but auth_type is 'none', ignoring",
|
||||
"component", "http_client_sink")
|
||||
cfg.Username = ""
|
||||
cfg.Password = ""
|
||||
cfg.BearerToken = ""
|
||||
}
|
||||
}
|
||||
|
||||
// Extract headers
|
||||
if headers, ok := options["headers"].(map[string]any); ok {
|
||||
@ -416,6 +470,7 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
|
||||
var lastErr error
|
||||
retryDelay := h.config.RetryDelay
|
||||
|
||||
// TODO: verify retry loop placement is correct or should it be after acquiring resources (req :=....)
|
||||
for attempt := int64(0); attempt <= h.config.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
// Wait before retry
|
||||
@ -444,11 +499,22 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
|
||||
|
||||
req.Header.Set("User-Agent", fmt.Sprintf("LogWisp/%s", version.Short()))
|
||||
|
||||
// Add Basic Auth header if credentials configured
|
||||
if h.config.Username != "" && h.config.Password != "" {
|
||||
// Add authentication based on auth type
|
||||
switch h.config.AuthType {
|
||||
case "basic":
|
||||
creds := h.config.Username + ":" + h.config.Password
|
||||
encodedCreds := base64.StdEncoding.EncodeToString([]byte(creds))
|
||||
req.Header.Set("Authorization", "Basic "+encodedCreds)
|
||||
|
||||
case "bearer":
|
||||
req.Header.Set("Authorization", "Bearer "+h.config.BearerToken)
|
||||
|
||||
case "mtls":
|
||||
// mTLS auth is handled at TLS layer via client certificates
|
||||
// No Authorization header needed
|
||||
|
||||
case "none":
|
||||
// No authentication
|
||||
}
|
||||
|
||||
// Set headers
|
||||
|
||||
@ -602,7 +602,7 @@ func (t *TCPSink) SetAuth(authCfg *config.AuthConfig) {
|
||||
return
|
||||
}
|
||||
|
||||
authenticator, err := auth.New(authCfg, t.logger)
|
||||
authenticator, err := auth.NewAuthenticator(authCfg, t.logger)
|
||||
if err != nil {
|
||||
t.logger.Error("msg", "Failed to initialize authenticator for TCP sink",
|
||||
"component", "tcp_sink",
|
||||
|
||||
@ -4,7 +4,7 @@ package sink
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -13,26 +13,25 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"logwisp/src/internal/auth"
|
||||
"logwisp/src/internal/config"
|
||||
"logwisp/src/internal/core"
|
||||
"logwisp/src/internal/format"
|
||||
"logwisp/src/internal/scram"
|
||||
|
||||
"github.com/lixenwraith/log"
|
||||
)
|
||||
|
||||
// Forwards log entries to a remote TCP endpoint
|
||||
type TCPClientSink struct {
|
||||
input chan core.LogEntry
|
||||
config TCPClientConfig
|
||||
conn net.Conn
|
||||
connMu sync.RWMutex
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
startTime time.Time
|
||||
logger *log.Logger
|
||||
formatter format.Formatter
|
||||
authenticator *auth.Authenticator
|
||||
input chan core.LogEntry
|
||||
config TCPClientConfig
|
||||
conn net.Conn
|
||||
connMu sync.RWMutex
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
startTime time.Time
|
||||
logger *log.Logger
|
||||
formatter format.Formatter
|
||||
|
||||
// Reconnection state
|
||||
reconnecting atomic.Bool
|
||||
@ -49,24 +48,22 @@ type TCPClientSink struct {
|
||||
|
||||
// Holds TCP client sink configuration
|
||||
type TCPClientConfig struct {
|
||||
Address string
|
||||
BufferSize int64
|
||||
DialTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ReadTimeout time.Duration
|
||||
KeepAlive time.Duration
|
||||
Address string `toml:"address"`
|
||||
BufferSize int64 `toml:"buffer_size"`
|
||||
DialTimeout time.Duration `toml:"dial_timeout_seconds"`
|
||||
WriteTimeout time.Duration `toml:"write_timeout_seconds"`
|
||||
ReadTimeout time.Duration `toml:"read_timeout_seconds"`
|
||||
KeepAlive time.Duration `toml:"keep_alive_seconds"`
|
||||
|
||||
// Security
|
||||
Username string
|
||||
Password string
|
||||
AuthType string `toml:"auth_type"`
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
|
||||
// Reconnection settings
|
||||
ReconnectDelay time.Duration
|
||||
MaxReconnectDelay time.Duration
|
||||
ReconnectBackoff float64
|
||||
|
||||
// TLS config
|
||||
TLS *config.TLSConfig
|
||||
ReconnectDelay time.Duration `toml:"reconnect_delay_ms"`
|
||||
MaxReconnectDelay time.Duration `toml:"max_reconnect_delay_seconds"`
|
||||
ReconnectBackoff float64 `toml:"reconnect_backoff"`
|
||||
}
|
||||
|
||||
// Creates a new TCP client sink
|
||||
@ -120,11 +117,25 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
|
||||
if backoff, ok := options["reconnect_backoff"].(float64); ok && backoff >= 1.0 {
|
||||
cfg.ReconnectBackoff = backoff
|
||||
}
|
||||
if username, ok := options["username"].(string); ok {
|
||||
cfg.Username = username
|
||||
}
|
||||
if password, ok := options["password"].(string); ok {
|
||||
cfg.Password = password
|
||||
if authType, ok := options["auth_type"].(string); ok {
|
||||
switch authType {
|
||||
case "none":
|
||||
cfg.AuthType = authType
|
||||
case "scram":
|
||||
cfg.AuthType = authType
|
||||
if username, ok := options["username"].(string); ok && username != "" {
|
||||
cfg.Username = username
|
||||
} else {
|
||||
return nil, fmt.Errorf("invalid scram username")
|
||||
}
|
||||
if password, ok := options["password"].(string); ok && password != "" {
|
||||
cfg.Password = password
|
||||
} else {
|
||||
return nil, fmt.Errorf("invalid scram password")
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("tcp_client sink: invalid auth_type '%s' (must be 'none' or 'scram')", authType)
|
||||
}
|
||||
}
|
||||
|
||||
t := &TCPClientSink{
|
||||
@ -304,49 +315,115 @@ func (t *TCPClientSink) connect() (net.Conn, error) {
|
||||
tcpConn.SetKeepAlivePeriod(t.config.KeepAlive)
|
||||
}
|
||||
|
||||
// Handle authentication if credentials configured
|
||||
if t.config.Username != "" && t.config.Password != "" {
|
||||
// Read auth challenge
|
||||
reader := bufio.NewReader(conn)
|
||||
challenge, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
// SCRAM authentication if credentials configured
|
||||
if t.config.AuthType == "scram" {
|
||||
if err := t.performSCRAMAuth(conn); err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to read auth challenge: %w", err)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(challenge) == "AUTH_REQUIRED" {
|
||||
// Send credentials
|
||||
creds := t.config.Username + ":" + t.config.Password
|
||||
encodedCreds := base64.StdEncoding.EncodeToString([]byte(creds))
|
||||
authCmd := fmt.Sprintf("AUTH basic %s\n", encodedCreds)
|
||||
|
||||
if _, err := conn.Write([]byte(authCmd)); err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to send auth: %w", err)
|
||||
}
|
||||
|
||||
// Read response
|
||||
response, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to read auth response: %w", err)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(response) != "AUTH_OK" {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("authentication failed: %s", response)
|
||||
}
|
||||
|
||||
t.logger.Debug("msg", "TCP authentication successful",
|
||||
"component", "tcp_client_sink",
|
||||
"address", t.config.Address,
|
||||
"username", t.config.Username)
|
||||
return nil, fmt.Errorf("SCRAM authentication failed: %w", err)
|
||||
}
|
||||
t.logger.Debug("msg", "SCRAM authentication completed",
|
||||
"component", "tcp_client_sink",
|
||||
"address", t.config.Address)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
|
||||
reader := bufio.NewReader(conn)
|
||||
|
||||
// Create SCRAM client
|
||||
scramClient := scram.NewClient(t.config.Username, t.config.Password)
|
||||
|
||||
// Step 1: Send ClientFirst
|
||||
clientFirst, err := scramClient.StartAuthentication()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start SCRAM: %w", err)
|
||||
}
|
||||
|
||||
clientFirstJSON, _ := json.Marshal(clientFirst)
|
||||
msg := fmt.Sprintf("SCRAM-FIRST %s\n", clientFirstJSON)
|
||||
|
||||
if _, err := conn.Write([]byte(msg)); err != nil {
|
||||
return fmt.Errorf("failed to send SCRAM-FIRST: %w", err)
|
||||
}
|
||||
|
||||
// Step 2: Receive ServerFirst challenge
|
||||
response, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read SCRAM challenge: %w", err)
|
||||
}
|
||||
|
||||
parts := strings.Fields(strings.TrimSpace(response))
|
||||
if len(parts) != 2 || parts[0] != "SCRAM-CHALLENGE" {
|
||||
return fmt.Errorf("unexpected server response: %s", response)
|
||||
}
|
||||
|
||||
var serverFirst scram.ServerFirst
|
||||
if err := json.Unmarshal([]byte(parts[1]), &serverFirst); err != nil {
|
||||
return fmt.Errorf("failed to parse server challenge: %w", err)
|
||||
}
|
||||
|
||||
// Step 3: Process challenge and send proof
|
||||
clientFinal, err := scramClient.ProcessServerFirst(&serverFirst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process challenge: %w", err)
|
||||
}
|
||||
|
||||
clientFinalJSON, _ := json.Marshal(clientFinal)
|
||||
msg = fmt.Sprintf("SCRAM-PROOF %s\n", clientFinalJSON)
|
||||
|
||||
if _, err := conn.Write([]byte(msg)); err != nil {
|
||||
return fmt.Errorf("failed to send SCRAM-PROOF: %w", err)
|
||||
}
|
||||
|
||||
// Step 4: Receive ServerFinal
|
||||
response, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read SCRAM result: %w", err)
|
||||
}
|
||||
|
||||
parts = strings.Fields(strings.TrimSpace(response))
|
||||
if len(parts) < 1 {
|
||||
return fmt.Errorf("empty server response")
|
||||
}
|
||||
|
||||
switch parts[0] {
|
||||
case "SCRAM-OK":
|
||||
if len(parts) != 2 {
|
||||
return fmt.Errorf("invalid SCRAM-OK response")
|
||||
}
|
||||
|
||||
var serverFinal scram.ServerFinal
|
||||
if err := json.Unmarshal([]byte(parts[1]), &serverFinal); err != nil {
|
||||
return fmt.Errorf("failed to parse server signature: %w", err)
|
||||
}
|
||||
|
||||
// Verify server signature
|
||||
if err := scramClient.VerifyServerFinal(&serverFinal); err != nil {
|
||||
return fmt.Errorf("server signature verification failed: %w", err)
|
||||
}
|
||||
|
||||
t.logger.Info("msg", "SCRAM authentication successful",
|
||||
"component", "tcp_client_sink",
|
||||
"address", t.config.Address,
|
||||
"username", t.config.Username,
|
||||
"session_id", serverFinal.SessionID)
|
||||
|
||||
return nil
|
||||
|
||||
case "SCRAM-FAIL":
|
||||
reason := "unknown"
|
||||
if len(parts) > 1 {
|
||||
reason = strings.Join(parts[1:], " ")
|
||||
}
|
||||
return fmt.Errorf("authentication failed: %s", reason)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unexpected response: %s", response)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TCPClientSink) monitorConnection(conn net.Conn) {
|
||||
// Simple connection monitoring by periodic zero-byte reads
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
|
||||
Reference in New Issue
Block a user