v0.8.0: decoupled session management from auth; auth deprecated except mTLS; session management, TLS, and mTLS flows fixed; docs and config still outdated

This commit is contained in:
2025-11-06 16:43:46 -05:00
parent 7e542b660a
commit dcf803bac1
55 changed files with 2894 additions and 3807 deletions

View File

@ -16,7 +16,7 @@ import (
"github.com/lixenwraith/log"
)
// ConsoleSink writes log entries to the console (stdout/stderr) using an dedicated logger instance
// ConsoleSink writes log entries to the console (stdout/stderr) using a dedicated logger instance.
type ConsoleSink struct {
config *config.ConsoleSinkOptions
input chan core.LogEntry
@ -31,7 +31,7 @@ type ConsoleSink struct {
lastProcessed atomic.Value // time.Time
}
// Creates a new console sink
// NewConsoleSink creates a new console sink.
func NewConsoleSink(opts *config.ConsoleSinkOptions, appLogger *log.Logger, formatter format.Formatter) (*ConsoleSink, error) {
if opts == nil {
return nil, fmt.Errorf("console sink options cannot be nil")
@ -73,10 +73,12 @@ func NewConsoleSink(opts *config.ConsoleSinkOptions, appLogger *log.Logger, form
return s, nil
}
// Input returns the channel for sending log entries.
func (s *ConsoleSink) Input() chan<- core.LogEntry {
return s.input
}
// Start begins the processing loop for the sink.
func (s *ConsoleSink) Start(ctx context.Context) error {
// Start the internal writer's processing goroutine.
if err := s.writer.Start(); err != nil {
@ -89,6 +91,7 @@ func (s *ConsoleSink) Start(ctx context.Context) error {
return nil
}
// Stop gracefully shuts down the sink.
func (s *ConsoleSink) Stop() {
target := s.writer.GetConfig().ConsoleTarget
s.logger.Info("msg", "Stopping console sink", "target", target)
@ -103,6 +106,7 @@ func (s *ConsoleSink) Stop() {
s.logger.Info("msg", "Console sink stopped", "target", target)
}
// GetStats returns the sink's statistics.
func (s *ConsoleSink) GetStats() SinkStats {
lastProc, _ := s.lastProcessed.Load().(time.Time)
@ -117,7 +121,7 @@ func (s *ConsoleSink) GetStats() SinkStats {
}
}
// processLoop reads entries, formats them, and passes them to the internal writer.
// processLoop reads entries, formats them, and writes to the console.
func (s *ConsoleSink) processLoop(ctx context.Context) {
for {
select {

View File

@ -15,7 +15,7 @@ import (
"github.com/lixenwraith/log"
)
// Writes log entries to files with rotation
// FileSink writes log entries to files with rotation.
type FileSink struct {
config *config.FileSinkOptions
input chan core.LogEntry
@ -30,7 +30,7 @@ type FileSink struct {
lastProcessed atomic.Value // time.Time
}
// Creates a new file sink
// NewFileSink creates a new file sink.
func NewFileSink(opts *config.FileSinkOptions, logger *log.Logger, formatter format.Formatter) (*FileSink, error) {
if opts == nil {
return nil, fmt.Errorf("file sink options cannot be nil")
@ -63,10 +63,12 @@ func NewFileSink(opts *config.FileSinkOptions, logger *log.Logger, formatter for
return fs, nil
}
// Input returns the channel for sending log entries.
func (fs *FileSink) Input() chan<- core.LogEntry {
return fs.input
}
// Start begins the processing loop for the sink.
func (fs *FileSink) Start(ctx context.Context) error {
// Start the internal file writer
if err := fs.writer.Start(); err != nil {
@ -78,6 +80,7 @@ func (fs *FileSink) Start(ctx context.Context) error {
return nil
}
// Stop gracefully shuts down the sink.
func (fs *FileSink) Stop() {
fs.logger.Info("msg", "Stopping file sink")
close(fs.done)
@ -92,6 +95,7 @@ func (fs *FileSink) Stop() {
fs.logger.Info("msg", "File sink stopped")
}
// GetStats returns the sink's statistics.
func (fs *FileSink) GetStats() SinkStats {
lastProc, _ := fs.lastProcessed.Load().(time.Time)
@ -104,6 +108,7 @@ func (fs *FileSink) GetStats() SinkStats {
}
}
// processLoop reads entries, formats them, and writes to a file.
func (fs *FileSink) processLoop(ctx context.Context) {
for {
select {

View File

@ -5,18 +5,19 @@ import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/auth"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/limit"
"logwisp/src/internal/tls"
"logwisp/src/internal/session"
ltls "logwisp/src/internal/tls"
"logwisp/src/internal/version"
"github.com/lixenwraith/log"
@ -24,7 +25,7 @@ import (
"github.com/valyala/fasthttp"
)
// Streams log entries via Server-Sent Events
// HTTPSink streams log entries via Server-Sent Events (SSE).
type HTTPSink struct {
// Configuration reference (NOT a copy)
config *config.HTTPSinkOptions
@ -46,10 +47,11 @@ type HTTPSink struct {
unregister chan uint64
nextClientID atomic.Uint64
// Security components
authenticator *auth.Authenticator
tlsManager *tls.Manager
authConfig *config.ServerAuthConfig
// Session and security
sessionManager *session.Manager
clientSessions map[uint64]string // clientID -> sessionID
sessionsMu sync.RWMutex
tlsManager *ltls.ServerManager
// Net limiting
netLimiter *limit.NetLimiter
@ -57,31 +59,32 @@ type HTTPSink struct {
// Statistics
totalProcessed atomic.Uint64
lastProcessed atomic.Value // time.Time
authFailures atomic.Uint64
authSuccesses atomic.Uint64
}
// Creates a new HTTP streaming sink
// NewHTTPSink creates a new HTTP streaming sink.
func NewHTTPSink(opts *config.HTTPSinkOptions, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) {
if opts == nil {
return nil, fmt.Errorf("HTTP sink options cannot be nil")
}
h := &HTTPSink{
config: opts, // Direct reference to config struct
input: make(chan core.LogEntry, opts.BufferSize),
startTime: time.Now(),
done: make(chan struct{}),
logger: logger,
formatter: formatter,
clients: make(map[uint64]chan core.LogEntry),
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
startTime: time.Now(),
done: make(chan struct{}),
logger: logger,
formatter: formatter,
clients: make(map[uint64]chan core.LogEntry),
unregister: make(chan uint64),
sessionManager: session.NewManager(30 * time.Minute),
clientSessions: make(map[uint64]string),
}
h.lastProcessed.Store(time.Time{})
// Initialize TLS manager if configured
if opts.TLS != nil && opts.TLS.Enabled {
tlsManager, err := tls.NewManager(opts.TLS, logger)
tlsManager, err := ltls.NewServerManager(opts.TLS, logger)
if err != nil {
return nil, fmt.Errorf("failed to create TLS manager: %w", err)
}
@ -97,32 +100,21 @@ func NewHTTPSink(opts *config.HTTPSinkOptions, logger *log.Logger, formatter for
h.netLimiter = limit.NewNetLimiter(opts.NetLimit, logger)
}
// Initialize authenticator if auth is not "none"
if opts.Auth != nil && opts.Auth.Type != "none" {
// Only "basic" and "token" are valid for HTTP sink
if opts.Auth.Type != "basic" && opts.Auth.Type != "token" {
return nil, fmt.Errorf("invalid auth type '%s' for HTTP sink (valid: none, basic, token)", opts.Auth.Type)
}
authenticator, err := auth.NewAuthenticator(opts.Auth, logger)
if err != nil {
return nil, fmt.Errorf("failed to create authenticator: %w", err)
}
h.authenticator = authenticator
h.authConfig = opts.Auth
logger.Info("msg", "Authentication enabled",
"component", "http_sink",
"type", opts.Auth.Type)
}
return h, nil
}
// Input returns the channel for sending log entries.
func (h *HTTPSink) Input() chan<- core.LogEntry {
return h.input
}
// Start initializes the HTTP server and begins the broker loop.
func (h *HTTPSink) Start(ctx context.Context) error {
// Register expiry callback
h.sessionManager.RegisterExpiryCallback("http_sink", func(sessionID, remoteAddr string) {
h.handleSessionExpiry(sessionID, remoteAddr)
})
// Start central broker goroutine
h.wg.Add(1)
go h.brokerLoop(ctx)
@ -144,6 +136,16 @@ func (h *HTTPSink) Start(ctx context.Context) error {
// Configure TLS if enabled
if h.tlsManager != nil {
h.server.TLSConfig = h.tlsManager.GetHTTPConfig()
// Enforce mTLS configuration
if h.config.TLS.ClientAuth {
if h.config.TLS.VerifyClientCert {
h.server.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
} else {
h.server.TLSConfig.ClientAuth = tls.RequireAnyClientCert
}
}
h.logger.Info("msg", "TLS enabled for HTTP sink",
"component", "http_sink",
"port", h.config.Port)
@ -183,7 +185,7 @@ func (h *HTTPSink) Start(ctx context.Context) error {
if h.server != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
h.server.ShutdownWithContext(shutdownCtx)
_ = h.server.ShutdownWithContext(shutdownCtx)
}
}()
@ -197,7 +199,105 @@ func (h *HTTPSink) Start(ctx context.Context) error {
}
}
// Broadcasts only to active clients
// Stop gracefully shuts down the HTTP server and all client connections.
func (h *HTTPSink) Stop() {
h.logger.Info("msg", "Stopping HTTP sink")
// Unregister callback
h.sessionManager.UnregisterExpiryCallback("http_sink")
// Signal all client handlers to stop
close(h.done)
// Shutdown HTTP server
if h.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = h.server.ShutdownWithContext(ctx)
}
// Wait for all active client handlers to finish
h.wg.Wait()
// Close unregister channel after all clients have finished
close(h.unregister)
// Close all client channels
h.clientsMu.Lock()
for _, ch := range h.clients {
close(ch)
}
h.clients = make(map[uint64]chan core.LogEntry)
h.clientsMu.Unlock()
// Stop session manager
if h.sessionManager != nil {
h.sessionManager.Stop()
}
h.logger.Info("msg", "HTTP sink stopped")
}
// GetStats returns the sink's statistics.
func (h *HTTPSink) GetStats() SinkStats {
lastProc, _ := h.lastProcessed.Load().(time.Time)
var netLimitStats map[string]any
if h.netLimiter != nil {
netLimitStats = h.netLimiter.GetStats()
}
var sessionStats map[string]any
if h.sessionManager != nil {
sessionStats = h.sessionManager.GetStats()
}
var tlsStats map[string]any
if h.tlsManager != nil {
tlsStats = h.tlsManager.GetStats()
}
return SinkStats{
Type: "http",
TotalProcessed: h.totalProcessed.Load(),
ActiveConnections: h.activeClients.Load(),
StartTime: h.startTime,
LastProcessed: lastProc,
Details: map[string]any{
"port": h.config.Port,
"buffer_size": h.config.BufferSize,
"endpoints": map[string]string{
"stream": h.config.StreamPath,
"status": h.config.StatusPath,
},
"net_limit": netLimitStats,
"sessions": sessionStats,
"tls": tlsStats,
},
}
}
// GetActiveConnections returns the current number of active clients.
func (h *HTTPSink) GetActiveConnections() int64 {
return h.activeClients.Load()
}
// GetStreamPath returns the configured transport endpoint path.
func (h *HTTPSink) GetStreamPath() string {
return h.config.StreamPath
}
// GetStatusPath returns the configured status endpoint path.
func (h *HTTPSink) GetStatusPath() string {
return h.config.StatusPath
}
// GetHost returns the configured host.
func (h *HTTPSink) GetHost() string {
return h.config.Host
}
// brokerLoop manages client connections and broadcasts log entries.
func (h *HTTPSink) brokerLoop(ctx context.Context) {
defer h.wg.Done()
@ -233,6 +333,11 @@ func (h *HTTPSink) brokerLoop(ctx context.Context) {
}
h.clientsMu.Unlock()
// Clean up session tracking
h.sessionsMu.Lock()
delete(h.clientSessions, clientID)
h.sessionsMu.Unlock()
case entry, ok := <-h.input:
if !ok {
h.logger.Debug("msg", "Input channel closed, broker stopping",
@ -248,23 +353,50 @@ func (h *HTTPSink) brokerLoop(ctx context.Context) {
clientCount := len(h.clients)
if clientCount > 0 {
slowClients := 0
var staleClients []uint64
for id, ch := range h.clients {
select {
case ch <- entry:
// Successfully sent
default:
// Client buffer full
slowClients++
if slowClients == 1 { // Log only once per broadcast
h.logger.Debug("msg", "Dropped entry for slow client(s)",
"component", "http_sink",
"client_id", id,
"slow_clients", slowClients,
"total_clients", clientCount)
h.sessionsMu.RLock()
sessionID, hasSession := h.clientSessions[id]
h.sessionsMu.RUnlock()
if hasSession {
if !h.sessionManager.IsSessionActive(sessionID) {
staleClients = append(staleClients, id)
continue
}
select {
case ch <- entry:
h.sessionManager.UpdateActivity(sessionID)
default:
slowClients++
if slowClients == 1 {
h.logger.Debug("msg", "Dropped entry for slow client(s)",
"component", "http_sink",
"client_id", id,
"slow_clients", slowClients,
"total_clients", clientCount)
}
}
} else {
delete(h.clients, id)
}
}
// Clean up stale clients after broadcast
if len(staleClients) > 0 {
go func() {
for _, clientID := range staleClients {
select {
case h.unregister <- clientID:
case <-h.done:
return
}
}
}()
}
}
// If no clients connected, entry is discarded (no buffering)
h.clientsMu.RUnlock()
@ -275,91 +407,29 @@ func (h *HTTPSink) brokerLoop(ctx context.Context) {
h.clientsMu.RLock()
for id, ch := range h.clients {
select {
case ch <- heartbeatEntry:
default:
// Client buffer full, skip heartbeat
h.logger.Debug("msg", "Skipped heartbeat for slow client",
"component", "http_sink",
"client_id", id)
h.sessionsMu.RLock()
sessionID, hasSession := h.clientSessions[id]
h.sessionsMu.RUnlock()
if hasSession {
select {
case ch <- heartbeatEntry:
// Update session activity on heartbeat
h.sessionManager.UpdateActivity(sessionID)
default:
// Client buffer full, skip heartbeat
h.logger.Debug("msg", "Skipped heartbeat for slow client",
"component", "http_sink",
"client_id", id)
}
}
}
h.clientsMu.RUnlock()
}
}
}
}
func (h *HTTPSink) Stop() {
h.logger.Info("msg", "Stopping HTTP sink")
// Signal all client handlers to stop
close(h.done)
// Shutdown HTTP server
if h.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
h.server.ShutdownWithContext(ctx)
}
// Wait for all active client handlers to finish
h.wg.Wait()
// Close unregister channel after all clients have finished
close(h.unregister)
// Close all client channels
h.clientsMu.Lock()
for _, ch := range h.clients {
close(ch)
}
h.clients = make(map[uint64]chan core.LogEntry)
h.clientsMu.Unlock()
h.logger.Info("msg", "HTTP sink stopped")
}
func (h *HTTPSink) GetStats() SinkStats {
lastProc, _ := h.lastProcessed.Load().(time.Time)
var netLimitStats map[string]any
if h.netLimiter != nil {
netLimitStats = h.netLimiter.GetStats()
}
var authStats map[string]any
if h.authenticator != nil {
authStats = h.authenticator.GetStats()
authStats["failures"] = h.authFailures.Load()
authStats["successes"] = h.authSuccesses.Load()
}
var tlsStats map[string]any
if h.tlsManager != nil {
tlsStats = h.tlsManager.GetStats()
}
return SinkStats{
Type: "http",
TotalProcessed: h.totalProcessed.Load(),
ActiveConnections: h.activeClients.Load(),
StartTime: h.startTime,
LastProcessed: lastProc,
Details: map[string]any{
"port": h.config.Port,
"buffer_size": h.config.BufferSize,
"endpoints": map[string]string{
"stream": h.config.StreamPath,
"status": h.config.StatusPath,
},
"net_limit": netLimitStats,
"auth": authStats,
"tls": tlsStats,
},
}
}
// requestHandler is the main entry point for all incoming HTTP requests.
func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
remoteAddr := ctx.RemoteAddr().String()
@ -380,21 +450,6 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
}
}
// Enforce TLS for authentication
if h.authenticator != nil && h.authConfig.Type != "none" {
isTLS := ctx.IsTLS() || h.tlsManager != nil
if !isTLS {
ctx.SetStatusCode(fasthttp.StatusForbidden)
ctx.SetContentType("application/json")
json.NewEncoder(ctx).Encode(map[string]string{
"error": "TLS required for authentication",
"hint": "Use HTTPS for authenticated connections",
})
return
}
}
path := string(ctx.Path())
// Status endpoint doesn't require auth
@ -403,52 +458,14 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
return
}
// Authenticate request
var session *auth.Session
if h.authenticator != nil {
authHeader := string(ctx.Request.Header.Peek("Authorization"))
var err error
session, err = h.authenticator.AuthenticateHTTP(authHeader, remoteAddr)
if err != nil {
h.authFailures.Add(1)
h.logger.Warn("msg", "Authentication failed",
"component", "http_sink",
"remote_addr", remoteAddr,
"error", err)
// Return 401 with WWW-Authenticate header
ctx.SetStatusCode(fasthttp.StatusUnauthorized)
if h.authConfig.Type == "basic" && h.authConfig.Basic != nil {
realm := h.authConfig.Basic.Realm
if realm == "" {
realm = "Restricted"
}
ctx.Response.Header.Set("WWW-Authenticate", fmt.Sprintf("Basic realm=\"%s\"", realm))
} else if h.authConfig.Type == "token" {
ctx.Response.Header.Set("WWW-Authenticate", "Token")
}
ctx.SetContentType("application/json")
json.NewEncoder(ctx).Encode(map[string]string{
"error": "Unauthorized",
})
return
}
h.authSuccesses.Add(1)
} else {
// Create anonymous session for unauthenticated connections
session = &auth.Session{
ID: fmt.Sprintf("anon-%d", time.Now().UnixNano()),
Username: "anonymous",
Method: "none",
RemoteAddr: remoteAddr,
CreatedAt: time.Now(),
}
}
// Create anonymous session for all connections
sess := h.sessionManager.CreateSession(remoteAddr, "http_sink", map[string]any{
"tls": ctx.IsTLS() || h.tlsManager != nil,
})
switch path {
case h.config.StreamPath:
h.handleStream(ctx, session)
h.handleStream(ctx, sess)
default:
ctx.SetStatusCode(fasthttp.StatusNotFound)
ctx.SetContentType("application/json")
@ -456,18 +473,11 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
"error": "Not Found",
})
}
// Handle stream endpoint
// if path == h.config.StreamPath {
// h.handleStream(ctx, session)
// return
// }
//
// // Unknown path
// ctx.SetStatusCode(fasthttp.StatusNotFound)
// ctx.SetBody([]byte("Not Found"))
}
func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session) {
// handleStream manages a client's Server-Sent Events (SSE) stream.
func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, sess *session.Session) {
// Track connection for net limiting
remoteAddr := ctx.RemoteAddr().String()
if h.netLimiter != nil {
@ -490,14 +500,18 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
h.clients[clientID] = clientChan
h.clientsMu.Unlock()
// Register session mapping
h.sessionsMu.Lock()
h.clientSessions[clientID] = sess.ID
h.sessionsMu.Unlock()
// Define the stream writer function
streamFunc := func(w *bufio.Writer) {
connectCount := h.activeClients.Add(1)
h.logger.Debug("msg", "HTTP client connected",
"component", "http_sink",
"remote_addr", remoteAddr,
"username", session.Username,
"auth_method", session.Method,
"session_id", sess.ID,
"client_id", clientID,
"active_clients", connectCount)
@ -510,7 +524,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
h.logger.Debug("msg", "HTTP client disconnected",
"component", "http_sink",
"remote_addr", remoteAddr,
"username", session.Username,
"session_id", sess.ID,
"client_id", clientID,
"active_clients", disconnectCount)
@ -521,14 +535,16 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
// Shutting down, don't block
}
// Remove session
h.sessionManager.RemoveSession(sess.ID)
h.wg.Done()
}()
// Send initial connected event with metadata
connectionInfo := map[string]any{
"client_id": fmt.Sprintf("%d", clientID),
"username": session.Username,
"auth_method": session.Method,
"session_id": sess.ID,
"stream_path": h.config.StreamPath,
"status_path": h.config.StatusPath,
"buffer_size": h.config.BufferSize,
@ -573,20 +589,15 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
return
}
case <-tickerChan:
// Validate session is still active
if h.authenticator != nil && session != nil && !h.authenticator.ValidateSession(session.ID) {
fmt.Fprintf(w, "event: disconnect\ndata: {\"reason\":\"session_expired\"}\n\n")
w.Flush()
return
}
// Update session activity
h.sessionManager.UpdateActivity(sess.ID)
// Heartbeat is sent from broker, additional client-specific heartbeat is sent here
// This provides per-client heartbeat validation with session check
case <-tickerChan:
// Client-specific heartbeat
sessionHB := map[string]any{
"type": "session_heartbeat",
"client_id": fmt.Sprintf("%d", clientID),
"session_valid": true,
"type": "heartbeat",
"client_id": fmt.Sprintf("%d", clientID),
"session_id": sess.ID,
}
hbData, _ := json.Marshal(sessionHB)
fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", hbData)
@ -607,49 +618,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, session *auth.Session)
ctx.SetBodyStreamWriter(streamFunc)
}
func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry core.LogEntry) error {
formatted, err := h.formatter.Format(entry)
if err != nil {
return err
}
// Remove trailing newline if present (SSE adds its own)
formatted = bytes.TrimSuffix(formatted, []byte{'\n'})
// Multi-line content handler
lines := bytes.Split(formatted, []byte{'\n'})
for _, line := range lines {
// SSE needs "data: " prefix for each line based on W3C spec
fmt.Fprintf(w, "data: %s\n", line)
}
fmt.Fprintf(w, "\n") // Empty line to terminate event
return nil
}
func (h *HTTPSink) createHeartbeatEntry() core.LogEntry {
message := "heartbeat"
// Build fields for heartbeat metadata
fields := make(map[string]any)
fields["type"] = "heartbeat"
if h.config.Heartbeat.Enabled {
fields["active_clients"] = h.activeClients.Load()
fields["uptime_seconds"] = int(time.Since(h.startTime).Seconds())
}
fieldsJSON, _ := json.Marshal(fields)
return core.LogEntry{
Time: time.Now(),
Source: "logwisp-http",
Level: "INFO",
Message: message,
Fields: fieldsJSON,
}
}
// handleStatus provides a JSON status report of the sink.
func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
ctx.SetContentType("application/json")
@ -662,17 +631,6 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
}
}
var authStats any
if h.authenticator != nil {
authStats = h.authenticator.GetStats()
authStats.(map[string]any)["failures"] = h.authFailures.Load()
authStats.(map[string]any)["successes"] = h.authSuccesses.Load()
} else {
authStats = map[string]any{
"enabled": false,
}
}
var tlsStats any
if h.tlsManager != nil {
tlsStats = h.tlsManager.GetStats()
@ -682,6 +640,11 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
}
}
var sessionStats any
if h.sessionManager != nil {
sessionStats = h.sessionManager.GetStats()
}
status := map[string]any{
"service": "LogWisp",
"version": version.Short(),
@ -703,13 +666,11 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
"format": h.config.Heartbeat.Format,
},
"tls": tlsStats,
"auth": authStats,
"sessions": sessionStats,
"net_limit": netLimitStats,
},
"statistics": map[string]any{
"total_processed": h.totalProcessed.Load(),
"auth_failures": h.authFailures.Load(),
"auth_successes": h.authSuccesses.Load(),
},
}
@ -717,22 +678,71 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
ctx.SetBody(data)
}
// Returns the current number of active clients
func (h *HTTPSink) GetActiveConnections() int64 {
return h.activeClients.Load()
// handleSessionExpiry is the callback for cleaning up expired sessions.
func (h *HTTPSink) handleSessionExpiry(sessionID, remoteAddr string) {
h.sessionsMu.RLock()
defer h.sessionsMu.RUnlock()
// Find client by session ID
for clientID, sessID := range h.clientSessions {
if sessID == sessionID {
h.logger.Info("msg", "Closing expired session client",
"component", "http_sink",
"session_id", sessionID,
"client_id", clientID,
"remote_addr", remoteAddr)
// Signal broker to unregister
select {
case h.unregister <- clientID:
case <-h.done:
}
return
}
}
}
// Returns the configured transport endpoint path
func (h *HTTPSink) GetStreamPath() string {
return h.config.StreamPath
// createHeartbeatEntry generates a new heartbeat log entry.
func (h *HTTPSink) createHeartbeatEntry() core.LogEntry {
message := "heartbeat"
// Build fields for heartbeat metadata
fields := make(map[string]any)
fields["type"] = "heartbeat"
if h.config.Heartbeat.Enabled {
fields["active_clients"] = h.activeClients.Load()
fields["uptime_seconds"] = int(time.Since(h.startTime).Seconds())
}
fieldsJSON, _ := json.Marshal(fields)
return core.LogEntry{
Time: time.Now(),
Source: "logwisp-http",
Level: "INFO",
Message: message,
Fields: fieldsJSON,
}
}
// Returns the configured status endpoint path
func (h *HTTPSink) GetStatusPath() string {
return h.config.StatusPath
}
// formatEntryForSSE formats a log entry into the SSE 'data:' format.
func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry core.LogEntry) error {
formatted, err := h.formatter.Format(entry)
if err != nil {
return err
}
// Returns the configured host
func (h *HTTPSink) GetHost() string {
return h.config.Host
// Remove trailing newline if present (SSE adds its own)
formatted = bytes.TrimSuffix(formatted, []byte{'\n'})
// Multi-line content handler
lines := bytes.Split(formatted, []byte{'\n'})
for _, line := range lines {
// SSE needs "data: " prefix for each line based on W3C spec
fmt.Fprintf(w, "data: %s\n", line)
}
fmt.Fprintf(w, "\n") // Empty line to terminate event
return nil
}

View File

@ -5,39 +5,39 @@ import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/auth"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/session"
ltls "logwisp/src/internal/tls"
"logwisp/src/internal/version"
"github.com/lixenwraith/log"
"github.com/valyala/fasthttp"
)
// TODO: implement heartbeat for HTTP Client Sink, similar to HTTP Sink
// Forwards log entries to a remote HTTP endpoint
// TODO: add heartbeat
// HTTPClientSink forwards log entries to a remote HTTP endpoint.
type HTTPClientSink struct {
input chan core.LogEntry
config *config.HTTPClientSinkOptions
client *fasthttp.Client
batch []core.LogEntry
batchMu sync.Mutex
done chan struct{}
wg sync.WaitGroup
startTime time.Time
logger *log.Logger
formatter format.Formatter
authenticator *auth.Authenticator
input chan core.LogEntry
config *config.HTTPClientSinkOptions
client *fasthttp.Client
batch []core.LogEntry
batchMu sync.Mutex
done chan struct{}
wg sync.WaitGroup
startTime time.Time
logger *log.Logger
formatter format.Formatter
sessionID string
sessionManager *session.Manager
tlsManager *ltls.ClientManager
// Statistics
totalProcessed atomic.Uint64
@ -48,20 +48,21 @@ type HTTPClientSink struct {
activeConnections atomic.Int64
}
// Creates a new HTTP client sink
// NewHTTPClientSink creates a new HTTP client sink.
func NewHTTPClientSink(opts *config.HTTPClientSinkOptions, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) {
if opts == nil {
return nil, fmt.Errorf("HTTP client sink options cannot be nil")
}
h := &HTTPClientSink{
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
batch: make([]core.LogEntry, 0, opts.BatchSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
batch: make([]core.LogEntry, 0, opts.BatchSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
sessionManager: session.NewManager(30 * time.Minute),
}
h.lastProcessed.Store(time.Time{})
h.lastBatchSent.Store(time.Time{})
@ -75,54 +76,48 @@ func NewHTTPClientSink(opts *config.HTTPClientSinkOptions, logger *log.Logger, f
DisableHeaderNamesNormalizing: true,
}
// Configure TLS if using HTTPS
// Configure TLS for HTTPS
if strings.HasPrefix(opts.URL, "https://") {
tlsConfig := &tls.Config{
InsecureSkipVerify: opts.InsecureSkipVerify,
}
// Use TLS config if provided
if opts.TLS != nil {
// Load custom CA for server verification
if opts.TLS.CAFile != "" {
caCert, err := os.ReadFile(opts.TLS.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file '%s': %w", opts.TLS.CAFile, err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate from '%s'", opts.TLS.CAFile)
}
tlsConfig.RootCAs = caCertPool
logger.Debug("msg", "Custom CA loaded for server verification",
"component", "http_client_sink",
"ca_file", opts.TLS.CAFile)
if opts.TLS != nil && opts.TLS.Enabled {
// Use the new ClientManager with the clear client-specific config
tlsManager, err := ltls.NewClientManager(opts.TLS, logger)
if err != nil {
return nil, fmt.Errorf("failed to create TLS client manager: %w", err)
}
h.tlsManager = tlsManager
// Get the generated config
h.client.TLSConfig = tlsManager.GetConfig()
// Load client certificate for mTLS if provided
if opts.TLS.CertFile != "" && opts.TLS.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(opts.TLS.CertFile, opts.TLS.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
logger.Info("msg", "Client certificate loaded for mTLS",
"component", "http_client_sink",
"cert_file", opts.TLS.CertFile)
logger.Info("msg", "Client TLS configured",
"component", "http_client_sink",
"has_client_cert", opts.TLS.ClientCertFile != "", // Clearer check
"has_server_ca", opts.TLS.ServerCAFile != "", // Clearer check
"min_version", opts.TLS.MinVersion)
} else if opts.InsecureSkipVerify { // Use the new clear field
// TODO: document this behavior
h.client.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
h.client.TLSConfig = tlsConfig
}
return h, nil
}
// Input returns the channel for sending log entries.
func (h *HTTPClientSink) Input() chan<- core.LogEntry {
return h.input
}
// Start begins the processing and batching loops.
func (h *HTTPClientSink) Start(ctx context.Context) error {
// Create session for HTTP client sink lifetime
sess := h.sessionManager.CreateSession(h.config.URL, "http_client_sink", map[string]any{
"batch_size": h.config.BatchSize,
"timeout": h.config.Timeout,
})
h.sessionID = sess.ID
h.wg.Add(2)
go h.processLoop(ctx)
go h.batchTimer(ctx)
@ -131,10 +126,12 @@ func (h *HTTPClientSink) Start(ctx context.Context) error {
"component", "http_client_sink",
"url", h.config.URL,
"batch_size", h.config.BatchSize,
"batch_delay_ms", h.config.BatchDelayMS)
"batch_delay_ms", h.config.BatchDelayMS,
"session_id", h.sessionID)
return nil
}
// Stop gracefully shuts down the sink, sending any remaining batched entries.
func (h *HTTPClientSink) Stop() {
h.logger.Info("msg", "Stopping HTTP client sink")
close(h.done)
@ -151,12 +148,21 @@ func (h *HTTPClientSink) Stop() {
h.batchMu.Unlock()
}
// Remove session and stop manager
if h.sessionID != "" {
h.sessionManager.RemoveSession(h.sessionID)
}
if h.sessionManager != nil {
h.sessionManager.Stop()
}
h.logger.Info("msg", "HTTP client sink stopped",
"total_processed", h.totalProcessed.Load(),
"total_batches", h.totalBatches.Load(),
"failed_batches", h.failedBatches.Load())
}
// GetStats returns the sink's statistics.
func (h *HTTPClientSink) GetStats() SinkStats {
lastProc, _ := h.lastProcessed.Load().(time.Time)
lastBatch, _ := h.lastBatchSent.Load().(time.Time)
@ -165,6 +171,23 @@ func (h *HTTPClientSink) GetStats() SinkStats {
pendingEntries := len(h.batch)
h.batchMu.Unlock()
// Get session information
var sessionInfo map[string]any
if h.sessionID != "" {
if sess, exists := h.sessionManager.GetSession(h.sessionID); exists {
sessionInfo = map[string]any{
"session_id": sess.ID,
"created_at": sess.CreatedAt,
"last_activity": sess.LastActivity,
}
}
}
var tlsStats map[string]any
if h.tlsManager != nil {
tlsStats = h.tlsManager.GetStats()
}
return SinkStats{
Type: "http_client",
TotalProcessed: h.totalProcessed.Load(),
@ -178,10 +201,13 @@ func (h *HTTPClientSink) GetStats() SinkStats {
"total_batches": h.totalBatches.Load(),
"failed_batches": h.failedBatches.Load(),
"last_batch_sent": lastBatch,
"session": sessionInfo,
"tls": tlsStats,
},
}
}
// processLoop collects incoming log entries into a batch.
func (h *HTTPClientSink) processLoop(ctx context.Context) {
defer h.wg.Done()
@ -219,6 +245,7 @@ func (h *HTTPClientSink) processLoop(ctx context.Context) {
}
}
// batchTimer periodically triggers sending of the current batch.
func (h *HTTPClientSink) batchTimer(ctx context.Context) {
defer h.wg.Done()
@ -248,6 +275,7 @@ func (h *HTTPClientSink) batchTimer(ctx context.Context) {
}
}
// sendBatch sends a batch of log entries to the remote endpoint with retry logic.
func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
h.activeConnections.Add(1)
defer h.activeConnections.Add(-1)
@ -293,7 +321,6 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
var lastErr error
retryDelay := time.Duration(h.config.RetryDelayMS) * time.Millisecond
// TODO: verify retry loop placement is correct or should it be after acquiring resources (req :=....)
for attempt := int64(0); attempt <= h.config.MaxRetries; attempt++ {
if attempt > 0 {
// Wait before retry
@ -323,24 +350,6 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
req.Header.Set("User-Agent", fmt.Sprintf("LogWisp/%s", version.Short()))
// Add authentication based on auth type
switch h.config.Auth.Type {
case "basic":
creds := h.config.Auth.Username + ":" + h.config.Auth.Password
encodedCreds := base64.StdEncoding.EncodeToString([]byte(creds))
req.Header.Set("Authorization", "Basic "+encodedCreds)
case "token":
req.Header.Set("Authorization", "Token "+h.config.Auth.Token)
case "mtls":
// mTLS auth is handled at TLS layer via client certificates
// No Authorization header needed
case "none":
// No authentication
}
// Send request
err := h.client.DoTimeout(req, resp, time.Duration(h.config.Timeout)*time.Second)
@ -370,6 +379,12 @@ func (h *HTTPClientSink) sendBatch(batch []core.LogEntry) {
// Check response status
if statusCode >= 200 && statusCode < 300 {
// Success
// Update session activity on successful batch send
if h.sessionID != "" {
h.sessionManager.UpdateActivity(h.sessionID)
}
h.logger.Debug("msg", "Batch sent successfully",
"component", "http_client_sink",
"batch_size", len(batch),

View File

@ -8,22 +8,22 @@ import (
"logwisp/src/internal/core"
)
// Sink represents an output data stream.
type Sink interface {
	// Input returns the channel for sending log entries to this sink.
	Input() chan<- core.LogEntry
	// Start begins processing log entries.
	Start(ctx context.Context) error
	// Stop gracefully shuts down the sink.
	Stop()
	// GetStats returns sink statistics.
	GetStats() SinkStats
}
// Contains statistics about a sink
// SinkStats contains statistics about a sink.
type SinkStats struct {
Type string
TotalProcessed uint64

View File

@ -11,31 +11,32 @@ import (
"sync/atomic"
"time"
"logwisp/src/internal/auth"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/limit"
"logwisp/src/internal/session"
"github.com/lixenwraith/log"
"github.com/lixenwraith/log/compat"
"github.com/panjf2000/gnet/v2"
)
// Streams log entries via TCP
// TCPSink streams log entries to connected TCP clients.
type TCPSink struct {
input chan core.LogEntry
config *config.TCPSinkOptions
server *tcpServer
done chan struct{}
activeConns atomic.Int64
startTime time.Time
engine *gnet.Engine
engineMu sync.Mutex
wg sync.WaitGroup
netLimiter *limit.NetLimiter
logger *log.Logger
formatter format.Formatter
input chan core.LogEntry
config *config.TCPSinkOptions
server *tcpServer
done chan struct{}
activeConns atomic.Int64
startTime time.Time
engine *gnet.Engine
engineMu sync.Mutex
wg sync.WaitGroup
netLimiter *limit.NetLimiter
logger *log.Logger
formatter format.Formatter
sessionManager *session.Manager
// Statistics
totalProcessed atomic.Uint64
@ -47,7 +48,7 @@ type TCPSink struct {
errorMu sync.Mutex
}
// Holds TCP sink configuration
// TCPConfig holds configuration for the TCPSink.
type TCPConfig struct {
Host string
Port int64
@ -56,19 +57,21 @@ type TCPConfig struct {
NetLimit *config.NetLimitConfig
}
// Creates a new TCP streaming sink
// NewTCPSink creates a new TCP streaming sink.
func NewTCPSink(opts *config.TCPSinkOptions, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) {
if opts == nil {
return nil, fmt.Errorf("TCP sink options cannot be nil")
}
t := &TCPSink{
config: opts, // Direct reference to config
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
config: opts,
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
consecutiveWriteErrors: make(map[gnet.Conn]int),
sessionManager: session.NewManager(30 * time.Minute),
}
t.lastProcessed.Store(time.Time{})
@ -82,16 +85,23 @@ func NewTCPSink(opts *config.TCPSinkOptions, logger *log.Logger, formatter forma
return t, nil
}
// Input returns the write-only channel used to feed log entries to this sink.
func (t *TCPSink) Input() chan<- core.LogEntry {
	return t.input
}
// Start initializes the TCP server and begins the broadcast loop.
func (t *TCPSink) Start(ctx context.Context) error {
t.server = &tcpServer{
sink: t,
clients: make(map[gnet.Conn]*tcpClient),
}
// Register expiry callback
t.sessionManager.RegisterExpiryCallback("tcp_sink", func(sessionID, remoteAddr string) {
t.handleSessionExpiry(sessionID, remoteAddr)
})
// Start log broadcast loop
t.wg.Add(1)
go func() {
@ -155,8 +165,13 @@ func (t *TCPSink) Start(ctx context.Context) error {
}
}
// Stop gracefully shuts down the TCP server.
func (t *TCPSink) Stop() {
t.logger.Info("msg", "Stopping TCP sink")
// Unregister callback
t.sessionManager.UnregisterExpiryCallback("tcp_sink")
// Signal broadcast loop to stop
close(t.done)
@ -174,9 +189,15 @@ func (t *TCPSink) Stop() {
// Wait for broadcast loop to finish
t.wg.Wait()
// Stop session manager
if t.sessionManager != nil {
t.sessionManager.Stop()
}
t.logger.Info("msg", "TCP sink stopped")
}
// GetStats returns the sink's statistics.
func (t *TCPSink) GetStats() SinkStats {
lastProc, _ := t.lastProcessed.Load().(time.Time)
@ -185,6 +206,11 @@ func (t *TCPSink) GetStats() SinkStats {
netLimitStats = t.netLimiter.GetStats()
}
var sessionStats map[string]any
if t.sessionManager != nil {
sessionStats = t.sessionManager.GetStats()
}
return SinkStats{
Type: "tcp",
TotalProcessed: t.totalProcessed.Load(),
@ -195,11 +221,32 @@ func (t *TCPSink) GetStats() SinkStats {
"port": t.config.Port,
"buffer_size": t.config.BufferSize,
"net_limit": netLimitStats,
"auth": map[string]any{"enabled": false},
"sessions": sessionStats,
},
}
}
// GetActiveConnections returns the current number of active client connections.
func (t *TCPSink) GetActiveConnections() int64 {
	return t.activeConns.Load()
}
// tcpServer implements the gnet.EventHandler interface for the TCP sink.
type tcpServer struct {
	gnet.BuiltinEventEngine
	sink    *TCPSink                 // owning sink; used for stats, limits and session updates
	clients map[gnet.Conn]*tcpClient // connection -> per-client state
	mu      sync.RWMutex             // guards clients
}
// tcpClient represents a connected TCP client.
type tcpClient struct {
	conn      gnet.Conn    // underlying gnet connection
	buffer    bytes.Buffer // per-client byte buffer (not used in visible code paths — TODO confirm)
	sessionID string       // ID in the sink's session manager; "" if no session was created
}
// broadcastLoop manages the central broadcasting of log entries to all clients.
func (t *TCPSink) broadcastLoop(ctx context.Context) {
var ticker *time.Ticker
var tickerChan <-chan time.Time
@ -248,101 +295,7 @@ func (t *TCPSink) broadcastLoop(ctx context.Context) {
}
}
// broadcastData sends a formatted byte slice to every connected client.
// Writes are asynchronous: the AsyncWrite callback counts failures and routes
// them through handleWriteError, while a success clears that connection's
// consecutive-error counter. The client map is only read here, so a read
// lock is sufficient.
func (t *TCPSink) broadcastData(data []byte) {
	t.server.mu.RLock()
	defer t.server.mu.RUnlock()

	// Idiomatic single-variable range (was `for conn, _ := range`, which
	// gofmt/staticcheck flag as a redundant blank identifier).
	for conn := range t.server.clients {
		conn.AsyncWrite(data, func(c gnet.Conn, err error) error {
			if err != nil {
				t.writeErrors.Add(1)
				t.handleWriteError(c, err)
			} else {
				// Reset consecutive error count on success
				t.errorMu.Lock()
				delete(t.consecutiveWriteErrors, c)
				t.errorMu.Unlock()
			}
			return nil
		})
	}
}
// handleWriteError tracks consecutive AsyncWrite failures per connection and
// force-closes a connection once three failures accumulate in a row. The
// counter map is guarded by errorMu; a successful write elsewhere resets it.
func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
	t.errorMu.Lock()
	defer t.errorMu.Unlock()

	// Track consecutive errors per connection (lazy init: the map may not
	// have been created yet on this code path).
	if t.consecutiveWriteErrors == nil {
		t.consecutiveWriteErrors = make(map[gnet.Conn]int)
	}

	t.consecutiveWriteErrors[c]++
	errorCount := t.consecutiveWriteErrors[c]

	t.logger.Debug("msg", "AsyncWrite error",
		"component", "tcp_sink",
		"remote_addr", c.RemoteAddr(),
		"error", err,
		"consecutive_errors", errorCount)

	// Close connection after 3 consecutive write errors; the entry is
	// deleted so a reconnecting peer starts with a clean counter.
	if errorCount >= 3 {
		t.logger.Warn("msg", "Closing connection due to repeated write errors",
			"component", "tcp_sink",
			"remote_addr", c.RemoteAddr(),
			"error_count", errorCount)
		delete(t.consecutiveWriteErrors, c)
		c.Close()
	}
}
// createHeartbeatEntry builds a synthetic LogEntry announcing a heartbeat.
// When IncludeStats is configured, connection-count and uptime metadata are
// embedded in the entry's JSON-encoded fields.
func (t *TCPSink) createHeartbeatEntry() core.LogEntry {
	meta := map[string]any{"type": "heartbeat"}
	if t.config.Heartbeat.IncludeStats {
		meta["active_connections"] = t.activeConns.Load()
		meta["uptime_seconds"] = int64(time.Since(t.startTime).Seconds())
	}

	// Marshal error is deliberately ignored: the map holds only JSON-safe values.
	encoded, _ := json.Marshal(meta)

	return core.LogEntry{
		Time:    time.Now(),
		Source:  "logwisp-tcp",
		Level:   "INFO",
		Message: "heartbeat",
		Fields:  encoded,
	}
}
// GetActiveConnections returns the current number of active client connections.
func (t *TCPSink) GetActiveConnections() int64 {
	return t.activeConns.Load()
}
// tcpClient represents a connected TCP client with auth state.
type tcpClient struct {
	conn        gnet.Conn     // underlying gnet connection
	buffer      bytes.Buffer  // per-client byte buffer (usage not visible here — TODO confirm)
	authTimeout time.Time     // presumably the deadline to complete authentication — verify enforcement site
	session     *auth.Session // auth session; presumably nil until authentication completes — confirm
}
// tcpServer handles gnet events for the TCP sink, including authentication state.
type tcpServer struct {
	gnet.BuiltinEventEngine
	sink    *TCPSink                 // owning sink
	clients map[gnet.Conn]*tcpClient // connection -> per-client state
	mu      sync.RWMutex             // guards clients
}
// OnBoot is called when the server starts.
func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action {
// Store engine reference for shutdown
s.sink.engineMu.Lock()
@ -355,6 +308,7 @@ func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action {
return gnet.None
}
// OnOpen is called when a new connection is established.
func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
remoteAddr := c.RemoteAddr()
s.sink.logger.Debug("msg", "TCP connection attempt", "remote_addr", remoteAddr)
@ -387,10 +341,14 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
s.sink.netLimiter.AddConnection(remoteStr)
}
// Create session for tracking
sess := s.sink.sessionManager.CreateSession(c.RemoteAddr().String(), "tcp_sink", nil)
// TCP Sink accepts all connections without authentication
client := &tcpClient{
conn: c,
buffer: bytes.Buffer{},
conn: c,
buffer: bytes.Buffer{},
sessionID: sess.ID,
}
s.mu.Lock()
@ -400,14 +358,30 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
newCount := s.sink.activeConns.Add(1)
s.sink.logger.Debug("msg", "TCP connection opened",
"remote_addr", remoteAddr,
"session_id", sess.ID,
"active_connections", newCount)
return nil, gnet.None
}
// OnClose is called when a connection is closed.
func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
remoteAddr := c.RemoteAddr().String()
// Get client to retrieve session ID
s.mu.RLock()
client, exists := s.clients[c]
s.mu.RUnlock()
if exists && client.sessionID != "" {
// Remove session
s.sink.sessionManager.RemoveSession(client.sessionID)
s.sink.logger.Debug("msg", "Session removed",
"component", "tcp_sink",
"session_id", client.sessionID,
"remote_addr", remoteAddr)
}
// Remove client state
s.mu.Lock()
delete(s.clients, c)
@ -431,8 +405,141 @@ func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
return gnet.None
}
// OnTraffic handles inbound bytes from a client connection. The sink is
// write-only, so all payload data is discarded; the only side effect is
// refreshing the sending client's session-activity timestamp.
func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action {
	s.mu.RLock()
	cl, ok := s.clients[c]
	s.mu.RUnlock()

	// Any inbound traffic counts as activity for session-expiry purposes.
	if ok && cl.sessionID != "" {
		s.sink.sessionManager.UpdateActivity(cl.sessionID)
	}

	// TCP Sink doesn't expect any data from clients; drop the whole buffer.
	c.Discard(-1)
	return gnet.None
}
// handleSessionExpiry is the callback for cleaning up expired sessions.
// It linearly scans the client map for the matching session ID and closes
// that connection; the session entry itself is removed later in OnClose.
func (t *TCPSink) handleSessionExpiry(sessionID, remoteAddr string) {
	t.server.mu.RLock()
	defer t.server.mu.RUnlock()

	// Find connection by session ID
	for conn, client := range t.server.clients {
		if client.sessionID == sessionID {
			t.logger.Info("msg", "Closing expired session connection",
				"component", "tcp_sink",
				"session_id", sessionID,
				"remote_addr", remoteAddr)

			// NOTE(review): Close is invoked while the read lock on the
			// client map is held; OnClose acquires the write lock. This is
			// safe only if gnet defers close handling — confirm no deadlock.
			conn.Close()
			return
		}
	}
}
// broadcastData sends a formatted byte slice to all connected clients.
// Clients whose session has expired are skipped and queued for asynchronous
// cleanup; active clients get their session activity refreshed before the
// async write is issued.
func (t *TCPSink) broadcastData(data []byte) {
	t.server.mu.RLock()
	defer t.server.mu.RUnlock()

	// Track clients to remove after iteration — they cannot be closed here
	// because the read lock is held for the whole loop.
	var staleClients []gnet.Conn

	for conn, client := range t.server.clients {
		// Update session activity before sending data
		if client.sessionID != "" {
			if !t.sessionManager.IsSessionActive(client.sessionID) {
				// Session expired, mark for cleanup
				staleClients = append(staleClients, conn)
				continue
			}
			t.sessionManager.UpdateActivity(client.sessionID)
		}

		conn.AsyncWrite(data, func(c gnet.Conn, err error) error {
			if err != nil {
				t.writeErrors.Add(1)
				t.handleWriteError(c, err)
			} else {
				// Reset consecutive error count on success
				t.errorMu.Lock()
				delete(t.consecutiveWriteErrors, c)
				t.errorMu.Unlock()
			}
			return nil
		})
	}

	// Clean up stale connections outside the read lock (goroutine avoids
	// blocking the broadcast path and any lock re-entrancy).
	if len(staleClients) > 0 {
		go t.cleanupStaleConnections(staleClients)
	}
}
// handleWriteError records an async-write failure for a connection and
// closes the connection once three consecutive failures accumulate.
// A successful write elsewhere resets the per-connection counter.
func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
	t.errorMu.Lock()
	defer t.errorMu.Unlock()

	// Lazily initialize the per-connection failure tracker.
	if t.consecutiveWriteErrors == nil {
		t.consecutiveWriteErrors = make(map[gnet.Conn]int)
	}
	t.consecutiveWriteErrors[c]++
	count := t.consecutiveWriteErrors[c]

	t.logger.Debug("msg", "AsyncWrite error",
		"component", "tcp_sink",
		"remote_addr", c.RemoteAddr(),
		"error", err,
		"consecutive_errors", count)

	if count < 3 {
		return
	}

	// Threshold reached: give up on this connection and clear its counter.
	t.logger.Warn("msg", "Closing connection due to repeated write errors",
		"component", "tcp_sink",
		"remote_addr", c.RemoteAddr(),
		"error_count", count)
	delete(t.consecutiveWriteErrors, c)
	c.Close()
}
// createHeartbeatEntry generates a new heartbeat log entry. When
// IncludeStats is configured, connection-count and uptime metadata are
// embedded in the entry's JSON-encoded fields.
func (t *TCPSink) createHeartbeatEntry() core.LogEntry {
	message := "heartbeat"

	// Build fields for heartbeat metadata
	fields := make(map[string]any)
	fields["type"] = "heartbeat"

	if t.config.Heartbeat.IncludeStats {
		fields["active_connections"] = t.activeConns.Load()
		fields["uptime_seconds"] = int64(time.Since(t.startTime).Seconds())
	}

	// Marshal error ignored deliberately: the map holds only JSON-safe values.
	fieldsJSON, _ := json.Marshal(fields)

	return core.LogEntry{
		Time:    time.Now(),
		Source:  "logwisp-tcp",
		Level:   "INFO",
		Message: message,
		Fields:  fieldsJSON,
	}
}
// cleanupStaleConnections closes every connection whose session has expired.
// It is invoked outside broadcastData's read lock so close handling can run
// without contending on the client map.
func (t *TCPSink) cleanupStaleConnections(staleConns []gnet.Conn) {
	for i := range staleConns {
		c := staleConns[i]
		t.logger.Info("msg", "Closing stale connection",
			"component", "tcp_sink",
			"remote_addr", c.RemoteAddr())
		c.Close()
	}
}

View File

@ -2,28 +2,25 @@
package sink
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/auth"
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/session"
"github.com/lixenwraith/log"
)
// TODO: implement heartbeat for TCP Client Sink, similar to TCP Sink
// Forwards log entries to a remote TCP endpoint
// TODO: add heartbeat
// TCPClientSink forwards log entries to a remote TCP endpoint.
type TCPClientSink struct {
input chan core.LogEntry
config *config.TCPClientSinkOptions
@ -36,7 +33,9 @@ type TCPClientSink struct {
logger *log.Logger
formatter format.Formatter
// Reconnection state
// Connection
sessionID string
sessionManager *session.Manager
reconnecting atomic.Bool
lastConnectErr error
connectTime time.Time
@ -49,7 +48,7 @@ type TCPClientSink struct {
connectionUptime atomic.Value // time.Duration
}
// Creates a new TCP client sink
// NewTCPClientSink creates a new TCP client sink.
func NewTCPClientSink(opts *config.TCPClientSinkOptions, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) {
// Validation and defaults are handled in config package
if opts == nil {
@ -57,13 +56,14 @@ func NewTCPClientSink(opts *config.TCPClientSinkOptions, logger *log.Logger, for
}
t := &TCPClientSink{
config: opts,
address: opts.Host + ":" + strconv.Itoa(int(opts.Port)),
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
config: opts,
address: opts.Host + ":" + strconv.Itoa(int(opts.Port)),
input: make(chan core.LogEntry, opts.BufferSize),
done: make(chan struct{}),
startTime: time.Now(),
logger: logger,
formatter: formatter,
sessionManager: session.NewManager(30 * time.Minute),
}
t.lastProcessed.Store(time.Time{})
t.connectionUptime.Store(time.Duration(0))
@ -71,10 +71,12 @@ func NewTCPClientSink(opts *config.TCPClientSinkOptions, logger *log.Logger, for
return t, nil
}
// Input returns the write-only channel used to feed log entries to this sink.
func (t *TCPClientSink) Input() chan<- core.LogEntry {
	return t.input
}
// Start begins the connection and processing loops.
func (t *TCPClientSink) Start(ctx context.Context) error {
// Start connection manager
t.wg.Add(1)
@ -91,6 +93,7 @@ func (t *TCPClientSink) Start(ctx context.Context) error {
return nil
}
// Stop gracefully shuts down the sink and its connection.
func (t *TCPClientSink) Stop() {
t.logger.Info("msg", "Stopping TCP client sink")
close(t.done)
@ -103,12 +106,21 @@ func (t *TCPClientSink) Stop() {
}
t.connMu.Unlock()
// Remove session and stop manager
if t.sessionID != "" {
t.sessionManager.RemoveSession(t.sessionID)
}
if t.sessionManager != nil {
t.sessionManager.Stop()
}
t.logger.Info("msg", "TCP client sink stopped",
"total_processed", t.totalProcessed.Load(),
"total_failed", t.totalFailed.Load(),
"total_reconnects", t.totalReconnects.Load())
}
// GetStats returns the sink's statistics.
func (t *TCPClientSink) GetStats() SinkStats {
lastProc, _ := t.lastProcessed.Load().(time.Time)
uptime, _ := t.connectionUptime.Load().(time.Duration)
@ -122,6 +134,19 @@ func (t *TCPClientSink) GetStats() SinkStats {
activeConns = 1
}
// Get session stats
var sessionInfo map[string]any
if t.sessionID != "" {
if sess, exists := t.sessionManager.GetSession(t.sessionID); exists {
sessionInfo = map[string]any{
"session_id": sess.ID,
"created_at": sess.CreatedAt,
"last_activity": sess.LastActivity,
"remote_addr": sess.RemoteAddr,
}
}
}
return SinkStats{
Type: "tcp_client",
TotalProcessed: t.totalProcessed.Load(),
@ -136,10 +161,12 @@ func (t *TCPClientSink) GetStats() SinkStats {
"total_reconnects": t.totalReconnects.Load(),
"connection_uptime": uptime.Seconds(),
"last_error": fmt.Sprintf("%v", t.lastConnectErr),
"session": sessionInfo,
},
}
}
// connectionManager handles the lifecycle of the TCP connection, including reconnections.
func (t *TCPClientSink) connectionManager(ctx context.Context) {
defer t.wg.Done()
@ -154,6 +181,11 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
default:
}
if t.sessionID != "" {
t.sessionManager.RemoveSession(t.sessionID)
t.sessionID = ""
}
// Attempt to connect
t.reconnecting.Store(true)
conn, err := t.connect()
@ -190,6 +222,13 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.connectTime = time.Now()
t.totalReconnects.Add(1)
// Create session for the connection
sess := t.sessionManager.CreateSession(t.address, "tcp_client_sink", map[string]any{
"local_addr": conn.LocalAddr().String(),
"sink_type": "tcp_client",
})
t.sessionID = sess.ID
t.connMu.Lock()
t.conn = conn
t.connMu.Unlock()
@ -197,7 +236,8 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.logger.Info("msg", "Connected to TCP server",
"component", "tcp_client_sink",
"address", t.address,
"local_addr", conn.LocalAddr())
"local_addr", conn.LocalAddr(),
"session_id", t.sessionID)
// Monitor connection
t.monitorConnection(conn)
@ -214,10 +254,57 @@ func (t *TCPClientSink) connectionManager(ctx context.Context) {
t.logger.Warn("msg", "Lost connection to TCP server",
"component", "tcp_client_sink",
"address", t.address,
"uptime", uptime)
"uptime", uptime,
"session_id", t.sessionID)
}
}
// processLoop reads entries from the input channel and sends them until the
// context is cancelled, the sink is stopped, or the input channel closes.
// A successful send refreshes the session; a send that succeeds without a
// session forces the connection closed so connectionManager re-establishes it.
func (t *TCPClientSink) processLoop(ctx context.Context) {
	defer t.wg.Done()

	for {
		select {
		case entry, ok := <-t.input:
			if !ok {
				// Input channel closed; nothing left to forward.
				return
			}
			t.totalProcessed.Add(1)
			t.lastProcessed.Store(time.Now())

			// Send entry
			if err := t.sendEntry(entry); err != nil {
				t.totalFailed.Add(1)
				t.logger.Debug("msg", "Failed to send log entry",
					"component", "tcp_client_sink",
					"error", err)
			} else {
				// Update session activity on successful send.
				// NOTE(review): sessionID is written by connectionManager in
				// another goroutine with no visible synchronization — confirm
				// this read is race-free.
				if t.sessionID != "" {
					t.sessionManager.UpdateActivity(t.sessionID)
				} else {
					// Close invalid connection without session
					t.logger.Warn("msg", "Connection without session detected, forcing reconnection",
						"component", "tcp_client_sink")
					t.connMu.Lock()
					if t.conn != nil {
						_ = t.conn.Close()
						t.conn = nil
					}
					t.connMu.Unlock()
				}
			}
		case <-ctx.Done():
			return
		case <-t.done:
			return
		}
	}
}
// connect attempts to establish a connection to the remote server.
func (t *TCPClientSink) connect() (net.Conn, error) {
dialer := &net.Dialer{
Timeout: time.Duration(t.config.DialTimeout) * time.Second,
@ -235,129 +322,10 @@ func (t *TCPClientSink) connect() (net.Conn, error) {
tcpConn.SetKeepAlivePeriod(time.Duration(t.config.KeepAlive) * time.Second)
}
// SCRAM authentication if credentials configured
if t.config.Auth != nil && t.config.Auth.Type == "scram" {
if err := t.performSCRAMAuth(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("SCRAM authentication failed: %w", err)
}
t.logger.Debug("msg", "SCRAM authentication completed",
"component", "tcp_client_sink",
"address", t.address)
}
return conn, nil
}
// performSCRAMAuth runs the client side of the SCRAM handshake over conn:
// it waits for the server's AUTH_REQUIRED greeting, exchanges
// SCRAM-FIRST / SCRAM-CHALLENGE / SCRAM-PROOF messages, and finally verifies
// the server's signature from a SCRAM-OK reply. Any protocol deviation,
// I/O failure, or verification failure is returned as an error; the caller
// closes the connection on failure. Messages are newline-delimited.
func (t *TCPClientSink) performSCRAMAuth(conn net.Conn) error {
	reader := bufio.NewReader(conn)

	// Create SCRAM client from the configured credentials
	scramClient := auth.NewScramClient(t.config.Auth.Username, t.config.Auth.Password)

	// Wait for AUTH_REQUIRED from server
	authPrompt, err := reader.ReadString('\n')
	if err != nil {
		return fmt.Errorf("failed to read auth prompt: %w", err)
	}
	if strings.TrimSpace(authPrompt) != "AUTH_REQUIRED" {
		return fmt.Errorf("unexpected server greeting: %s", authPrompt)
	}

	// Step 1: Send ClientFirst
	clientFirst, err := scramClient.StartAuthentication()
	if err != nil {
		return fmt.Errorf("failed to start SCRAM: %w", err)
	}
	msg, err := auth.FormatSCRAMRequest("SCRAM-FIRST", clientFirst)
	if err != nil {
		return err
	}
	if _, err := conn.Write([]byte(msg)); err != nil {
		return fmt.Errorf("failed to send SCRAM-FIRST: %w", err)
	}

	// Step 2: Receive ServerFirst challenge
	response, err := reader.ReadString('\n')
	if err != nil {
		return fmt.Errorf("failed to read SCRAM challenge: %w", err)
	}
	command, data, err := auth.ParseSCRAMResponse(response)
	if err != nil {
		return err
	}
	if command != "SCRAM-CHALLENGE" {
		return fmt.Errorf("unexpected server response: %s", command)
	}
	var serverFirst auth.ServerFirst
	if err := json.Unmarshal([]byte(data), &serverFirst); err != nil {
		return fmt.Errorf("failed to parse server challenge: %w", err)
	}

	// Step 3: Process challenge and send proof
	clientFinal, err := scramClient.ProcessServerFirst(&serverFirst)
	if err != nil {
		return fmt.Errorf("failed to process challenge: %w", err)
	}
	msg, err = auth.FormatSCRAMRequest("SCRAM-PROOF", clientFinal)
	if err != nil {
		return err
	}
	if _, err := conn.Write([]byte(msg)); err != nil {
		return fmt.Errorf("failed to send SCRAM-PROOF: %w", err)
	}

	// Step 4: Receive ServerFinal
	response, err = reader.ReadString('\n')
	if err != nil {
		return fmt.Errorf("failed to read SCRAM result: %w", err)
	}
	command, data, err = auth.ParseSCRAMResponse(response)
	if err != nil {
		return err
	}

	switch command {
	case "SCRAM-OK":
		var serverFinal auth.ServerFinal
		if err := json.Unmarshal([]byte(data), &serverFinal); err != nil {
			return fmt.Errorf("failed to parse server signature: %w", err)
		}

		// Verify server signature (mutual authentication: proves the server
		// also knows the credentials)
		if err := scramClient.VerifyServerFinal(&serverFinal); err != nil {
			return fmt.Errorf("server signature verification failed: %w", err)
		}

		t.logger.Info("msg", "SCRAM authentication successful",
			"component", "tcp_client_sink",
			"address", t.address,
			"username", t.config.Auth.Username,
			"session_id", serverFinal.SessionID)
		return nil

	case "SCRAM-FAIL":
		reason := data
		if reason == "" {
			reason = "unknown"
		}
		return fmt.Errorf("authentication failed: %s", reason)

	default:
		return fmt.Errorf("unexpected response: %s", command)
	}
}
// monitorConnection checks the health of the connection.
func (t *TCPClientSink) monitorConnection(conn net.Conn) {
// Simple connection monitoring by periodic zero-byte reads
ticker := time.NewTicker(5 * time.Second)
@ -390,35 +358,7 @@ func (t *TCPClientSink) monitorConnection(conn net.Conn) {
}
}
// processLoop drains the input channel and forwards each entry to the
// remote endpoint, stopping when the context is cancelled, the sink is
// stopped, or the input channel is closed.
func (t *TCPClientSink) processLoop(ctx context.Context) {
	defer t.wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.done:
			return
		case entry, ok := <-t.input:
			if !ok {
				// Input channel closed; nothing left to forward.
				return
			}
			t.totalProcessed.Add(1)
			t.lastProcessed.Store(time.Now())

			// Forward the entry; failures are counted but not retried here.
			if err := t.sendEntry(entry); err != nil {
				t.totalFailed.Add(1)
				t.logger.Debug("msg", "Failed to send log entry",
					"component", "tcp_client_sink",
					"error", err)
			}
		}
	}
}
// sendEntry formats and sends a single log entry over the connection.
func (t *TCPClientSink) sendEntry(entry core.LogEntry) error {
// Get current connection
t.connMu.RLock()