From 58d33d78727ac8a6e6b9aa2278188840f774185224908ead44966cf70775218f Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Sat, 12 Jul 2025 01:32:07 -0400 Subject: [PATCH] v0.3.0 tcp/http client/server add for logwisp chain connection support, config refactor --- src/internal/config/filter.go | 31 ++- src/internal/config/pipeline.go | 110 +++++++-- src/internal/filter/chain.go | 3 +- src/internal/filter/filter.go | 40 +--- src/internal/service/service.go | 8 + src/internal/sink/http_client.go | 385 +++++++++++++++++++++++++++++ src/internal/sink/tcp_client.go | 376 +++++++++++++++++++++++++++++ src/internal/source/http.go | 400 +++++++++++++++++++++++++++++++ src/internal/source/tcp.go | 392 ++++++++++++++++++++++++++++++ 9 files changed, 1691 insertions(+), 54 deletions(-) create mode 100644 src/internal/sink/http_client.go create mode 100644 src/internal/sink/tcp_client.go create mode 100644 src/internal/source/http.go create mode 100644 src/internal/source/tcp.go diff --git a/src/internal/config/filter.go b/src/internal/config/filter.go index 178317f..8088067 100644 --- a/src/internal/config/filter.go +++ b/src/internal/config/filter.go @@ -4,14 +4,35 @@ package config import ( "fmt" "regexp" - - "logwisp/src/internal/filter" ) -func validateFilter(pipelineName string, filterIndex int, cfg *filter.Config) error { +// FilterType represents the filter type +type FilterType string + +const ( + FilterTypeInclude FilterType = "include" // Whitelist - only matching logs pass + FilterTypeExclude FilterType = "exclude" // Blacklist - matching logs are dropped +) + +// FilterLogic represents how multiple patterns are combined +type FilterLogic string + +const ( + FilterLogicOr FilterLogic = "or" // Match any pattern + FilterLogicAnd FilterLogic = "and" // Match all patterns +) + +// FilterConfig represents filter configuration +type FilterConfig struct { + Type FilterType `toml:"type"` + Logic FilterLogic `toml:"logic"` + Patterns []string `toml:"patterns"` +} + +func validateFilter(pipelineName string, filterIndex int, cfg *FilterConfig) error { // Validate filter type switch cfg.Type { - case filter.TypeInclude, filter.TypeExclude, "": + case FilterTypeInclude, FilterTypeExclude, "": // Valid types default: return fmt.Errorf("pipeline '%s' filter[%d]: invalid type '%s' (must be 'include' or 'exclude')", @@ -20,7 +41,7 @@ func validateFilter(pipelineName string, filterIndex int, cfg *filter.Config) er // Validate filter logic switch cfg.Logic { - case filter.LogicOr, filter.LogicAnd, "": + case FilterLogicOr, FilterLogicAnd, "": // Valid logic default: return fmt.Errorf("pipeline '%s' filter[%d]: invalid logic '%s' (must be 'or' or 'and')", diff --git a/src/internal/config/pipeline.go b/src/internal/config/pipeline.go index 1896979..89e4423 100644 --- a/src/internal/config/pipeline.go +++ b/src/internal/config/pipeline.go @@ -3,7 +3,8 @@ package config import ( "fmt" - "logwisp/src/internal/filter" + "net" + "net/url" "path/filepath" "strings" ) @@ -17,7 +18,7 @@ type PipelineConfig struct { Sources []SourceConfig `toml:"sources"` // Filter configuration - Filters []filter.Config `toml:"filters"` + Filters []FilterConfig `toml:"filters"` // Output sinks for this pipeline Sinks []SinkConfig `toml:"sinks"` @@ -93,29 +94,38 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err } } - case "file": - // Validate file source options - path, ok := cfg.Options["path"].(string) - if !ok || path == "" { - return fmt.Errorf("pipeline '%s' source[%d]: file source requires 'path' option", - pipelineName, sourceIndex) - } - - // Check for directory traversal - if strings.Contains(path, "..") { - return fmt.Errorf("pipeline '%s' source[%d]: path contains directory traversal", - pipelineName, sourceIndex) - } - case "stdin": // No specific validation needed for stdin + case "http": + // Validate HTTP source options + port, ok := toInt(cfg.Options["port"]) + if !ok || port < 1 || port > 65535 { + return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing HTTP port", + pipelineName, sourceIndex) + } + + // Validate path if provided + if ingestPath, ok := cfg.Options["ingest_path"].(string); ok { + if !strings.HasPrefix(ingestPath, "/") { + return fmt.Errorf("pipeline '%s' source[%d]: ingest path must start with /: %s", + pipelineName, sourceIndex, ingestPath) + } + } + + case "tcp": + // Validate TCP source options + port, ok := toInt(cfg.Options["port"]) + if !ok || port < 1 || port > 65535 { + return fmt.Errorf("pipeline '%s' source[%d]: invalid or missing TCP port", + pipelineName, sourceIndex) + } + default: return fmt.Errorf("pipeline '%s' source[%d]: unknown source type '%s'", pipelineName, sourceIndex, cfg.Type) } - // Note: RateLimit field is ignored for now as it's a placeholder return nil } @@ -228,6 +238,72 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts } } + case "http_client": + // Validate URL + urlStr, ok := cfg.Options["url"].(string) + if !ok || urlStr == "" { + return fmt.Errorf("pipeline '%s' sink[%d]: http_client sink requires 'url' option", + pipelineName, sinkIndex) + } + + // Validate URL format + parsedURL, err := url.Parse(urlStr) + if err != nil { + return fmt.Errorf("pipeline '%s' sink[%d]: invalid URL: %w", + pipelineName, sinkIndex, err) + } + if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" { + return fmt.Errorf("pipeline '%s' sink[%d]: URL must use http or https scheme", + pipelineName, sinkIndex) + } + + // Validate batch size + if batchSize, ok := toInt(cfg.Options["batch_size"]); ok { + if batchSize < 1 { + return fmt.Errorf("pipeline '%s' sink[%d]: batch_size must be positive: %d", + pipelineName, sinkIndex, batchSize) + } + } + + // Validate timeout + if timeout, ok := toInt(cfg.Options["timeout_seconds"]); ok { + if timeout < 1 { + return fmt.Errorf("pipeline '%s' sink[%d]: timeout_seconds must be positive: %d", + pipelineName, sinkIndex, timeout) + } + } + + case "tcp_client": + // FIXED: Added validation for TCP client sink + // Validate address + address, ok := cfg.Options["address"].(string) + if !ok || address == "" { + return fmt.Errorf("pipeline '%s' sink[%d]: tcp_client sink requires 'address' option", + pipelineName, sinkIndex) + } + + // Validate address format + _, _, err := net.SplitHostPort(address) + if err != nil { + return fmt.Errorf("pipeline '%s' sink[%d]: invalid address format (expected host:port): %w", + pipelineName, sinkIndex, err) + } + + // Validate timeouts + if dialTimeout, ok := toInt(cfg.Options["dial_timeout_seconds"]); ok { + if dialTimeout < 1 { + return fmt.Errorf("pipeline '%s' sink[%d]: dial_timeout_seconds must be positive: %d", + pipelineName, sinkIndex, dialTimeout) + } + } + + if writeTimeout, ok := toInt(cfg.Options["write_timeout_seconds"]); ok { + if writeTimeout < 1 { + return fmt.Errorf("pipeline '%s' sink[%d]: write_timeout_seconds must be positive: %d", + pipelineName, sinkIndex, writeTimeout) + } + } + case "file": // Validate file sink options directory, ok := cfg.Options["directory"].(string) diff --git a/src/internal/filter/chain.go b/src/internal/filter/chain.go index 9696d08..b455dea 100644 --- a/src/internal/filter/chain.go +++ b/src/internal/filter/chain.go @@ -5,6 +5,7 @@ import ( "fmt" "sync/atomic" + "logwisp/src/internal/config" "logwisp/src/internal/source" "github.com/lixenwraith/log" @@ -21,7 +22,7 @@ type Chain struct { } // NewChain creates a new filter chain from configurations -func NewChain(configs []Config, logger *log.Logger) (*Chain, error) { +func NewChain(configs []config.FilterConfig, logger *log.Logger) (*Chain, error) { chain := &Chain{ filters: make([]*Filter, 0, len(configs)), logger: logger, diff --git a/src/internal/filter/filter.go b/src/internal/filter/filter.go index 0964c2e..04c1996 100644 --- a/src/internal/filter/filter.go +++ b/src/internal/filter/filter.go @@ -7,37 +7,15 @@ import ( "sync" "sync/atomic" + "logwisp/src/internal/config" "logwisp/src/internal/source" "github.com/lixenwraith/log" ) -// Type represents the filter type -type Type string - -const ( - TypeInclude Type = "include" // Whitelist - only matching logs pass - TypeExclude Type = "exclude" // Blacklist - matching logs are dropped -) - -// Logic represents how multiple patterns are combined -type Logic string - -const ( - LogicOr Logic = "or" // Match any pattern - LogicAnd Logic = "and" // Match all patterns -) - -// Config represents filter configuration -type Config struct { - Type Type `toml:"type"` - Logic Logic `toml:"logic"` - Patterns []string `toml:"patterns"` -} - // Filter applies regex-based filtering to log entries type Filter struct { - config Config + config config.FilterConfig patterns []*regexp.Regexp mu sync.RWMutex logger *log.Logger @@ -49,13 +27,13 @@ type Filter struct { } // New creates a new filter from configuration -func New(cfg Config, logger *log.Logger) (*Filter, error) { +func New(cfg config.FilterConfig, logger *log.Logger) (*Filter, error) { // Set defaults if cfg.Type == "" { - cfg.Type = TypeInclude + cfg.Type = config.FilterTypeInclude } if cfg.Logic == "" { - cfg.Logic = LogicOr + cfg.Logic = config.FilterLogicOr } f := &Filter{ @@ -108,9 +86,9 @@ func (f *Filter) Apply(entry source.LogEntry) bool { // Determine if we should pass or drop shouldPass := false switch f.config.Type { - case TypeInclude: + case config.FilterTypeInclude: shouldPass = matched - case TypeExclude: + case config.FilterTypeExclude: shouldPass = !matched } @@ -124,7 +102,7 @@ func (f *Filter) Apply(entry source.LogEntry) bool { // matches checks if text matches the patterns according to the logic func (f *Filter) matches(text string) bool { switch f.config.Logic { - case LogicOr: + case config.FilterLogicOr: // Match any pattern for _, re := range f.patterns { if re.MatchString(text) { @@ -133,7 +111,7 @@ func (f *Filter) matches(text string) bool { } return false - case LogicAnd: + case config.FilterLogicAnd: // Must match all patterns for _, re := range f.patterns { if !re.MatchString(text) { diff --git a/src/internal/service/service.go b/src/internal/service/service.go index e084579..d9f4b3a 100644 --- a/src/internal/service/service.go +++ b/src/internal/service/service.go @@ -208,6 +208,10 @@ func (s *Service) createSource(cfg config.SourceConfig) (source.Source, error) { return source.NewDirectorySource(cfg.Options, s.logger) case "stdin": return source.NewStdinSource(cfg.Options, s.logger) + case "http": + return source.NewHTTPSource(cfg.Options, s.logger) + case "tcp": + return source.NewTCPSource(cfg.Options, s.logger) default: return nil, fmt.Errorf("unknown source type: %s", cfg.Type) } @@ -220,6 +224,10 @@ func (s *Service) createSink(cfg config.SinkConfig) (sink.Sink, error) { return sink.NewHTTPSink(cfg.Options, s.logger) case "tcp": return sink.NewTCPSink(cfg.Options, s.logger) + case "http_client": + return sink.NewHTTPClientSink(cfg.Options, s.logger) + case "tcp_client": + return sink.NewTCPClientSink(cfg.Options, s.logger) case "file": return sink.NewFileSink(cfg.Options, s.logger) case "stdout": diff --git a/src/internal/sink/http_client.go b/src/internal/sink/http_client.go new file mode 100644 index 0000000..3dc01d8 --- /dev/null +++ b/src/internal/sink/http_client.go @@ -0,0 +1,385 @@ +// FILE: src/internal/sink/http_client.go +package sink + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" + "github.com/valyala/fasthttp" +) + +// HTTPClientSink forwards log entries to a remote HTTP endpoint +type HTTPClientSink struct { + input chan source.LogEntry + config HTTPClientConfig + client *fasthttp.Client + batch []source.LogEntry + batchMu sync.Mutex + done chan struct{} + wg sync.WaitGroup + startTime time.Time + logger *log.Logger + + // Statistics + totalProcessed atomic.Uint64 + totalBatches atomic.Uint64 + failedBatches atomic.Uint64 + lastProcessed atomic.Value // time.Time + lastBatchSent atomic.Value // time.Time + activeConnections atomic.Int32 +} + +// HTTPClientConfig holds HTTP client sink configuration +type HTTPClientConfig struct { + URL string + BufferSize int + BatchSize int + BatchDelay time.Duration + Timeout time.Duration + Headers map[string]string + + // Retry configuration + MaxRetries int + RetryDelay time.Duration + RetryBackoff float64 // Multiplier for exponential backoff + + // TLS configuration + InsecureSkipVerify bool +} + +// NewHTTPClientSink creates a new HTTP client sink +func NewHTTPClientSink(options map[string]any, logger *log.Logger) (*HTTPClientSink, error) { + cfg := HTTPClientConfig{ + BufferSize: 1000, + BatchSize: 100, + BatchDelay: time.Second, + Timeout: 30 * time.Second, + MaxRetries: 3, + RetryDelay: time.Second, + RetryBackoff: 2.0, + Headers: make(map[string]string), + } + + // Extract URL + urlStr, ok := options["url"].(string) + if !ok || urlStr == "" { + return nil, fmt.Errorf("http_client sink requires 'url' option") + } + + // Validate URL + parsedURL, err := url.Parse(urlStr) + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" { + return nil, fmt.Errorf("URL must use http or https scheme") + } + cfg.URL = urlStr + + // Extract other options + if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { + cfg.BufferSize = bufSize + } + if batchSize, ok := toInt(options["batch_size"]); ok && batchSize > 0 { + cfg.BatchSize = batchSize + } + if delayMs, ok := toInt(options["batch_delay_ms"]); ok && delayMs > 0 { + cfg.BatchDelay = time.Duration(delayMs) * time.Millisecond + } + if timeoutSec, ok := toInt(options["timeout_seconds"]); ok && timeoutSec > 0 { + cfg.Timeout = time.Duration(timeoutSec) * time.Second + } + if maxRetries, ok := toInt(options["max_retries"]); ok && maxRetries >= 0 { + cfg.MaxRetries = maxRetries + } + if retryDelayMs, ok := toInt(options["retry_delay_ms"]); ok && retryDelayMs > 0 { + cfg.RetryDelay = time.Duration(retryDelayMs) * time.Millisecond + } + if backoff, ok := toFloat(options["retry_backoff"]); ok && backoff >= 1.0 { + cfg.RetryBackoff = backoff + } + if insecure, ok := options["insecure_skip_verify"].(bool); ok { + cfg.InsecureSkipVerify = insecure + } + + // Extract headers + if headers, ok := options["headers"].(map[string]any); ok { + for k, v := range headers { + if strVal, ok := v.(string); ok { + cfg.Headers[k] = strVal + } + } + } + + // Set default Content-Type if not specified + if _, exists := cfg.Headers["Content-Type"]; !exists { + cfg.Headers["Content-Type"] = "application/json" + } + + h := &HTTPClientSink{ + input: make(chan source.LogEntry, cfg.BufferSize), + config: cfg, + batch: make([]source.LogEntry, 0, cfg.BatchSize), + done: make(chan struct{}), + startTime: time.Now(), + logger: logger, + } + h.lastProcessed.Store(time.Time{}) + h.lastBatchSent.Store(time.Time{}) + + // Create fasthttp client + h.client = &fasthttp.Client{ + MaxConnsPerHost: 10, + MaxIdleConnDuration: 10 * time.Second, + ReadTimeout: cfg.Timeout, + WriteTimeout: cfg.Timeout, + DisableHeaderNamesNormalizing: true, + } + + // TODO: Implement custom TLS configuration, including InsecureSkipVerify, + // by setting a custom dialer on the fasthttp.Client. + // For example: + // if cfg.InsecureSkipVerify { + // h.client.Dial = func(addr string) (net.Conn, error) { + // return fasthttp.DialDualStackTimeout(addr, cfg.Timeout, &tls.Config{ + // InsecureSkipVerify: true, + // }) + // } + // } + // FIXED: Removed incorrect TLS configuration that referenced non-existent field + + return h, nil +} + +func (h *HTTPClientSink) Input() chan<- source.LogEntry { + return h.input +} + +func (h *HTTPClientSink) Start(ctx context.Context) error { + h.wg.Add(2) + go h.processLoop(ctx) + go h.batchTimer(ctx) + + h.logger.Info("msg", "HTTP client sink started", + "component", "http_client_sink", + "url", h.config.URL, + "batch_size", h.config.BatchSize, + "batch_delay", h.config.BatchDelay) + return nil +} + +func (h *HTTPClientSink) Stop() { + h.logger.Info("msg", "Stopping HTTP client sink") + close(h.done) + h.wg.Wait() + + // Send any remaining batched entries + h.batchMu.Lock() + if len(h.batch) > 0 { + batch := h.batch + h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batchMu.Unlock() + h.sendBatch(batch) + } else { + h.batchMu.Unlock() + } + + h.logger.Info("msg", "HTTP client sink stopped", + "total_processed", h.totalProcessed.Load(), + "total_batches", h.totalBatches.Load(), + "failed_batches", h.failedBatches.Load()) +} + +func (h *HTTPClientSink) GetStats() SinkStats { + lastProc, _ := h.lastProcessed.Load().(time.Time) + lastBatch, _ := h.lastBatchSent.Load().(time.Time) + + h.batchMu.Lock() + pendingEntries := len(h.batch) + h.batchMu.Unlock() + + return SinkStats{ + Type: "http_client", + TotalProcessed: h.totalProcessed.Load(), + ActiveConnections: h.activeConnections.Load(), + StartTime: h.startTime, + LastProcessed: lastProc, + Details: map[string]any{ + "url": h.config.URL, + "batch_size": h.config.BatchSize, + "pending_entries": pendingEntries, + "total_batches": h.totalBatches.Load(), + "failed_batches": h.failedBatches.Load(), + "last_batch_sent": lastBatch, + }, + } +} + +func (h *HTTPClientSink) processLoop(ctx context.Context) { + defer h.wg.Done() + + for { + select { + case entry, ok := <-h.input: + if !ok { + return + } + + h.totalProcessed.Add(1) + h.lastProcessed.Store(time.Now()) + + // Add to batch + h.batchMu.Lock() + h.batch = append(h.batch, entry) + + // Check if batch is full + if len(h.batch) >= h.config.BatchSize { + batch := h.batch + h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batchMu.Unlock() + + // Send batch in background + go h.sendBatch(batch) + } else { + h.batchMu.Unlock() + } + + case <-ctx.Done(): + return + case <-h.done: + return + } + } +} + +func (h *HTTPClientSink) batchTimer(ctx context.Context) { + defer h.wg.Done() + + ticker := time.NewTicker(h.config.BatchDelay) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.batchMu.Lock() + if len(h.batch) > 0 { + batch := h.batch + h.batch = make([]source.LogEntry, 0, h.config.BatchSize) + h.batchMu.Unlock() + + // Send batch in background + go h.sendBatch(batch) + } else { + h.batchMu.Unlock() + } + + case <-ctx.Done(): + return + case <-h.done: + return + } + } +} + +func (h *HTTPClientSink) sendBatch(batch []source.LogEntry) { + h.activeConnections.Add(1) + defer h.activeConnections.Add(-1) + + h.totalBatches.Add(1) + h.lastBatchSent.Store(time.Now()) + + // Prepare request body + body, err := json.Marshal(batch) + if err != nil { + h.logger.Error("msg", "Failed to marshal batch", + "component", "http_client_sink", + "error", err, + "batch_size", len(batch)) + h.failedBatches.Add(1) + return + } + + // Retry logic + var lastErr error + retryDelay := h.config.RetryDelay + + for attempt := 0; attempt <= h.config.MaxRetries; attempt++ { + if attempt > 0 { + // Wait before retry + time.Sleep(retryDelay) + retryDelay = time.Duration(float64(retryDelay) * h.config.RetryBackoff) + } + + // Create request + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(h.config.URL) + req.Header.SetMethod("POST") + req.SetBody(body) + + // Set headers + for k, v := range h.config.Headers { + req.Header.Set(k, v) + } + + // Send request + err := h.client.DoTimeout(req, resp, h.config.Timeout) + if err != nil { + lastErr = fmt.Errorf("request failed: %w", err) + h.logger.Warn("msg", "HTTP request failed", + "component", "http_client_sink", + "attempt", attempt+1, + "error", err) + continue + } + + // Check response status + statusCode := resp.StatusCode() + if statusCode >= 200 && statusCode < 300 { + // Success + h.logger.Debug("msg", "Batch sent successfully", + "component", "http_client_sink", + "batch_size", len(batch), + "status_code", statusCode) + return + } + + // Non-2xx status + lastErr = fmt.Errorf("server returned status %d: %s", statusCode, resp.Body()) + + // Don't retry on 4xx errors (client errors) + if statusCode >= 400 && statusCode < 500 { + h.logger.Error("msg", "Batch rejected by server", + "component", "http_client_sink", + "status_code", statusCode, + "response", string(resp.Body()), + "batch_size", len(batch)) + h.failedBatches.Add(1) + return + } + + h.logger.Warn("msg", "Server returned error status", + "component", "http_client_sink", + "attempt", attempt+1, + "status_code", statusCode, + "response", string(resp.Body())) + } + + // All retries failed + h.logger.Error("msg", "Failed to send batch after retries", + "component", "http_client_sink", + "batch_size", len(batch), + "last_error", lastErr) + h.failedBatches.Add(1) +} \ No newline at end of file diff --git a/src/internal/sink/tcp_client.go b/src/internal/sink/tcp_client.go new file mode 100644 index 0000000..26ee669 --- /dev/null +++ b/src/internal/sink/tcp_client.go @@ -0,0 +1,376 @@ +// FILE: src/internal/sink/tcp_client.go +package sink + +import ( + "context" + "encoding/json" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" +) + +// TCPClientSink forwards log entries to a remote TCP endpoint +type TCPClientSink struct { + input chan source.LogEntry + config TCPClientConfig + conn net.Conn + connMu sync.RWMutex + done chan struct{} + wg sync.WaitGroup + startTime time.Time + logger *log.Logger + + // Reconnection state + reconnecting atomic.Bool + lastConnectErr error + connectTime time.Time + + // Statistics + totalProcessed atomic.Uint64 + totalFailed atomic.Uint64 + totalReconnects atomic.Uint64 + lastProcessed atomic.Value // time.Time + connectionUptime atomic.Value // time.Duration +} + +// TCPClientConfig holds TCP client sink configuration +type TCPClientConfig struct { + Address string + BufferSize int + DialTimeout time.Duration + WriteTimeout time.Duration + KeepAlive time.Duration + + // Reconnection settings + ReconnectDelay time.Duration + MaxReconnectDelay time.Duration + ReconnectBackoff float64 +} + +// NewTCPClientSink creates a new TCP client sink +func NewTCPClientSink(options map[string]any, logger *log.Logger) (*TCPClientSink, error) { + cfg := TCPClientConfig{ + BufferSize: 1000, + DialTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + ReconnectDelay: time.Second, + MaxReconnectDelay: 30 * time.Second, + ReconnectBackoff: 1.5, + } + + // Extract address + address, ok := options["address"].(string) + if !ok || address == "" { + return nil, fmt.Errorf("tcp_client sink requires 'address' option") + } + + // Validate address format + _, _, err := net.SplitHostPort(address) + if err != nil { + return nil, fmt.Errorf("invalid address format (expected host:port): %w", err) + } + cfg.Address = address + + // Extract other options + if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { + cfg.BufferSize = bufSize + } + if dialTimeout, ok := toInt(options["dial_timeout_seconds"]); ok && dialTimeout > 0 { + cfg.DialTimeout = time.Duration(dialTimeout) * time.Second + } + if writeTimeout, ok := toInt(options["write_timeout_seconds"]); ok && writeTimeout > 0 { + cfg.WriteTimeout = time.Duration(writeTimeout) * time.Second + } + if keepAlive, ok := toInt(options["keep_alive_seconds"]); ok && keepAlive > 0 { + cfg.KeepAlive = time.Duration(keepAlive) * time.Second + } + if reconnectDelay, ok := toInt(options["reconnect_delay_ms"]); ok && reconnectDelay > 0 { + cfg.ReconnectDelay = time.Duration(reconnectDelay) * time.Millisecond + } + if maxReconnectDelay, ok := toInt(options["max_reconnect_delay_seconds"]); ok && maxReconnectDelay > 0 { + cfg.MaxReconnectDelay = time.Duration(maxReconnectDelay) * time.Second + } + if backoff, ok := toFloat(options["reconnect_backoff"]); ok && backoff >= 1.0 { + cfg.ReconnectBackoff = backoff + } + + t := &TCPClientSink{ + input: make(chan source.LogEntry, cfg.BufferSize), + config: cfg, + done: make(chan struct{}), + startTime: time.Now(), + logger: logger, + } + t.lastProcessed.Store(time.Time{}) + t.connectionUptime.Store(time.Duration(0)) + + return t, nil +} + +func (t *TCPClientSink) Input() chan<- source.LogEntry { + return t.input +} + +func (t *TCPClientSink) Start(ctx context.Context) error { + // Start connection manager + t.wg.Add(1) + go t.connectionManager(ctx) + + // Start processing loop + t.wg.Add(1) + go t.processLoop(ctx) + + t.logger.Info("msg", "TCP client sink started", + "component", "tcp_client_sink", + "address", t.config.Address) + return nil +} + +func (t *TCPClientSink) Stop() { + t.logger.Info("msg", "Stopping TCP client sink") + close(t.done) + t.wg.Wait() + + // Close connection + t.connMu.Lock() + if t.conn != nil { + t.conn.Close() + } + t.connMu.Unlock() + + t.logger.Info("msg", "TCP client sink stopped", + "total_processed", t.totalProcessed.Load(), + "total_failed", t.totalFailed.Load(), + "total_reconnects", t.totalReconnects.Load()) +} + +func (t *TCPClientSink) GetStats() SinkStats { + lastProc, _ := t.lastProcessed.Load().(time.Time) + uptime, _ := t.connectionUptime.Load().(time.Duration) + + t.connMu.RLock() + connected := t.conn != nil + t.connMu.RUnlock() + + activeConns := int32(0) + if connected { + activeConns = 1 + } + + return SinkStats{ + Type: "tcp_client", + TotalProcessed: t.totalProcessed.Load(), + ActiveConnections: activeConns, + StartTime: t.startTime, + LastProcessed: lastProc, + Details: map[string]any{ + "address": t.config.Address, + "connected": connected, + "reconnecting": t.reconnecting.Load(), + "total_failed": t.totalFailed.Load(), + "total_reconnects": t.totalReconnects.Load(), + "connection_uptime": uptime.Seconds(), + "last_error": fmt.Sprintf("%v", t.lastConnectErr), + }, + } +} + +func (t *TCPClientSink) connectionManager(ctx context.Context) { + defer t.wg.Done() + + reconnectDelay := t.config.ReconnectDelay + + for { + select { + case <-ctx.Done(): + return + case <-t.done: + return + default: + } + + // Attempt to connect + t.reconnecting.Store(true) + conn, err := t.connect() + t.reconnecting.Store(false) + + if err != nil { + t.lastConnectErr = err + t.logger.Warn("msg", "Failed to connect to TCP server", + "component", "tcp_client_sink", + "address", t.config.Address, + "error", err, + "retry_delay", reconnectDelay) + + // Wait before retry + select { + case <-ctx.Done(): + return + case <-t.done: + return + case <-time.After(reconnectDelay): + } + + // Exponential backoff + reconnectDelay = time.Duration(float64(reconnectDelay) * t.config.ReconnectBackoff) + if reconnectDelay > t.config.MaxReconnectDelay { + reconnectDelay = t.config.MaxReconnectDelay + } + continue + } + + // Connection successful + t.lastConnectErr = nil + reconnectDelay = t.config.ReconnectDelay // Reset backoff + t.connectTime = time.Now() + t.totalReconnects.Add(1) + + t.connMu.Lock() + t.conn = conn + t.connMu.Unlock() + + t.logger.Info("msg", "Connected to TCP server", + "component", "tcp_client_sink", + "address", t.config.Address, + "local_addr", conn.LocalAddr()) + + // Monitor connection + t.monitorConnection(conn) + + // Connection lost, clear it + t.connMu.Lock() + t.conn = nil + t.connMu.Unlock() + + // Update connection uptime + uptime := time.Since(t.connectTime) + t.connectionUptime.Store(uptime) + + t.logger.Warn("msg", "Lost connection to TCP server", + "component", "tcp_client_sink", + "address", t.config.Address, + "uptime", uptime) + } +} + +func (t *TCPClientSink) connect() (net.Conn, error) { + dialer := &net.Dialer{ + Timeout: t.config.DialTimeout, + KeepAlive: t.config.KeepAlive, + } + + conn, err := dialer.Dial("tcp", t.config.Address) + if err != nil { + return nil, err + } + + // Set TCP keep-alive + if tcpConn, ok := conn.(*net.TCPConn); ok { + tcpConn.SetKeepAlive(true) + tcpConn.SetKeepAlivePeriod(t.config.KeepAlive) + } + + return conn, nil +} + +func (t *TCPClientSink) monitorConnection(conn net.Conn) { + // Simple connection monitoring by periodic zero-byte reads + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + buf := make([]byte, 1) + for { + select { + case <-t.done: + return + case <-ticker.C: + // Set read deadline + conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + + // Try to read (we don't expect any data) + _, err := conn.Read(buf) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // Timeout is expected, connection is still alive + continue + } + // Real error, connection is dead + return + } + } + } +} + +func (t *TCPClientSink) processLoop(ctx context.Context) { + defer t.wg.Done() + + for { + select { + case entry, ok := <-t.input: + if !ok { + return + } + + t.totalProcessed.Add(1) + t.lastProcessed.Store(time.Now()) + + // Send entry + if err := t.sendEntry(entry); err != nil { + t.totalFailed.Add(1) + t.logger.Debug("msg", "Failed to send log entry", + "component", "tcp_client_sink", + "error", err) + } + + case <-ctx.Done(): + return + case <-t.done: + return + } + } +} + +func (t *TCPClientSink) sendEntry(entry source.LogEntry) error { + // Get current connection + t.connMu.RLock() + conn := t.conn + t.connMu.RUnlock() + + if conn == nil { + return fmt.Errorf("not connected") + } + + // Marshal to JSON + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("failed to marshal entry: %w", err) + } + + // Add newline + data = append(data, '\n') + + // Set write deadline + if err := conn.SetWriteDeadline(time.Now().Add(t.config.WriteTimeout)); err != nil { + return fmt.Errorf("failed to set write deadline: %w", err) + } + + // Write data + n, err := conn.Write(data) + if err != nil { + // Connection error, it will be reconnected + return fmt.Errorf("write failed: %w", err) + } + + if n != len(data) { + return fmt.Errorf("partial write: %d/%d bytes", n, len(data)) + } + + return nil +} \ No newline at end of file diff --git a/src/internal/source/http.go b/src/internal/source/http.go new file mode 100644 index 0000000..55f6f06 --- /dev/null +++ b/src/internal/source/http.go @@ -0,0 +1,400 @@ +// FILE: src/internal/source/http.go +package source + +import ( + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/ratelimit" + + "github.com/lixenwraith/log" + "github.com/valyala/fasthttp" +) + +// HTTPSource receives log entries via HTTP POST requests +type HTTPSource struct { + port int + ingestPath string + bufferSize int + server *fasthttp.Server + subscribers []chan LogEntry + mu sync.RWMutex + done chan struct{} + wg sync.WaitGroup + rateLimiter *ratelimit.Limiter + logger *log.Logger + + // Statistics + totalEntries atomic.Uint64 + droppedEntries atomic.Uint64 + invalidEntries atomic.Uint64 + startTime time.Time + lastEntryTime atomic.Value // time.Time +} + +// NewHTTPSource creates a new HTTP server source +func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, error) { + port, ok := toInt(options["port"]) + if !ok || port < 1 || port > 65535 { + return nil, fmt.Errorf("http source requires valid 'port' option") + } + + ingestPath := "/ingest" + if path, ok := options["ingest_path"].(string); ok && path != "" { + ingestPath = path + } + + bufferSize := 1000 + if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { + bufferSize = bufSize + } + + h := &HTTPSource{ + port: port, + ingestPath: ingestPath, + bufferSize: bufferSize, + done: make(chan struct{}), + startTime: time.Now(), + logger: logger, + } + h.lastEntryTime.Store(time.Time{}) + + // Initialize rate limiter if configured + if rl, ok := options["rate_limit"].(map[string]any); ok { + if enabled, _ := rl["enabled"].(bool); enabled { + cfg := config.RateLimitConfig{ + Enabled: true, + } + + if rps, ok := toFloat(rl["requests_per_second"]); ok { + cfg.RequestsPerSecond = rps + } + if burst, ok := toInt(rl["burst_size"]); ok { + cfg.BurstSize = burst + } + if limitBy, ok := rl["limit_by"].(string); ok { + cfg.LimitBy = limitBy + } + if respCode, ok := toInt(rl["response_code"]); ok { + cfg.ResponseCode = respCode + } + if msg, ok := rl["response_message"].(string); ok { + cfg.ResponseMessage = msg + } + if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { + cfg.MaxConnectionsPerIP = maxPerIP + } + + h.rateLimiter = ratelimit.New(cfg, logger) + } + } + + return h, nil +} + +func (h *HTTPSource) Subscribe() <-chan LogEntry { + h.mu.Lock() + defer h.mu.Unlock() + + ch := make(chan LogEntry, h.bufferSize) + h.subscribers = append(h.subscribers, ch) + return ch +} + +func (h *HTTPSource) Start() error { + h.server = &fasthttp.Server{ + Handler: h.requestHandler, + DisableKeepalive: false, + StreamRequestBody: true, + CloseOnShutdown: true, + } + + addr := fmt.Sprintf(":%d", h.port) + + // Start server in background + h.wg.Add(1) + go func() { + defer h.wg.Done() + h.logger.Info("msg", "HTTP source server starting", + "component", "http_source", + "port", h.port, + "ingest_path", h.ingestPath) + + if err := h.server.ListenAndServe(addr); err != nil { + h.logger.Error("msg", "HTTP source server failed", + "component", "http_source", + "port", h.port, + "error", err) + } + }() + + // Give server time to start + time.Sleep(100 * time.Millisecond) + return nil +} + +func (h *HTTPSource) Stop() { + h.logger.Info("msg", "Stopping HTTP source") + close(h.done) + + if h.server != nil { + if err := h.server.Shutdown(); err != nil { + h.logger.Error("msg", "Error shutting down HTTP source server", + "component", "http_source", + "error", err) + } + } + + // Shutdown rate limiter + if h.rateLimiter != nil { + h.rateLimiter.Shutdown() + } + + h.wg.Wait() + + // Close subscriber channels + h.mu.Lock() + for _, ch := range h.subscribers { + close(ch) + } + h.mu.Unlock() + + h.logger.Info("msg", "HTTP source stopped") +} + +func (h *HTTPSource) GetStats() SourceStats { + lastEntry, _ := h.lastEntryTime.Load().(time.Time) + + var rateLimitStats map[string]any + if h.rateLimiter != nil { + rateLimitStats = h.rateLimiter.GetStats() + } + + return SourceStats{ + Type: "http", + TotalEntries: h.totalEntries.Load(), + DroppedEntries: h.droppedEntries.Load(), + StartTime: h.startTime, + LastEntryTime: lastEntry, + Details: map[string]any{ + "port": h.port, + "ingest_path": h.ingestPath, + "invalid_entries": h.invalidEntries.Load(), + "rate_limit": rateLimitStats, + }, + } +} + +func (h *HTTPSource) ApplyRateLimit(entry LogEntry) (LogEntry, bool) { + // TODO: Implement source-side rate limiting for aggregation/summarization + return entry, true +} + +func (h *HTTPSource) requestHandler(ctx *fasthttp.RequestCtx) { + // Only handle POST to the configured ingest path + if string(ctx.Method()) != "POST" || string(ctx.Path()) != h.ingestPath { + ctx.SetStatusCode(fasthttp.StatusNotFound) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]string{ + "error": "Not Found", + "hint": fmt.Sprintf("POST logs to %s", h.ingestPath), + }) + return + } + + // Check rate limit + remoteAddr := ctx.RemoteAddr().String() + if h.rateLimiter != nil { + if allowed, statusCode, message := h.rateLimiter.CheckHTTP(remoteAddr); !allowed { + ctx.SetStatusCode(statusCode) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]any{ + "error": message, + "retry_after": "60", + }) + return + } + } + + // Process the request body + body := ctx.PostBody() + if len(body) == 0 { + ctx.SetStatusCode(fasthttp.StatusBadRequest) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]string{ + "error": "Empty request body", + }) + return + } + + // Parse the log entries + entries, err := h.parseEntries(body) + if err != nil { + h.invalidEntries.Add(1) + ctx.SetStatusCode(fasthttp.StatusBadRequest) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]string{ + "error": fmt.Sprintf("Invalid log format: %v", err), + }) + return + } + + // Publish entries + accepted := 0 + for _, entry := range entries { + if h.publish(entry) { + accepted++ + } + } + + // Return success response + ctx.SetStatusCode(fasthttp.StatusAccepted) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]any{ + "accepted": accepted, + "total": len(entries), + }) +} + +func (h *HTTPSource) parseEntries(body []byte) ([]LogEntry, error) { + var entries []LogEntry + + // Try to parse as single JSON object first + var single LogEntry + if err := json.Unmarshal(body, &single); err == nil { + // Validate required fields + if single.Message == "" { + return nil, fmt.Errorf("missing required field: message") + } + if single.Time.IsZero() { + single.Time = time.Now() + } + if single.Source == "" { + single.Source = "http" + } + entries = append(entries, single) + return entries, nil + } + + // Try to parse as JSON array + var array []LogEntry + if err := json.Unmarshal(body, &array); err == nil { + for i, entry := range array { + if entry.Message == "" { + return nil, fmt.Errorf("entry %d missing required field: message", i) + } + if entry.Time.IsZero() { + array[i].Time = time.Now() + } + if entry.Source == "" { + array[i].Source = "http" + } + } + return array, nil + } + + // Try to parse as newline-delimited JSON + lines := splitLines(body) + for i, line := range lines { + if len(line) == 0 { + continue + } + + var entry LogEntry + if err := json.Unmarshal(line, &entry); err != nil { + return nil, fmt.Errorf("line %d: %w", i+1, err) + } + + if entry.Message == "" { + return nil, fmt.Errorf("line %d missing required field: message", i+1) + } + if entry.Time.IsZero() { + entry.Time = time.Now() + } + if entry.Source == "" { + entry.Source = "http" + } + + entries = append(entries, entry) + } + + if len(entries) == 0 { + return nil, fmt.Errorf("no valid log entries found") + } + + return entries, nil +} + +func (h *HTTPSource) publish(entry LogEntry) bool { + // Apply rate limiting + entry, allowed := h.ApplyRateLimit(entry) + if !allowed { + return false + } + + h.mu.RLock() + defer h.mu.RUnlock() + + h.totalEntries.Add(1) + h.lastEntryTime.Store(entry.Time) + + dropped := false + for _, ch := range h.subscribers { + select { + case ch <- entry: + default: + dropped = true + h.droppedEntries.Add(1) + } + } + + if dropped { + h.logger.Debug("msg", "Dropped log entry - subscriber buffer full", + "component", "http_source") + } + + return true +} + +// splitLines splits bytes into lines, handling both \n and \r\n +func splitLines(data []byte) [][]byte { + var lines [][]byte + start := 0 + + for i := 0; i < len(data); i++ { + if data[i] == '\n' { + end := i + if i > 0 && data[i-1] == '\r' { + end = i - 1 + } + if end > start { + lines = append(lines, data[start:end]) + } + start = i + 1 + } + } + + if start < len(data) { + lines = append(lines, data[start:]) + } + + return lines +} + +// Helper function for type conversion +func toFloat(v any) (float64, bool) { + switch val := v.(type) { + case float64: + return val, true + case int: + return float64(val), true + case int64: + return float64(val), true + default: + return 0, false + } +} \ No newline at end of file diff --git a/src/internal/source/tcp.go b/src/internal/source/tcp.go new file mode 100644 index 0000000..c8809eb --- /dev/null +++ b/src/internal/source/tcp.go @@ -0,0 +1,392 @@ +// FILE: src/internal/source/tcp.go +package source + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/ratelimit" + + "github.com/lixenwraith/log" + "github.com/panjf2000/gnet/v2" +) + +// TCPSource receives log entries via TCP connections +type TCPSource struct { + port int + bufferSize int + server *tcpSourceServer + subscribers []chan LogEntry + mu sync.RWMutex + done chan struct{} + engine *gnet.Engine + engineMu sync.Mutex + wg sync.WaitGroup + rateLimiter *ratelimit.Limiter + logger *log.Logger + + // Statistics + totalEntries atomic.Uint64 + droppedEntries atomic.Uint64 + invalidEntries atomic.Uint64 + activeConns atomic.Int32 + startTime time.Time + lastEntryTime atomic.Value // time.Time +} + +// NewTCPSource creates a new TCP server source +func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error) { + port, ok := toInt(options["port"]) + if !ok || port < 1 || port > 65535 { + return nil, fmt.Errorf("tcp source requires valid 'port' option") + } + + bufferSize := 1000 + if bufSize, ok := toInt(options["buffer_size"]); ok && bufSize > 0 { + bufferSize = bufSize + } + + t := &TCPSource{ + port: port, + bufferSize: bufferSize, + done: make(chan struct{}), + startTime: time.Now(), + logger: logger, + } + t.lastEntryTime.Store(time.Time{}) + + // Initialize rate limiter if configured + if rl, ok := options["rate_limit"].(map[string]any); ok { + if enabled, _ := rl["enabled"].(bool); enabled { + cfg := config.RateLimitConfig{ + Enabled: true, + } + + if rps, ok := toFloat(rl["requests_per_second"]); ok { + cfg.RequestsPerSecond = rps + } + if burst, ok := toInt(rl["burst_size"]); ok { + cfg.BurstSize = burst + } + if limitBy, ok := rl["limit_by"].(string); ok { + cfg.LimitBy = limitBy + } + if maxPerIP, ok := toInt(rl["max_connections_per_ip"]); ok { + cfg.MaxConnectionsPerIP = maxPerIP + } + if maxTotal, ok := toInt(rl["max_total_connections"]); ok { + cfg.MaxTotalConnections = maxTotal + } + + t.rateLimiter = ratelimit.New(cfg, logger) + } + } + + return t, nil +} + +func (t *TCPSource) Subscribe() <-chan LogEntry { + t.mu.Lock() + defer t.mu.Unlock() + + ch := make(chan LogEntry, t.bufferSize) + t.subscribers = append(t.subscribers, ch) + return ch +} + +func (t *TCPSource) Start() error { + t.server = &tcpSourceServer{ + source: t, + clients: make(map[gnet.Conn]*tcpClient), + } + + addr := fmt.Sprintf("tcp://:%d", t.port) + + // Start gnet server in background + t.wg.Add(1) + go func() { + defer t.wg.Done() + t.logger.Info("msg", "TCP source server starting", + "component", "tcp_source", + "port", t.port) + + err := gnet.Run(t.server, addr, + gnet.WithLogger(noopLogger{}), + gnet.WithMulticore(true), + gnet.WithReusePort(true), + ) + if err != nil { + t.logger.Error("msg", "TCP source server failed", + "component", "tcp_source", + "port", t.port, + "error", err) + } + }() + + // Give server time to start + time.Sleep(100 * time.Millisecond) + return nil +} + +func (t *TCPSource) Stop() { + t.logger.Info("msg", "Stopping TCP source") + close(t.done) + + // Stop gnet engine if running + t.engineMu.Lock() + engine := t.engine + t.engineMu.Unlock() + + if engine != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + (*engine).Stop(ctx) + } + + // Shutdown rate limiter + if t.rateLimiter != nil { + t.rateLimiter.Shutdown() + } + + t.wg.Wait() + + // Close subscriber channels + t.mu.Lock() + for _, ch := range t.subscribers { + close(ch) + } + t.mu.Unlock() + + t.logger.Info("msg", "TCP source stopped") +} + +func (t *TCPSource) GetStats() SourceStats { + lastEntry, _ := t.lastEntryTime.Load().(time.Time) + + var rateLimitStats map[string]any + if t.rateLimiter != nil { + rateLimitStats = t.rateLimiter.GetStats() + } + + return SourceStats{ + Type: "tcp", + TotalEntries: t.totalEntries.Load(), + DroppedEntries: t.droppedEntries.Load(), + StartTime: t.startTime, + LastEntryTime: lastEntry, + Details: map[string]any{ + "port": t.port, + "active_connections": t.activeConns.Load(), + "invalid_entries": t.invalidEntries.Load(), + "rate_limit": rateLimitStats, + }, + } +} + +func (t *TCPSource) ApplyRateLimit(entry LogEntry) (LogEntry, bool) { + // TODO: Implement source-side rate limiting for aggregation/summarization + return entry, true +} + +func (t *TCPSource) publish(entry LogEntry) bool { + // Apply rate limiting + entry, allowed := t.ApplyRateLimit(entry) + if !allowed { + return false + } + + t.mu.RLock() + defer t.mu.RUnlock() + + t.totalEntries.Add(1) + t.lastEntryTime.Store(entry.Time) + + dropped := false + for _, ch := range t.subscribers { + select { + case ch <- entry: + default: + dropped = true + t.droppedEntries.Add(1) + } + } + + if dropped { + t.logger.Debug("msg", "Dropped log entry - subscriber buffer full", + "component", "tcp_source") + } + + return true +} + +// tcpClient represents a connected TCP client +type tcpClient struct { + conn gnet.Conn + buffer bytes.Buffer +} + +// tcpSourceServer handles gnet events +type tcpSourceServer struct { + gnet.BuiltinEventEngine + source *TCPSource + clients map[gnet.Conn]*tcpClient + mu sync.RWMutex +} + +func (s *tcpSourceServer) OnBoot(eng gnet.Engine) gnet.Action { + // Store engine reference for shutdown + s.source.engineMu.Lock() + s.source.engine = &eng + s.source.engineMu.Unlock() + + s.source.logger.Debug("msg", "TCP source server booted", + "component", "tcp_source", + "port", s.source.port) + return gnet.None +} + +func (s *tcpSourceServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + remoteAddr := c.RemoteAddr().String() + s.source.logger.Debug("msg", "TCP connection attempt", + "component", "tcp_source", + "remote_addr", remoteAddr) + + // Check rate limit + if s.source.rateLimiter != nil { + remoteStr := c.RemoteAddr().String() + tcpAddr, err := net.ResolveTCPAddr("tcp", remoteStr) + if err != nil { + s.source.logger.Warn("msg", "Failed to parse TCP address", + "component", "tcp_source", + "remote_addr", remoteAddr, + "error", err) + return nil, gnet.Close + } + + if !s.source.rateLimiter.CheckTCP(tcpAddr) { + s.source.logger.Warn("msg", "TCP connection rate limited", + "component", "tcp_source", + "remote_addr", remoteAddr) + return nil, gnet.Close + } + + // Track connection + s.source.rateLimiter.AddConnection(remoteStr) + } + + // Create client state + s.mu.Lock() + s.clients[c] = &tcpClient{conn: c} + s.mu.Unlock() + + newCount := s.source.activeConns.Add(1) + s.source.logger.Debug("msg", "TCP connection opened", + "component", "tcp_source", + "remote_addr", remoteAddr, + "active_connections", newCount) + + return nil, gnet.None +} + +func (s *tcpSourceServer) OnClose(c gnet.Conn, err error) gnet.Action { + remoteAddr := c.RemoteAddr().String() + + // Remove client state + s.mu.Lock() + delete(s.clients, c) + s.mu.Unlock() + + // Remove connection tracking + if s.source.rateLimiter != nil { + s.source.rateLimiter.RemoveConnection(remoteAddr) + } + + newCount := s.source.activeConns.Add(-1) + s.source.logger.Debug("msg", "TCP connection closed", + "component", "tcp_source", + "remote_addr", remoteAddr, + "active_connections", newCount, + "error", err) + return gnet.None +} + +func (s *tcpSourceServer) OnTraffic(c gnet.Conn) gnet.Action { + s.mu.RLock() + client, exists := s.clients[c] + s.mu.RUnlock() + + if !exists { + return gnet.Close + } + + // Read all available data + data, err := c.Next(-1) + if err != nil { + s.source.logger.Error("msg", "Error reading from connection", + "component", "tcp_source", + "error", err) + return gnet.Close + } + + // Append to client buffer + client.buffer.Write(data) + + // Process complete lines + for { + line, err := client.buffer.ReadBytes('\n') + if err != nil { + // No complete line available + break + } + + // Trim newline + line = bytes.TrimRight(line, "\r\n") + if len(line) == 0 { + continue + } + + // Parse JSON log entry + var entry LogEntry + if err := json.Unmarshal(line, &entry); err != nil { + s.source.invalidEntries.Add(1) + s.source.logger.Debug("msg", "Invalid JSON log entry", + "component", "tcp_source", + "error", err, + "data", string(line)) + continue + } + + // Validate and set defaults + if entry.Message == "" { + s.source.invalidEntries.Add(1) + continue + } + if entry.Time.IsZero() { + entry.Time = time.Now() + } + if entry.Source == "" { + entry.Source = "tcp" + } + + // Publish the entry + s.source.publish(entry) + } + + return gnet.None +} + +// noopLogger implements gnet's Logger interface but discards everything +type noopLogger struct{} + +func (n noopLogger) Debugf(format string, args ...any) {} +func (n noopLogger) Infof(format string, args ...any) {} +func (n noopLogger) Warnf(format string, args ...any) {} +func (n noopLogger) Errorf(format string, args ...any) {} +func (n noopLogger) Fatalf(format string, args ...any) {} \ No newline at end of file