From 15f484f98b3d50bd2c5c6e3b28a338291d7951ed04414c7e39bfe4d789b5cd9b Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Sun, 4 Jan 2026 17:21:08 -0500 Subject: [PATCH] v0.12.0 tcp and http server sinks added antested, no tls or access control --- .gitignore | 1 + go.mod | 13 +- go.sum | 40 ++- src/cmd/logwisp/bootstrap.go | 2 + src/internal/config/config.go | 23 +- src/internal/config/loader.go | 3 - src/internal/session/session.go | 13 +- src/internal/sink/http/const.go | 8 + src/internal/sink/http/http.go | 539 ++++++++++++++++++++++++++++++++ src/internal/sink/tcp/tcp.go | 434 +++++++++++++++++++++++++ 10 files changed, 1061 insertions(+), 15 deletions(-) create mode 100644 src/internal/sink/http/const.go create mode 100644 src/internal/sink/http/http.go create mode 100644 src/internal/sink/tcp/tcp.go diff --git a/.gitignore b/.gitignore index 4facf1e..24efab4 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ build *.log *.toml build.sh +catalog.txt diff --git a/go.mod b/go.mod index d5d1d74..c15346d 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,22 @@ go 1.25.4 require ( github.com/lixenwraith/config v0.1.1-0.20251114180219-f7875023a51b github.com/lixenwraith/log v0.1.1-0.20251115213227-55d2c92d483f + github.com/panjf2000/gnet/v2 v2.9.7 + github.com/valyala/fasthttp v1.68.0 ) require ( - github.com/BurntSushi/toml v1.5.0 // indirect + github.com/BurntSushi/toml v1.6.0 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/panjf2000/ants/v2 v2.11.4 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.1 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.39.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c1efec8..4d51bdf 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,54 @@ -github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= -github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= +github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/lixenwraith/config v0.1.1-0.20251114180219-f7875023a51b h1:TzTV0ArJ+nzVGPN8aiEJ2MknUqJdmHRP/0/RSfov2Qw= github.com/lixenwraith/config v0.1.1-0.20251114180219-f7875023a51b/go.mod h1:roNPTSCT5HSV9dru/zi/Catwc3FZVCFf7vob2pSlNW0= github.com/lixenwraith/log v0.1.1-0.20251115213227-55d2c92d483f h1:X2LX5FQEuWYGBS3qp5z7XxBB1sWAlqumf/oW7n/f9c0= github.com/lixenwraith/log v0.1.1-0.20251115213227-55d2c92d483f/go.mod h1:XcRPRuijAs+43Djk8VmioUJhcK8irRzUjCZaZqkd3gg= +github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= +github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= +github.com/panjf2000/ants/v2 v2.11.4 h1:UJQbtN1jIcI5CYNocTj0fuAUYvsLjPoYi0YuhqV/Y48= +github.com/panjf2000/ants/v2 v2.11.4/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= +github.com/panjf2000/gnet/v2 v2.9.7 h1:6zW7Jl3oAfXwSuh1PxHLndoL2MQRWx0AJR6aaQjxUgA= +github.com/panjf2000/gnet/v2 v2.9.7/go.mod h1:WQTxDWYuQ/hz3eccH0FN32IVuvZ19HewEWx0l62fx7E= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.68.0 h1:v12Nx16iepr8r9ySOwqI+5RBJ/DqTxhOy1HrHoDFnok= +github.com/valyala/fasthttp v1.68.0/go.mod h1:5EXiRfYQAoiO/khu4oU9VISC/eVY6JqmSpPJoHCKsz4= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/cmd/logwisp/bootstrap.go b/src/cmd/logwisp/bootstrap.go index 400fd28..dcfaefa 100644 --- a/src/cmd/logwisp/bootstrap.go +++ b/src/cmd/logwisp/bootstrap.go @@ -11,7 +11,9 @@ import ( _ "logwisp/src/internal/sink/console" _ "logwisp/src/internal/sink/file" + _ "logwisp/src/internal/sink/http" _ "logwisp/src/internal/sink/null" + _ "logwisp/src/internal/sink/tcp" "logwisp/src/internal/config" "logwisp/src/internal/service" diff --git a/src/internal/config/config.go b/src/internal/config/config.go index 3e311c4..b71f209 100644 --- a/src/internal/config/config.go +++ b/src/internal/config/config.go @@ -12,9 +12,6 @@ type Config struct { StatusReporter bool `toml:"status_reporter"` ConfigAutoReload bool `toml:"auto_reload"` - // Internal flag indicating demonized child process (DO NOT SET IN CONFIG FILE) - BackgroundDaemon bool - // Configuration file path ConfigFile string `toml:"config_file"` @@ -247,4 +244,24 @@ type FileSinkOptions struct { RetentionHours float64 `toml:"retention_hours"` BufferSize int64 `toml:"buffer_size"` FlushIntervalMs int64 `toml:"flush_interval_ms"` +} + +// TCPSinkOptions defines settings for a TCP server sink +type TCPSinkOptions struct { + Host string `toml:"host"` + Port int64 `toml:"port"` + BufferSize int64 `toml:"buffer_size"` + WriteTimeout int64 `toml:"write_timeout_ms"` + KeepAlive bool `toml:"keep_alive"` + KeepAlivePeriod int64 `toml:"keep_alive_period_ms"` +} + +// HTTPSinkOptions defines settings for an HTTP SSE server sink +type HTTPSinkOptions struct { + Host string `toml:"host"` + Port int64 `toml:"port"` + StreamPath string `toml:"stream_path"` + StatusPath string `toml:"status_path"` + BufferSize int64 `toml:"buffer_size"` + WriteTimeout int64 `toml:"write_timeout_ms"` } \ No newline at end of file diff --git a/src/internal/config/loader.go b/src/internal/config/loader.go index 4f92d44..851cbbc 100644 --- a/src/internal/config/loader.go +++ b/src/internal/config/loader.go @@ -95,9 +95,6 @@ func defaults() *Config { StatusReporter: true, ConfigAutoReload: false, - // Child process indicator - BackgroundDaemon: false, - // Existing defaults Logging: &LogConfig{ Output: "stdout", diff --git a/src/internal/session/session.go b/src/internal/session/session.go index f4ee13f..ed09f5f 100644 --- a/src/internal/session/session.go +++ b/src/internal/session/session.go @@ -252,12 +252,10 @@ func (m *Manager) startCleanup() { // cleanupIdleSessions removes sessions that have exceeded the maximum idle time. func (m *Manager) cleanupIdleSessions() { - m.mu.Lock() - defer m.mu.Unlock() - now := time.Now() expiredSessions := make([]*Session, 0) + m.mu.Lock() for id, session := range m.sessions { idleTime := now.Sub(session.LastActivity) @@ -268,13 +266,16 @@ func (m *Manager) cleanupIdleSessions() { } m.mu.Unlock() - // Call callbacks outside of lock if len(expiredSessions) > 0 { m.callbacksMu.RLock() - defer m.callbacksMu.RUnlock() + callbacks := make(map[string]func(sessionID, remoteAddr string)) + for k, v := range m.expiryCallbacks { + callbacks[k] = v + } + m.callbacksMu.RUnlock() for _, session := range expiredSessions { - if callback, exists := m.expiryCallbacks[session.Source]; exists { + if callback, exists := callbacks[session.Source]; exists { // Call callback to notify owner go callback(session.ID, session.RemoteAddr) } diff --git a/src/internal/sink/http/const.go b/src/internal/sink/http/const.go new file mode 100644 index 0000000..1b1fee9 --- /dev/null +++ b/src/internal/sink/http/const.go @@ -0,0 +1,8 @@ +package http + +import "time" + +const ( + HttpServerStartTimeout = 100 * time.Millisecond + HttpServerShutdownTimeout = 2 * time.Second +) \ No newline at end of file diff --git a/src/internal/sink/http/http.go b/src/internal/sink/http/http.go new file mode 100644 index 0000000..7ba1495 --- /dev/null +++ b/src/internal/sink/http/http.go @@ -0,0 +1,539 @@ +package http + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/core" + "logwisp/src/internal/plugin" + "logwisp/src/internal/session" + "logwisp/src/internal/sink" + "logwisp/src/internal/version" + + lconfig "github.com/lixenwraith/config" + "github.com/lixenwraith/log" + "github.com/lixenwraith/log/compat" + "github.com/valyala/fasthttp" +) + +func init() { + if err := plugin.RegisterSink("http", NewHTTPSinkPlugin); err != nil { + panic(fmt.Sprintf("failed to register http sink: %v", err)) + } +} + +// HTTPSink streams log entries via Server-Sent Events (SSE) +type HTTPSink struct { + // Plugin identity and session management + id string + proxy *session.Proxy + + // Configuration + config *config.HTTPSinkOptions + + // Network + server *fasthttp.Server + + // Application + input chan core.TransportEvent + logger *log.Logger + + // Runtime + done chan struct{} + wg sync.WaitGroup + startTime time.Time + + // Broker + clients map[uint64]chan []byte + clientsMu sync.RWMutex + unregister chan uint64 + nextClientID atomic.Uint64 + + // Client session tracking + clientSessions map[uint64]string // clientID -> sessionID + sessionsMu sync.RWMutex + + // Statistics + activeClients atomic.Int64 + totalProcessed atomic.Uint64 + lastProcessed atomic.Value // time.Time +} + +// NewHTTPSinkPlugin creates an HTTP sink through plugin factory +func NewHTTPSinkPlugin( + id string, + configMap map[string]any, + logger *log.Logger, + proxy *session.Proxy, +) (sink.Sink, error) { + opts := &config.HTTPSinkOptions{ + Host: "0.0.0.0", + Port: 0, + StreamPath: "/stream", + StatusPath: "/status", + BufferSize: 1000, + WriteTimeout: 0, // SSE indefinite streaming + } + + if err := lconfig.ScanMap(configMap, opts); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + if opts.Port <= 0 || opts.Port > 65535 { + return nil, fmt.Errorf("port must be between 1 and 65535") + } + if opts.BufferSize <= 0 { + opts.BufferSize = 1000 + } + if opts.StreamPath == "" { + opts.StreamPath = "/stream" + } + if opts.StatusPath == "" { + opts.StatusPath = "/status" + } + + h := &HTTPSink{ + id: id, + proxy: proxy, + config: opts, + input: make(chan core.TransportEvent, opts.BufferSize), + done: make(chan struct{}), + logger: logger, + clients: make(map[uint64]chan []byte), + unregister: make(chan uint64), + clientSessions: make(map[uint64]string), + } + h.lastProcessed.Store(time.Time{}) + + logger.Info("msg", "HTTP sink initialized", + "component", "http_sink", + "instance_id", id, + "host", opts.Host, + "port", opts.Port, + "stream_path", opts.StreamPath, + "status_path", opts.StatusPath) + + return h, nil +} + +// Capabilities returns supported capabilities +func (h *HTTPSink) Capabilities() []core.Capability { + return []core.Capability{ + core.CapSessionAware, + core.CapMultiSession, + } +} + +// Input returns the channel for sending transport events +func (h *HTTPSink) Input() chan<- core.TransportEvent { + return h.input +} + +// Start initializes the HTTP server and begins the broker loop +func (h *HTTPSink) Start(ctx context.Context) error { + h.startTime = time.Now() + + // Start central broker goroutine + h.wg.Add(1) + go h.brokerLoop(ctx) + + fasthttpLogger := compat.NewFastHTTPAdapter(h.logger) + + h.server = &fasthttp.Server{ + Name: fmt.Sprintf("LogWisp/%s", version.Short()), + Handler: h.requestHandler, + DisableKeepalive: false, + StreamRequestBody: true, + Logger: fasthttpLogger, + WriteTimeout: time.Duration(h.config.WriteTimeout) * time.Millisecond, + } + + addr := fmt.Sprintf("%s:%d", h.config.Host, h.config.Port) + + errChan := make(chan error, 1) + go func() { + h.logger.Info("msg", "HTTP server starting", + "component", "http_sink", + "instance_id", h.id, + "address", addr) + + err := h.server.ListenAndServe(addr) + if err != nil { + errChan <- err + } + }() + + // Monitor context for shutdown + go func() { + <-ctx.Done() + if h.server != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), HttpServerShutdownTimeout) + defer cancel() + h.server.ShutdownWithContext(shutdownCtx) + } + }() + + // Check if server started + select { + case err := <-errChan: + return err + case <-time.After(HttpServerStartTimeout): + h.logger.Info("msg", "HTTP server started", + "component", "http_sink", + "instance_id", h.id, + "host", h.config.Host, + "port", h.config.Port) + return nil + } +} + +// Stop gracefully shuts down the HTTP server and all client connections +func (h *HTTPSink) Stop() { + h.logger.Info("msg", "Stopping HTTP sink", + "component", "http_sink", + "instance_id", h.id) + + close(h.done) + + if h.server != nil { + ctx, cancel := context.WithTimeout(context.Background(), HttpServerShutdownTimeout) + defer cancel() + h.server.ShutdownWithContext(ctx) + } + + h.wg.Wait() + + close(h.unregister) + + h.clientsMu.Lock() + for _, ch := range h.clients { + close(ch) + } + h.clients = make(map[uint64]chan []byte) + h.clientsMu.Unlock() + + h.logger.Info("msg", "HTTP sink stopped", + "component", "http_sink", + "instance_id", h.id, + "total_processed", h.totalProcessed.Load()) +} + +// GetStats returns sink statistics +func (h *HTTPSink) GetStats() sink.SinkStats { + lastProc, _ := h.lastProcessed.Load().(time.Time) + + return sink.SinkStats{ + ID: h.id, + Type: "http", + TotalProcessed: h.totalProcessed.Load(), + ActiveConnections: h.activeClients.Load(), + StartTime: h.startTime, + LastProcessed: lastProc, + Details: map[string]any{ + "host": h.config.Host, + "port": h.config.Port, + "buffer_size": h.config.BufferSize, + "endpoints": map[string]string{ + "stream": h.config.StreamPath, + "status": h.config.StatusPath, + }, + }, + } +} + +// brokerLoop manages client connections and broadcasts transport events +func (h *HTTPSink) brokerLoop(ctx context.Context) { + defer h.wg.Done() + + for { + select { + case <-ctx.Done(): + h.logger.Debug("msg", "Broker loop stopping due to context cancellation", + "component", "http_sink") + return + + case <-h.done: + h.logger.Debug("msg", "Broker loop stopping due to shutdown signal", + "component", "http_sink") + return + + case clientID := <-h.unregister: + h.clientsMu.Lock() + if clientChan, exists := h.clients[clientID]; exists { + delete(h.clients, clientID) + close(clientChan) + h.logger.Debug("msg", "Unregistered client", + "component", "http_sink", + "client_id", clientID) + } + h.clientsMu.Unlock() + + h.sessionsMu.Lock() + delete(h.clientSessions, clientID) + h.sessionsMu.Unlock() + + case event, ok := <-h.input: + if !ok { + h.logger.Debug("msg", "Input channel closed, broker stopping", + "component", "http_sink") + return + } + + h.totalProcessed.Add(1) + h.lastProcessed.Store(time.Now()) + + h.clientsMu.RLock() + clientCount := len(h.clients) + if clientCount > 0 { + var staleClients []uint64 + + for id, ch := range h.clients { + h.sessionsMu.RLock() + sessionID, hasSession := h.clientSessions[id] + h.sessionsMu.RUnlock() + + if !hasSession { + staleClients = append(staleClients, id) + continue + } + + // Check session still exists via proxy + if _, exists := h.proxy.GetSession(sessionID); !exists { + staleClients = append(staleClients, id) + continue + } + + select { + case ch <- event.Payload: + h.proxy.UpdateActivity(sessionID) + default: + h.logger.Debug("msg", "Dropped event for slow client", + "component", "http_sink", + "client_id", id) + } + } + + if len(staleClients) > 0 { + go func() { + for _, clientID := range staleClients { + select { + case h.unregister <- clientID: + case <-h.done: + return + } + } + }() + } + } + h.clientsMu.RUnlock() + } + } +} + +// requestHandler is the main entry point for all incoming HTTP requests +func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) { + // IPv4-only enforcement - silent drop IPv6 + remoteAddr := ctx.RemoteAddr() + if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { + if tcpAddr.IP.To4() == nil { + ctx.SetConnectionClose() + return + } + } + + path := string(ctx.Path()) + + switch path { + case h.config.StatusPath: + h.handleStatus(ctx) + case h.config.StreamPath: + h.handleStream(ctx) + default: + ctx.SetStatusCode(fasthttp.StatusNotFound) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]any{ + "error": "Not Found", + }) + } +} + +// handleStream manages a client's Server-Sent Events (SSE) stream +func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { + remoteAddrStr := ctx.RemoteAddr().String() + + // Create session via proxy + sess := h.proxy.CreateSession(remoteAddrStr, map[string]any{ + "type": "http_client", + }) + + // Set SSE headers + ctx.Response.Header.Set("Content-Type", "text/event-stream") + ctx.Response.Header.Set("Cache-Control", "no-cache") + ctx.Response.Header.Set("Connection", "keep-alive") + ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") + ctx.Response.Header.Set("X-Accel-Buffering", "no") + + // Register client with broker + clientID := h.nextClientID.Add(1) + clientChan := make(chan []byte, h.config.BufferSize) + + h.clientsMu.Lock() + h.clients[clientID] = clientChan + h.clientsMu.Unlock() + + h.sessionsMu.Lock() + h.clientSessions[clientID] = sess.ID + h.sessionsMu.Unlock() + + streamFunc := func(w *bufio.Writer) { + connectCount := h.activeClients.Add(1) + h.logger.Debug("msg", "HTTP client connected", + "component", "http_sink", + "remote_addr", remoteAddrStr, + "session_id", sess.ID, + "client_id", clientID, + "active_clients", connectCount) + + h.wg.Add(1) + + defer func() { + disconnectCount := h.activeClients.Add(-1) + h.logger.Debug("msg", "HTTP client disconnected", + "component", "http_sink", + "remote_addr", remoteAddrStr, + "session_id", sess.ID, + "client_id", clientID, + "active_clients", disconnectCount) + + select { + case h.unregister <- clientID: + case <-h.done: + } + + h.proxy.RemoveSession(sess.ID) + h.wg.Done() + }() + + // Send connected event with metadata + connectionInfo := map[string]any{ + "client_id": fmt.Sprintf("%d", clientID), + "session_id": sess.ID, + "instance_id": h.id, + "stream_path": h.config.StreamPath, + "status_path": h.config.StatusPath, + "buffer_size": h.config.BufferSize, + } + data, _ := json.Marshal(connectionInfo) + fmt.Fprintf(w, "event: connected\ndata: %s\n\n", data) + if err := w.Flush(); err != nil { + return + } + + for { + select { + case payload, ok := <-clientChan: + if !ok { + return + } + + if err := h.writeSSE(w, payload); err != nil { + return + } + + if err := w.Flush(); err != nil { + return + } + + h.proxy.UpdateActivity(sess.ID) + + case <-h.done: + fmt.Fprintf(w, "event: disconnect\ndata: {\"reason\":\"server_shutdown\"}\n\n") + w.Flush() + return + } + } + } + + ctx.SetBodyStreamWriter(streamFunc) +} + +// handleStatus provides a JSON status report +func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) { + ctx.SetContentType("application/json") + + status := map[string]any{ + "service": "LogWisp", + "version": version.Short(), + "instance_id": h.id, + "server": map[string]any{ + "type": "http", + "host": h.config.Host, + "port": h.config.Port, + "active_clients": h.activeClients.Load(), + "buffer_size": h.config.BufferSize, + "uptime_seconds": int(time.Since(h.startTime).Seconds()), + }, + "endpoints": map[string]string{ + "stream": h.config.StreamPath, + "status": h.config.StatusPath, + }, + "statistics": map[string]any{ + "total_processed": h.totalProcessed.Load(), + }, + } + + data, _ := json.Marshal(status) + ctx.SetBody(data) +} + +// writeSSE formats payload into SSE data format +func (h *HTTPSink) writeSSE(w *bufio.Writer, payload []byte) error { + // Handle multi-line payloads per W3C SSE spec + lines := splitLines(payload) + for _, line := range lines { + if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil { + return err + } + } + // Empty line terminates event + if _, err := w.WriteString("\n"); err != nil { + return err + } + return nil +} + +// splitLines splits payload by newlines, handling different line endings +func splitLines(data []byte) [][]byte { + if len(data) == 0 { + return nil + } + + // Trim trailing newline if present + if data[len(data)-1] == '\n' { + data = data[:len(data)-1] + } + + var lines [][]byte + start := 0 + for i := 0; i < len(data); i++ { + if data[i] == '\n' { + lines = append(lines, data[start:i]) + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, data[start:]) + } + + if len(lines) == 0 { + return [][]byte{data} + } + return lines +} \ No newline at end of file diff --git a/src/internal/sink/tcp/tcp.go b/src/internal/sink/tcp/tcp.go new file mode 100644 index 0000000..13ad7f4 --- /dev/null +++ b/src/internal/sink/tcp/tcp.go @@ -0,0 +1,434 @@ +package tcp + +import ( + "bytes" + "context" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/core" + "logwisp/src/internal/plugin" + "logwisp/src/internal/session" + "logwisp/src/internal/sink" + + lconfig "github.com/lixenwraith/config" + "github.com/lixenwraith/log" + "github.com/lixenwraith/log/compat" + "github.com/panjf2000/gnet/v2" +) + +func init() { + if err := plugin.RegisterSink("tcp", NewTCPSinkPlugin); err != nil { + panic(fmt.Sprintf("failed to register tcp sink: %v", err)) + } +} + +// TCPSink streams log entries to connected TCP clients +type TCPSink struct { + // Plugin identity and session management + id string + proxy *session.Proxy + + // Configuration + config *config.TCPSinkOptions + + // Network + server *tcpServer + engine *gnet.Engine + engineMu sync.Mutex + + // Application + input chan core.TransportEvent + logger *log.Logger + + // Runtime + done chan struct{} + wg sync.WaitGroup + startTime time.Time + + // Statistics + activeConns atomic.Int64 + totalProcessed atomic.Uint64 + lastProcessed atomic.Value // time.Time + + // Error tracking + writeErrors atomic.Uint64 + consecutiveWriteErrors map[gnet.Conn]int + errorMu sync.Mutex +} + +// NewTCPSinkPlugin creates a TCP sink through plugin factory +func NewTCPSinkPlugin( + id string, + configMap map[string]any, + logger *log.Logger, + proxy *session.Proxy, +) (sink.Sink, error) { + // Create config struct with defaults + opts := &config.TCPSinkOptions{ + Host: "0.0.0.0", + Port: 0, // Required + BufferSize: 1000, + WriteTimeout: 5000, + KeepAlive: true, + KeepAlivePeriod: 30000, + } + + // Parse config map into struct + if err := lconfig.ScanMap(configMap, opts); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + // Validate required fields + if opts.Port <= 0 || opts.Port > 65535 { + return nil, fmt.Errorf("port must be between 1 and 65535") + } + if opts.BufferSize <= 0 { + opts.BufferSize = 1000 + } + + t := &TCPSink{ + id: id, + proxy: proxy, + config: opts, + input: make(chan core.TransportEvent, opts.BufferSize), + done: make(chan struct{}), + logger: logger, + consecutiveWriteErrors: make(map[gnet.Conn]int), + } + t.lastProcessed.Store(time.Time{}) + + logger.Info("msg", "TCP sink initialized", + "component", "tcp_sink", + "instance_id", id, + "host", opts.Host, + "port", opts.Port) + + return t, nil +} + +// Capabilities returns supported capabilities +func (t *TCPSink) Capabilities() []core.Capability { + return []core.Capability{ + core.CapSessionAware, + core.CapMultiSession, + } +} + +// Input returns the channel for sending transport events +func (t *TCPSink) Input() chan<- core.TransportEvent { + return t.input +} + +// Start initializes the TCP server and begins the broadcast loop +func (t *TCPSink) Start(ctx context.Context) error { + t.server = &tcpServer{ + sink: t, + clients: make(map[gnet.Conn]*tcpClient), + } + + t.startTime = time.Now() + + // Start broadcast loop + t.wg.Add(1) + go func() { + defer t.wg.Done() + t.broadcastLoop(ctx) + }() + + // Configure gnet + addr := fmt.Sprintf("tcp://%s:%d", t.config.Host, t.config.Port) + gnetLogger := compat.NewGnetAdapter(t.logger) + + opts := []gnet.Option{ + gnet.WithLogger(gnetLogger), + gnet.WithMulticore(true), + gnet.WithReusePort(true), + } + + // Start gnet server + errChan := make(chan error, 1) + go func() { + t.logger.Info("msg", "Starting TCP server", + "component", "tcp_sink", + "host", t.config.Host, + "port", t.config.Port) + + err := gnet.Run(t.server, addr, opts...) + if err != nil { + t.logger.Error("msg", "TCP server failed", + "component", "tcp_sink", + "error", err) + } + errChan <- err + }() + + // Monitor context for shutdown + go func() { + <-ctx.Done() + t.engineMu.Lock() + if t.engine != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + (*t.engine).Stop(shutdownCtx) + } + t.engineMu.Unlock() + }() + + // Wait briefly for server to start or fail + select { + case err := <-errChan: + close(t.done) + t.wg.Wait() + return err + case <-time.After(100 * time.Millisecond): + t.logger.Info("msg", "TCP server started", + "component", "tcp_sink", + "instance_id", t.id, + "port", t.config.Port) + return nil + } +} + +// Stop gracefully shuts down the TCP sink +func (t *TCPSink) Stop() { + t.logger.Info("msg", "Stopping TCP sink", + "component", "tcp_sink", + "instance_id", t.id) + + close(t.done) + + // Stop gnet engine + t.engineMu.Lock() + engine := t.engine + t.engineMu.Unlock() + + if engine != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + (*engine).Stop(ctx) + } + + t.wg.Wait() + + t.logger.Info("msg", "TCP sink stopped", + "component", "tcp_sink", + "instance_id", t.id, + "total_processed", t.totalProcessed.Load()) +} + +// GetStats returns sink statistics +func (t *TCPSink) GetStats() sink.SinkStats { + lastProc, _ := t.lastProcessed.Load().(time.Time) + + return sink.SinkStats{ + ID: t.id, + Type: "tcp", + TotalProcessed: t.totalProcessed.Load(), + ActiveConnections: t.activeConns.Load(), + StartTime: t.startTime, + LastProcessed: lastProc, + Details: map[string]any{ + "host": t.config.Host, + "port": t.config.Port, + "buffer_size": t.config.BufferSize, + "write_errors": t.writeErrors.Load(), + }, + } +} + +// tcpServer implements gnet.EventHandler +type tcpServer struct { + gnet.BuiltinEventEngine + sink *TCPSink + clients map[gnet.Conn]*tcpClient + mu sync.RWMutex +} + +// tcpClient represents a connected TCP client +type tcpClient struct { + conn gnet.Conn + buffer bytes.Buffer + sessionID string +} + +// broadcastLoop sends transport events to all connected clients +func (t *TCPSink) broadcastLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-t.input: + if !ok { + return + } + t.totalProcessed.Add(1) + t.lastProcessed.Store(time.Now()) + t.broadcastData(event.Payload) + case <-t.done: + return + } + } +} + +// OnBoot is called when the server starts +func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action { + s.sink.engineMu.Lock() + s.sink.engine = &eng + s.sink.engineMu.Unlock() + + s.sink.logger.Debug("msg", "TCP server booted", + "component", "tcp_sink", + "instance_id", s.sink.id) + return gnet.None +} + +// OnOpen is called when a new connection is established +func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + remoteAddr := c.RemoteAddr() + remoteAddrStr := remoteAddr.String() + + s.sink.logger.Debug("msg", "TCP connection attempt", + "component", "tcp_sink", + "remote_addr", remoteAddrStr) + + // Reject IPv6 connections + if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { + if tcpAddr.IP.To4() == nil { + s.sink.logger.Warn("msg", "IPv6 connection rejected", + "component", "tcp_sink", + "remote_addr", remoteAddrStr) + return []byte("IPv4-only (IPv6 not supported)\n"), gnet.Close + } + } + + // Create session via proxy + sess := s.sink.proxy.CreateSession(remoteAddrStr, map[string]any{ + "type": "tcp_client", + "remote_addr": remoteAddrStr, + }) + + client := &tcpClient{ + conn: c, + sessionID: sess.ID, + } + + s.mu.Lock() + s.clients[c] = client + s.mu.Unlock() + + newCount := s.sink.activeConns.Add(1) + s.sink.logger.Debug("msg", "TCP connection opened", + "component", "tcp_sink", + "remote_addr", remoteAddrStr, + "session_id", sess.ID, + "active_connections", newCount) + + return nil, gnet.None +} + +// OnClose is called when a connection is closed +func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action { + remoteAddrStr := c.RemoteAddr().String() + + s.mu.RLock() + client, exists := s.clients[c] + s.mu.RUnlock() + + if exists && client.sessionID != "" { + s.sink.proxy.RemoveSession(client.sessionID) + s.sink.logger.Debug("msg", "Session removed", + "component", "tcp_sink", + "session_id", client.sessionID, + "remote_addr", remoteAddrStr) + } + + s.mu.Lock() + delete(s.clients, c) + s.mu.Unlock() + + s.sink.errorMu.Lock() + delete(s.sink.consecutiveWriteErrors, c) + s.sink.errorMu.Unlock() + + newCount := s.sink.activeConns.Add(-1) + s.sink.logger.Debug("msg", "TCP connection closed", + "component", "tcp_sink", + "remote_addr", remoteAddrStr, + "active_connections", newCount, + "error", err) + + return gnet.None +} + +// OnTraffic is called when data is received from a connection +func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action { + s.mu.RLock() + client, exists := s.clients[c] + s.mu.RUnlock() + + // Update session activity + if exists && client.sessionID != "" { + s.sink.proxy.UpdateActivity(client.sessionID) + } + + // TCP sink doesn't expect data from clients, discard + c.Discard(-1) + return gnet.None +} + +// broadcastData sends data to all connected clients +func (t *TCPSink) broadcastData(data []byte) { + t.server.mu.RLock() + defer t.server.mu.RUnlock() + + for conn, client := range t.server.clients { + // Update session activity + if client.sessionID != "" { + t.proxy.UpdateActivity(client.sessionID) + } + + conn.AsyncWrite(data, func(c gnet.Conn, err error) error { + if err != nil { + t.writeErrors.Add(1) + t.handleWriteError(c, err) + } else { + t.errorMu.Lock() + delete(t.consecutiveWriteErrors, c) + t.errorMu.Unlock() + } + return nil + }) + } +} + +// handleWriteError manages errors during async writes +func (t *TCPSink) handleWriteError(c gnet.Conn, err error) { + remoteAddrStr := c.RemoteAddr().String() + + t.errorMu.Lock() + defer t.errorMu.Unlock() + + t.consecutiveWriteErrors[c]++ + errorCount := t.consecutiveWriteErrors[c] + + t.logger.Debug("msg", "AsyncWrite error", + "component", "tcp_sink", + "remote_addr", remoteAddrStr, + "error", err, + "consecutive_errors", errorCount) + + // Close connection after 3 consecutive write errors + if errorCount >= 3 { + t.logger.Warn("msg", "Closing connection due to repeated write errors", + "component", "tcp_sink", + "remote_addr", remoteAddrStr, + "error_count", errorCount) + delete(t.consecutiveWriteErrors, c) + c.Close() + } +} \ No newline at end of file