v0.4.4 refactor, but fixes, minor improvements

This commit is contained in:
2025-09-25 09:18:33 -04:00
parent 5ea4dc8f5f
commit 9111d054fd
8 changed files with 340 additions and 83 deletions

View File

@ -50,6 +50,12 @@ func initializeLogger(cfg *config.Config) error {
logger = log.NewLogger() logger = log.NewLogger()
logCfg := log.DefaultConfig() logCfg := log.DefaultConfig()
// Prevent empty directory creation if file logging is not configured
if cfg.Logging.Output != "file" && cfg.Logging.Output != "all" {
logCfg.DisableFile = true
logCfg.Directory = ""
}
if cfg.Quiet { if cfg.Quiet {
// In quiet mode, disable ALL logging output // In quiet mode, disable ALL logging output
logCfg.Level = 255 // A level that disables all output logCfg.Level = 255 // A level that disables all output
@ -68,23 +74,31 @@ func initializeLogger(cfg *config.Config) error {
// Configure based on output mode // Configure based on output mode
switch cfg.Logging.Output { switch cfg.Logging.Output {
case "none": case "none":
logCfg.DisableFile = true
logCfg.EnableStdout = false logCfg.EnableStdout = false
case "stdout": case "stdout":
logCfg.DisableFile = true
logCfg.EnableStdout = true logCfg.EnableStdout = true
logCfg.StdoutTarget = "stdout" logCfg.StdoutTarget = "stdout"
case "stderr": case "stderr":
logCfg.DisableFile = true
logCfg.EnableStdout = true logCfg.EnableStdout = true
logCfg.StdoutTarget = "stderr" logCfg.StdoutTarget = "stderr"
case "split":
// Console-only with split output: INFO/DEBUG to stdout, WARN/ERROR to stderr
logCfg.EnableStdout = true
logCfg.StdoutTarget = "split"
case "file": case "file":
logCfg.DisableFile = false
logCfg.EnableStdout = false logCfg.EnableStdout = false
configureFileLogging(logCfg, cfg) configureFileLogging(logCfg, cfg)
case "both": case "both":
logCfg.DisableFile = false
logCfg.EnableStdout = true logCfg.EnableStdout = true
logCfg.StdoutTarget = "stdout"
configureFileLogging(logCfg, cfg)
case "all":
logCfg.DisableFile = false
logCfg.EnableStdout = true
logCfg.StdoutTarget = "split"
configureFileLogging(logCfg, cfg) configureFileLogging(logCfg, cfg)
configureConsoleTarget(logCfg, cfg)
default: default:
return fmt.Errorf("invalid log output mode: %s", cfg.Logging.Output) return fmt.Errorf("invalid log output mode: %s", cfg.Logging.Output)
} }
@ -97,7 +111,7 @@ func initializeLogger(cfg *config.Config) error {
return logger.ApplyConfig(logCfg) return logger.ApplyConfig(logCfg)
} }
// configureFileLogging sets up file-based logging parameters // Sets up file-based logging parameters
func configureFileLogging(logCfg *log.Config, cfg *config.Config) { func configureFileLogging(logCfg *log.Config, cfg *config.Config) {
if cfg.Logging.File != nil { if cfg.Logging.File != nil {
logCfg.Directory = cfg.Logging.File.Directory logCfg.Directory = cfg.Logging.File.Directory
@ -110,18 +124,6 @@ func configureFileLogging(logCfg *log.Config, cfg *config.Config) {
} }
} }
// configureConsoleTarget sets up console output parameters
func configureConsoleTarget(logCfg *log.Config, cfg *config.Config) {
target := "stderr" // default
if cfg.Logging.Console != nil && cfg.Logging.Console.Target != "" {
target = cfg.Logging.Console.Target
}
// Set the target, which can be "stdout", "stderr", or "split"
logCfg.StdoutTarget = target
}
func parseLogLevel(level string) (int64, error) { func parseLogLevel(level string) (int64, error) {
switch strings.ToLower(level) { switch strings.ToLower(level) {
case "debug": case "debug":

View File

@ -10,7 +10,7 @@ import (
"logwisp/src/internal/service" "logwisp/src/internal/service"
) )
// statusReporter periodically logs service status // Periodically logs service status
func statusReporter(service *service.Service, ctx context.Context) { func statusReporter(service *service.Service, ctx context.Context) {
ticker := time.NewTicker(30 * time.Second) ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() defer ticker.Stop()
@ -60,7 +60,7 @@ func statusReporter(service *service.Service, ctx context.Context) {
} }
} }
// logPipelineStatus logs the status of an individual pipeline // Logs the status of an individual pipeline
func logPipelineStatus(name string, stats map[string]any) { func logPipelineStatus(name string, stats map[string]any) {
statusFields := []any{ statusFields := []any{
"msg", "Pipeline status", "msg", "Pipeline status",
@ -108,18 +108,23 @@ func logPipelineStatus(name string, stats map[string]any) {
logger.Debug(statusFields...) logger.Debug(statusFields...)
} }
// displayPipelineEndpoints logs the configured endpoints for a pipeline // Logs the configured endpoints for a pipeline
func displayPipelineEndpoints(cfg config.PipelineConfig) { func displayPipelineEndpoints(cfg config.PipelineConfig) {
// Display sink endpoints // Display sink endpoints
for i, sinkCfg := range cfg.Sinks { for i, sinkCfg := range cfg.Sinks {
switch sinkCfg.Type { switch sinkCfg.Type {
case "tcp": case "tcp":
if port, ok := sinkCfg.Options["port"].(int64); ok { if port, ok := sinkCfg.Options["port"].(int64); ok {
host := "0.0.0.0" // Get host or default to 0.0.0.0
if h, ok := sinkCfg.Options["host"].(string); ok && h != "" {
host = h
}
logger.Info("msg", "TCP endpoint configured", logger.Info("msg", "TCP endpoint configured",
"component", "main", "component", "main",
"pipeline", cfg.Name, "pipeline", cfg.Name,
"sink_index", i, "sink_index", i,
"port", port) "listen", fmt.Sprintf("%s:%d", host, port))
// Display net limit info if configured // Display net limit info if configured
if rl, ok := sinkCfg.Options["net_limit"].(map[string]any); ok { if rl, ok := sinkCfg.Options["net_limit"].(map[string]any); ok {
@ -135,6 +140,11 @@ func displayPipelineEndpoints(cfg config.PipelineConfig) {
case "http": case "http":
if port, ok := sinkCfg.Options["port"].(int64); ok { if port, ok := sinkCfg.Options["port"].(int64); ok {
host := "0.0.0.0"
if h, ok := sinkCfg.Options["host"].(string); ok && h != "" {
host = h
}
streamPath := "/transport" streamPath := "/transport"
statusPath := "/status" statusPath := "/status"
if path, ok := sinkCfg.Options["stream_path"].(string); ok { if path, ok := sinkCfg.Options["stream_path"].(string); ok {
@ -147,8 +157,9 @@ func displayPipelineEndpoints(cfg config.PipelineConfig) {
logger.Info("msg", "HTTP endpoints configured", logger.Info("msg", "HTTP endpoints configured",
"pipeline", cfg.Name, "pipeline", cfg.Name,
"sink_index", i, "sink_index", i,
"stream_url", fmt.Sprintf("http://localhost:%d%s", port, streamPath), "listen", fmt.Sprintf("%s:%d", host, port),
"status_url", fmt.Sprintf("http://localhost:%d%s", port, statusPath)) "stream_url", fmt.Sprintf("http://%s:%d%s", host, port, streamPath),
"status_url", fmt.Sprintf("http://%s:%d%s", host, port, statusPath))
// Display net limit info if configured // Display net limit info if configured
if rl, ok := sinkCfg.Options["net_limit"].(map[string]any); ok { if rl, ok := sinkCfg.Options["net_limit"].(map[string]any); ok {
@ -181,6 +192,56 @@ func displayPipelineEndpoints(cfg config.PipelineConfig) {
} }
} }
// Display source endpoints with host support
for i, sourceCfg := range cfg.Sources {
switch sourceCfg.Type {
case "http":
if port, ok := sourceCfg.Options["port"].(int64); ok {
host := "0.0.0.0"
if h, ok := sourceCfg.Options["host"].(string); ok && h != "" {
host = h
}
displayHost := host
if host == "0.0.0.0" {
displayHost = "localhost"
}
ingestPath := "/ingest"
if path, ok := sourceCfg.Options["ingest_path"].(string); ok {
ingestPath = path
}
logger.Info("msg", "HTTP source configured",
"pipeline", cfg.Name,
"source_index", i,
"listen", fmt.Sprintf("%s:%d", host, port),
"ingest_url", fmt.Sprintf("http://%s:%d%s", displayHost, port, ingestPath))
}
case "tcp":
if port, ok := sourceCfg.Options["port"].(int64); ok {
host := "0.0.0.0"
if h, ok := sourceCfg.Options["host"].(string); ok && h != "" {
host = h
}
displayHost := host
if host == "0.0.0.0" {
displayHost = "localhost"
}
logger.Info("msg", "TCP source configured",
"pipeline", cfg.Name,
"source_index", i,
"listen", fmt.Sprintf("%s:%d", host, port),
"endpoint", fmt.Sprintf("%s:%d", displayHost, port))
}
// TODO: missing other types of source, to be added
}
}
// Display authentication information // Display authentication information
if cfg.Auth != nil && cfg.Auth.Type != "none" { if cfg.Auth != nil && cfg.Auth.Type != "none" {
logger.Info("msg", "Authentication enabled", logger.Info("msg", "Authentication enabled",

View File

@ -50,7 +50,7 @@ func DefaultLogConfig() *LogConfig {
Output: "stderr", Output: "stderr",
Level: "info", Level: "info",
File: &LogFileConfig{ File: &LogFileConfig{
Directory: "./logs", Directory: "./log",
Name: "logwisp", Name: "logwisp",
MaxSizeMB: 100, MaxSizeMB: 100,
MaxTotalSizeMB: 1000, MaxTotalSizeMB: 1000,

View File

@ -108,6 +108,14 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
pipelineName, sourceIndex) pipelineName, sourceIndex)
} }
// Validate host if provided
if host, ok := cfg.Options["host"].(string); ok && host != "" {
if net.ParseIP(host) == nil {
return fmt.Errorf("pipeline '%s' source[%d]: invalid IP address: %s",
pipelineName, sourceIndex, host)
}
}
// Validate path if provided // Validate path if provided
if ingestPath, ok := cfg.Options["ingest_path"].(string); ok { if ingestPath, ok := cfg.Options["ingest_path"].(string); ok {
if !strings.HasPrefix(ingestPath, "/") { if !strings.HasPrefix(ingestPath, "/") {
@ -138,6 +146,14 @@ func validateSource(pipelineName string, sourceIndex int, cfg *SourceConfig) err
pipelineName, sourceIndex) pipelineName, sourceIndex)
} }
// Validate host if provided
if host, ok := cfg.Options["host"].(string); ok && host != "" {
if net.ParseIP(host) == nil {
return fmt.Errorf("pipeline '%s' source[%d]: invalid IP address: %s",
pipelineName, sourceIndex, host)
}
}
// Validate net_limit if present within Options // Validate net_limit if present within Options
if rl, ok := cfg.Options["net_limit"].(map[string]any); ok { if rl, ok := cfg.Options["net_limit"].(map[string]any); ok {
if err := validateNetLimitOptions("TCP source", pipelineName, sourceIndex, rl); err != nil { if err := validateNetLimitOptions("TCP source", pipelineName, sourceIndex, rl); err != nil {
@ -174,6 +190,14 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
pipelineName, sinkIndex) pipelineName, sinkIndex)
} }
// Validate host if provided
if host, ok := cfg.Options["host"].(string); ok && host != "" {
if net.ParseIP(host) == nil {
return fmt.Errorf("pipeline '%s' sink[%d]: invalid IP address: %s",
pipelineName, sinkIndex, host)
}
}
// Check port conflicts // Check port conflicts
if existing, exists := allPorts[port]; exists { if existing, exists := allPorts[port]; exists {
return fmt.Errorf("pipeline '%s' sink[%d]: HTTP port %d already used by %s", return fmt.Errorf("pipeline '%s' sink[%d]: HTTP port %d already used by %s",
@ -233,6 +257,14 @@ func validateSink(pipelineName string, sinkIndex int, cfg *SinkConfig, allPorts
pipelineName, sinkIndex) pipelineName, sinkIndex)
} }
// Validate host if provided
if host, ok := cfg.Options["host"].(string); ok && host != "" {
if net.ParseIP(host) == nil {
return fmt.Errorf("pipeline '%s' sink[%d]: invalid IP address: %s",
pipelineName, sinkIndex, host)
}
}
// Check port conflicts // Check port conflicts
if existing, exists := allPorts[port]; exists { if existing, exists := allPorts[port]; exists {
return fmt.Errorf("pipeline '%s' sink[%d]: TCP port %d already used by %s", return fmt.Errorf("pipeline '%s' sink[%d]: TCP port %d already used by %s",

View File

@ -37,6 +37,12 @@ type HTTPSink struct {
logger *log.Logger logger *log.Logger
formatter format.Formatter formatter format.Formatter
// Broker architecture
clients map[uint64]chan core.LogEntry
clientsMu sync.RWMutex
unregister chan uint64
nextClientID atomic.Uint64
// Security components // Security components
authenticator *auth.Authenticator authenticator *auth.Authenticator
tlsManager *tls.Manager tlsManager *tls.Manager
@ -58,6 +64,7 @@ type HTTPSink struct {
// HTTPConfig holds HTTP sink configuration // HTTPConfig holds HTTP sink configuration
type HTTPConfig struct { type HTTPConfig struct {
Host string
Port int64 Port int64
BufferSize int64 BufferSize int64
StreamPath string StreamPath string
@ -70,6 +77,7 @@ type HTTPConfig struct {
// NewHTTPSink creates a new HTTP streaming sink // NewHTTPSink creates a new HTTP streaming sink
func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) { func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) {
cfg := HTTPConfig{ cfg := HTTPConfig{
Host: "0.0.0.0",
Port: 8080, Port: 8080,
BufferSize: 1000, BufferSize: 1000,
StreamPath: "/transport", StreamPath: "/transport",
@ -77,6 +85,9 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
} }
// Extract configuration from options // Extract configuration from options
if host, ok := options["host"].(string); ok && host != "" {
cfg.Host = host
}
if port, ok := options["port"].(int64); ok { if port, ok := options["port"].(int64); ok {
cfg.Port = port cfg.Port = port
} }
@ -182,9 +193,20 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
statusPath: cfg.StatusPath, statusPath: cfg.StatusPath,
logger: logger, logger: logger,
formatter: formatter, formatter: formatter,
clients: make(map[uint64]chan core.LogEntry),
unregister: make(chan uint64, 10), // Buffered for non-blocking
} }
h.lastProcessed.Store(time.Time{}) h.lastProcessed.Store(time.Time{})
// Initialize TLS manager
if cfg.TLS != nil && cfg.TLS.Enabled {
tlsManager, err := tls.NewManager(cfg.TLS, logger)
if err != nil {
return nil, fmt.Errorf("failed to create TLS manager: %w", err)
}
h.tlsManager = tlsManager
}
// Initialize net limiter if configured // Initialize net limiter if configured
if cfg.NetLimit != nil && cfg.NetLimit.Enabled { if cfg.NetLimit != nil && cfg.NetLimit.Enabled {
h.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger) h.netLimiter = limit.NewNetLimiter(*cfg.NetLimit, logger)
@ -198,6 +220,10 @@ func (h *HTTPSink) Input() chan<- core.LogEntry {
} }
func (h *HTTPSink) Start(ctx context.Context) error { func (h *HTTPSink) Start(ctx context.Context) error {
// Start central broker goroutine
h.wg.Add(1)
go h.brokerLoop(ctx)
// Create fasthttp adapter for logging // Create fasthttp adapter for logging
fasthttpLogger := compat.NewFastHTTPAdapter(h.logger) fasthttpLogger := compat.NewFastHTTPAdapter(h.logger)
@ -216,13 +242,15 @@ func (h *HTTPSink) Start(ctx context.Context) error {
"port", h.config.Port) "port", h.config.Port)
} }
addr := fmt.Sprintf(":%d", h.config.Port) // Use configured host and port
addr := fmt.Sprintf("%s:%d", h.config.Host, h.config.Port)
// Run server in separate goroutine to avoid blocking // Run server in separate goroutine to avoid blocking
errChan := make(chan error, 1) errChan := make(chan error, 1)
go func() { go func() {
h.logger.Info("msg", "HTTP server started", h.logger.Info("msg", "HTTP server started",
"component", "http_sink", "component", "http_sink",
"host", h.config.Host,
"port", h.config.Port, "port", h.config.Port,
"stream_path", h.streamPath, "stream_path", h.streamPath,
"status_path", h.statusPath, "status_path", h.statusPath,
@ -262,6 +290,99 @@ func (h *HTTPSink) Start(ctx context.Context) error {
} }
} }
// Broadcasts only to active clients
func (h *HTTPSink) brokerLoop(ctx context.Context) {
defer h.wg.Done()
var ticker *time.Ticker
var tickerChan <-chan time.Time
if h.config.Heartbeat != nil && h.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second)
tickerChan = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
h.logger.Debug("msg", "Broker loop stopping due to context cancellation",
"component", "http_sink")
return
case <-h.done:
h.logger.Debug("msg", "Broker loop stopping due to shutdown signal",
"component", "http_sink")
return
case clientID := <-h.unregister:
// Broker owns channel cleanup
h.clientsMu.Lock()
if clientChan, exists := h.clients[clientID]; exists {
delete(h.clients, clientID)
close(clientChan)
h.logger.Debug("msg", "Unregistered client",
"component", "http_sink",
"client_id", clientID)
}
h.clientsMu.Unlock()
case entry, ok := <-h.input:
if !ok {
h.logger.Debug("msg", "Input channel closed, broker stopping",
"component", "http_sink")
return
}
h.totalProcessed.Add(1)
h.lastProcessed.Store(time.Now())
// Broadcast to all active clients
h.clientsMu.RLock()
clientCount := len(h.clients)
if clientCount > 0 {
slowClients := 0
for id, ch := range h.clients {
select {
case ch <- entry:
// Successfully sent
default:
// Client buffer full
slowClients++
if slowClients == 1 { // Log only once per broadcast
h.logger.Debug("msg", "Dropped entry for slow client(s)",
"component", "http_sink",
"client_id", id,
"slow_clients", slowClients,
"total_clients", clientCount)
}
}
}
}
// If no clients connected, entry is discarded (no buffering)
h.clientsMu.RUnlock()
case <-tickerChan:
// Send global heartbeat to all clients
if h.config.Heartbeat != nil && h.config.Heartbeat.Enabled {
heartbeatEntry := h.createHeartbeatEntry()
h.clientsMu.RLock()
for id, ch := range h.clients {
select {
case ch <- heartbeatEntry:
default:
// Client buffer full, skip heartbeat
h.logger.Debug("msg", "Skipped heartbeat for slow client",
"component", "http_sink",
"client_id", id)
}
}
h.clientsMu.RUnlock()
}
}
}
}
func (h *HTTPSink) Stop() { func (h *HTTPSink) Stop() {
h.logger.Info("msg", "Stopping HTTP sink") h.logger.Info("msg", "Stopping HTTP sink")
@ -278,6 +399,17 @@ func (h *HTTPSink) Stop() {
// Wait for all active client handlers to finish // Wait for all active client handlers to finish
h.wg.Wait() h.wg.Wait()
// Close unregister channel after all clients have finished
close(h.unregister)
// Close all client channels
h.clientsMu.Lock()
for _, ch := range h.clients {
close(ch)
}
h.clients = make(map[uint64]chan core.LogEntry)
h.clientsMu.Unlock()
h.logger.Info("msg", "HTTP sink stopped") h.logger.Info("msg", "HTTP sink stopped")
} }
@ -381,6 +513,15 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
return return
} }
h.authSuccesses.Add(1) h.authSuccesses.Add(1)
} else {
// Create anonymous session for unauthenticated connections
session = &auth.Session{
ID: fmt.Sprintf("anon-%d", time.Now().UnixNano()),
Username: "anonymous",
Method: "none",
RemoteAddr: remoteAddr,
CreatedAt: time.Now(),
}
} }
switch path { switch path {
@ -404,72 +545,57 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
} }
// Set SSE headers // Set SSE headers
ctx.Response.Header.Set("Content-Type", "text/event-transport") ctx.Response.Header.Set("Content-Type", "text/event-stream")
ctx.Response.Header.Set("Cache-Control", "no-cache") ctx.Response.Header.Set("Cache-Control", "no-cache")
ctx.Response.Header.Set("Connection", "keep-alive") ctx.Response.Header.Set("Connection", "keep-alive")
ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
ctx.Response.Header.Set("X-Accel-Buffering", "no") ctx.Response.Header.Set("X-Accel-Buffering", "no")
// Create subscription for this client // Register new client with broker
clientID := h.nextClientID.Add(1)
clientChan := make(chan core.LogEntry, h.config.BufferSize) clientChan := make(chan core.LogEntry, h.config.BufferSize)
clientDone := make(chan struct{})
// Subscribe to input channel h.clientsMu.Lock()
go func() { h.clients[clientID] = clientChan
defer close(clientChan) h.clientsMu.Unlock()
for {
select {
case entry, ok := <-h.input:
if !ok {
return
}
h.totalProcessed.Add(1)
h.lastProcessed.Store(time.Now())
select { // Define the stream writer function
case clientChan <- entry:
case <-clientDone:
return
case <-h.done:
return
default:
// Drop if client buffer full
h.logger.Debug("msg", "Dropped entry for slow client",
"component", "http_sink",
"remote_addr", remoteAddr)
}
case <-clientDone:
return
case <-h.done:
return
}
}
}()
// Define the transport writer function
streamFunc := func(w *bufio.Writer) { streamFunc := func(w *bufio.Writer) {
newCount := h.activeClients.Add(1) connectCount := h.activeClients.Add(1)
h.logger.Debug("msg", "HTTP client connected", h.logger.Debug("msg", "HTTP client connected",
"component", "http_sink",
"remote_addr", remoteAddr, "remote_addr", remoteAddr,
"username", session.Username, "username", session.Username,
"auth_method", session.Method, "auth_method", session.Method,
"active_clients", newCount) "client_id", clientID,
"active_clients", connectCount)
// Track goroutine lifecycle with waitgroup
h.wg.Add(1) h.wg.Add(1)
// Cleanup signals unregister
defer func() { defer func() {
close(clientDone) disconnectCount := h.activeClients.Add(-1)
newCount := h.activeClients.Add(-1)
h.logger.Debug("msg", "HTTP client disconnected", h.logger.Debug("msg", "HTTP client disconnected",
"component", "http_sink",
"remote_addr", remoteAddr, "remote_addr", remoteAddr,
"username", session.Username, "username", session.Username,
"active_clients", newCount) "client_id", clientID,
"active_clients", disconnectCount)
// Signal broker to cleanup this client's channel
select {
case h.unregister <- clientID:
case <-h.done:
// Shutting down, don't block
}
h.wg.Done() h.wg.Done()
}() }()
// Send initial connected event // Send initial connected event with metadata
clientID := fmt.Sprintf("%d", time.Now().UnixNano())
connectionInfo := map[string]any{ connectionInfo := map[string]any{
"client_id": clientID, "client_id": fmt.Sprintf("%d", clientID),
"username": session.Username, "username": session.Username,
"auth_method": session.Method, "auth_method": session.Method,
"stream_path": h.streamPath, "stream_path": h.streamPath,
@ -479,27 +605,33 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
} }
data, _ := json.Marshal(connectionInfo) data, _ := json.Marshal(connectionInfo)
fmt.Fprintf(w, "event: connected\ndata: %s\n\n", data) fmt.Fprintf(w, "event: connected\ndata: %s\n\n", data)
w.Flush() if err := w.Flush(); err != nil {
return
}
// Setup heartbeat ticker if enabled
var ticker *time.Ticker var ticker *time.Ticker
var tickerChan <-chan time.Time var tickerChan <-chan time.Time
if h.config.Heartbeat.Enabled { if h.config.Heartbeat != nil && h.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second) ticker = time.NewTicker(time.Duration(h.config.Heartbeat.IntervalSeconds) * time.Second)
tickerChan = ticker.C tickerChan = ticker.C
defer ticker.Stop() defer ticker.Stop()
} }
// Main streaming loop
for { for {
select { select {
case entry, ok := <-clientChan: case entry, ok := <-clientChan:
if !ok { if !ok {
// Channel closed, client being removed
return return
} }
if err := h.formatEntryForSSE(w, entry); err != nil { if err := h.formatEntryForSSE(w, entry); err != nil {
h.logger.Error("msg", "Failed to format log entry", h.logger.Error("msg", "Failed to format log entry",
"component", "http_sink", "component", "http_sink",
"client_id", clientID,
"error", err, "error", err,
"entry_source", entry.Source) "entry_source", entry.Source)
continue continue
@ -512,18 +644,22 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
case <-tickerChan: case <-tickerChan:
// Validate session is still active // Validate session is still active
if h.authenticator != nil && !h.authenticator.ValidateSession(session.ID) { if h.authenticator != nil && session != nil && !h.authenticator.ValidateSession(session.ID) {
fmt.Fprintf(w, "event: disconnect\ndata: {\"reason\":\"session_expired\"}\n\n") fmt.Fprintf(w, "event: disconnect\ndata: {\"reason\":\"session_expired\"}\n\n")
w.Flush() w.Flush()
return return
} }
heartbeatEntry := h.createHeartbeatEntry() // Heartbeat is sent from broker, additional client-specific heartbeat is sent here
if err := h.formatEntryForSSE(w, heartbeatEntry); err != nil { // This provides per-client heartbeat validation with session check
h.logger.Error("msg", "Failed to format heartbeat", sessionHB := map[string]any{
"component", "http_sink", "type": "session_heartbeat",
"error", err) "client_id": fmt.Sprintf("%d", clientID),
"session_valid": true,
} }
hbData, _ := json.Marshal(sessionHB)
fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", hbData)
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
return return
} }
@ -650,21 +786,26 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
ctx.SetBody(data) ctx.SetBody(data)
} }
// GetActiveConnections returns the current number of active clients // Returns the current number of active clients
func (h *HTTPSink) GetActiveConnections() int64 { func (h *HTTPSink) GetActiveConnections() int64 {
return h.activeClients.Load() return h.activeClients.Load()
} }
// GetStreamPath returns the configured transport endpoint path // Returns the configured transport endpoint path
func (h *HTTPSink) GetStreamPath() string { func (h *HTTPSink) GetStreamPath() string {
return h.streamPath return h.streamPath
} }
// GetStatusPath returns the configured status endpoint path // Returns the configured status endpoint path
func (h *HTTPSink) GetStatusPath() string { func (h *HTTPSink) GetStatusPath() string {
return h.statusPath return h.statusPath
} }
// Returns the configured host
func (h *HTTPSink) GetHost() string {
return h.config.Host
}
// SetAuthConfig configures http sink authentication // SetAuthConfig configures http sink authentication
func (h *HTTPSink) SetAuthConfig(authCfg *config.AuthConfig) { func (h *HTTPSink) SetAuthConfig(authCfg *config.AuthConfig) {
if authCfg == nil || authCfg.Type == "none" { if authCfg == nil || authCfg.Type == "none" {

View File

@ -58,6 +58,7 @@ type TCPSink struct {
// TCPConfig holds TCP sink configuration // TCPConfig holds TCP sink configuration
type TCPConfig struct { type TCPConfig struct {
Host string
Port int64 Port int64
BufferSize int64 BufferSize int64
Heartbeat *config.HeartbeatConfig Heartbeat *config.HeartbeatConfig
@ -68,11 +69,15 @@ type TCPConfig struct {
// NewTCPSink creates a new TCP streaming sink // NewTCPSink creates a new TCP streaming sink
func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) { func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) {
cfg := TCPConfig{ cfg := TCPConfig{
Host: "0.0.0.0",
Port: int64(9090), Port: int64(9090),
BufferSize: int64(1000), BufferSize: int64(1000),
} }
// Extract configuration from options // Extract configuration from options
if host, ok := options["host"].(string); ok && host != "" {
cfg.Host = host
}
if port, ok := options["port"].(int64); ok { if port, ok := options["port"].(int64); ok {
cfg.Port = port cfg.Port = port
} }
@ -199,7 +204,7 @@ func (t *TCPSink) Start(ctx context.Context) error {
}() }()
// Configure gnet options // Configure gnet options
addr := fmt.Sprintf("tcp://:%d", t.config.Port) addr := fmt.Sprintf("tcp://%s:%d", t.config.Host, t.config.Port)
// Create a gnet adapter using the existing logger instance // Create a gnet adapter using the existing logger instance
gnetLogger := compat.NewGnetAdapter(t.logger) gnetLogger := compat.NewGnetAdapter(t.logger)
@ -317,7 +322,7 @@ func (t *TCPSink) broadcastLoop(ctx context.Context) {
var ticker *time.Ticker var ticker *time.Ticker
var tickerChan <-chan time.Time var tickerChan <-chan time.Time
if t.config.Heartbeat.Enabled { if t.config.Heartbeat != nil && t.config.Heartbeat.Enabled {
ticker = time.NewTicker(time.Duration(t.config.Heartbeat.IntervalSeconds) * time.Second) ticker = time.NewTicker(time.Duration(t.config.Heartbeat.IntervalSeconds) * time.Second)
tickerChan = ticker.C tickerChan = ticker.C
defer ticker.Stop() defer ticker.Stop()

View File

@ -20,6 +20,7 @@ import (
// HTTPSource receives log entries via HTTP POST requests // HTTPSource receives log entries via HTTP POST requests
type HTTPSource struct { type HTTPSource struct {
host string
port int64 port int64
ingestPath string ingestPath string
bufferSize int64 bufferSize int64
@ -45,6 +46,11 @@ type HTTPSource struct {
// NewHTTPSource creates a new HTTP server source // NewHTTPSource creates a new HTTP server source
func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, error) { func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, error) {
host := "0.0.0.0"
if h, ok := options["host"].(string); ok && h != "" {
host = h
}
port, ok := options["port"].(int64) port, ok := options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return nil, fmt.Errorf("http source requires valid 'port' option") return nil, fmt.Errorf("http source requires valid 'port' option")
@ -61,6 +67,7 @@ func NewHTTPSource(options map[string]any, logger *log.Logger) (*HTTPSource, err
} }
h := &HTTPSource{ h := &HTTPSource{
host: host,
port: port, port: port,
ingestPath: ingestPath, ingestPath: ingestPath,
bufferSize: bufferSize, bufferSize: bufferSize,
@ -156,7 +163,8 @@ func (h *HTTPSource) Start() error {
CloseOnShutdown: true, CloseOnShutdown: true,
} }
addr := fmt.Sprintf(":%d", h.port) // Use configured host and port
addr := fmt.Sprintf("%s:%d", h.host, h.port)
// Start server in background // Start server in background
h.wg.Add(1) h.wg.Add(1)

View File

@ -32,6 +32,7 @@ const (
// TCPSource receives log entries via TCP connections // TCPSource receives log entries via TCP connections
type TCPSource struct { type TCPSource struct {
host string
port int64 port int64
bufferSize int64 bufferSize int64
server *tcpSourceServer server *tcpSourceServer
@ -57,6 +58,11 @@ type TCPSource struct {
// NewTCPSource creates a new TCP server source // NewTCPSource creates a new TCP server source
func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error) { func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error) {
host := "0.0.0.0"
if h, ok := options["host"].(string); ok && h != "" {
host = h
}
port, ok := options["port"].(int64) port, ok := options["port"].(int64)
if !ok || port < 1 || port > 65535 { if !ok || port < 1 || port > 65535 {
return nil, fmt.Errorf("tcp source requires valid 'port' option") return nil, fmt.Errorf("tcp source requires valid 'port' option")
@ -68,6 +74,7 @@ func NewTCPSource(options map[string]any, logger *log.Logger) (*TCPSource, error
} }
t := &TCPSource{ t := &TCPSource{
host: host,
port: port, port: port,
bufferSize: bufferSize, bufferSize: bufferSize,
done: make(chan struct{}), done: make(chan struct{}),
@ -147,7 +154,8 @@ func (t *TCPSource) Start() error {
clients: make(map[gnet.Conn]*tcpClient), clients: make(map[gnet.Conn]*tcpClient),
} }
addr := fmt.Sprintf("tcp://:%d", t.port) // Use configured host and port
addr := fmt.Sprintf("tcp://%s:%d", t.host, t.port)
// Create a gnet adapter using the existing logger instance // Create a gnet adapter using the existing logger instance
gnetLogger := compat.NewGnetAdapter(t.logger) gnetLogger := compat.NewGnetAdapter(t.logger)