v0.9.0 restructure for flow architecture, dirty

This commit is contained in:
2025-11-09 15:08:20 -05:00
parent dcf803bac1
commit 22652f9e53
40 changed files with 1104 additions and 1430 deletions

View File

@ -2,7 +2,6 @@
package sink
import (
"bytes"
"context"
"fmt"
"strings"
@ -18,13 +17,18 @@ import (
// ConsoleSink writes log entries to the console (stdout/stderr) using an dedicated logger instance.
type ConsoleSink struct {
config *config.ConsoleSinkOptions
// Configuration
config *config.ConsoleSinkOptions
// Application
input chan core.LogEntry
writer *log.Logger // Dedicated internal logger instance for console writing
writer *log.Logger // dedicated logger for console output
formatter format.Formatter
logger *log.Logger // application logger
// Runtime
done chan struct{}
startTime time.Time
logger *log.Logger // Application logger for app logs
formatter format.Formatter
// Statistics
totalProcessed atomic.Uint64
@ -143,8 +147,7 @@ func (s *ConsoleSink) processLoop(ctx context.Context) {
}
// Convert to string to prevent hex encoding of []byte by log package
// Strip new line, writer adds it
message := string(bytes.TrimSuffix(formatted, []byte{'\n'}))
message := string(formatted)
switch strings.ToUpper(entry.Level) {
case "DEBUG":
s.writer.Debug(message)

View File

@ -2,7 +2,6 @@
package sink
import (
"bytes"
"context"
"fmt"
"sync/atomic"
@ -17,13 +16,18 @@ import (
// FileSink writes log entries to files with rotation.
type FileSink struct {
config *config.FileSinkOptions
// Configuration
config *config.FileSinkOptions
// Application
input chan core.LogEntry
writer *log.Logger // Internal logger instance for file writing
writer *log.Logger // internal logger for file writing
formatter format.Formatter
logger *log.Logger // application logger
// Runtime
done chan struct{}
startTime time.Time
logger *log.Logger // Application logger
formatter format.Formatter
// Statistics
totalProcessed atomic.Uint64
@ -130,8 +134,7 @@ func (fs *FileSink) processLoop(ctx context.Context) {
}
// Convert to string to prevent hex encoding of []byte by log package
// Strip new line, writer adds it
message := string(bytes.TrimSuffix(formatted, []byte{'\n'}))
message := string(formatted)
fs.writer.Message(message)
case <-ctx.Done():

View File

@ -15,7 +15,7 @@ import (
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/limit"
"logwisp/src/internal/network"
"logwisp/src/internal/session"
ltls "logwisp/src/internal/tls"
"logwisp/src/internal/version"
@ -27,36 +27,38 @@ import (
// HTTPSink streams log entries via Server-Sent Events (SSE).
type HTTPSink struct {
// Configuration reference (NOT a copy)
// Configuration
config *config.HTTPSinkOptions
// Runtime
input chan core.LogEntry
server *fasthttp.Server
activeClients atomic.Int64
mu sync.RWMutex
startTime time.Time
done chan struct{}
wg sync.WaitGroup
logger *log.Logger
formatter format.Formatter
// Network
server *fasthttp.Server
netLimiter *network.NetLimiter
// Broker architecture
// Application
input chan core.LogEntry
formatter format.Formatter
logger *log.Logger
// Runtime
mu sync.RWMutex
done chan struct{}
wg sync.WaitGroup
startTime time.Time
// Broker
clients map[uint64]chan core.LogEntry
clientsMu sync.RWMutex
unregister chan uint64
unregister chan uint64 // client unregistration channel
nextClientID atomic.Uint64
// Session and security
// Security & Session
sessionManager *session.Manager
clientSessions map[uint64]string // clientID -> sessionID
sessionsMu sync.RWMutex
tlsManager *ltls.ServerManager
// Net limiting
netLimiter *limit.NetLimiter
// Statistics
activeClients atomic.Int64
totalProcessed atomic.Uint64
lastProcessed atomic.Value // time.Time
}
@ -94,10 +96,10 @@ func NewHTTPSink(opts *config.HTTPSinkOptions, logger *log.Logger, formatter for
}
// Initialize net limiter if configured
if opts.NetLimit != nil && (opts.NetLimit.Enabled ||
len(opts.NetLimit.IPWhitelist) > 0 ||
len(opts.NetLimit.IPBlacklist) > 0) {
h.netLimiter = limit.NewNetLimiter(opts.NetLimit, logger)
if opts.ACL != nil && (opts.ACL.Enabled ||
len(opts.ACL.IPWhitelist) > 0 ||
len(opts.ACL.IPBlacklist) > 0) {
h.netLimiter = network.NewNetLimiter(opts.ACL, logger)
}
return h, nil
@ -111,8 +113,8 @@ func (h *HTTPSink) Input() chan<- core.LogEntry {
// Start initializes the HTTP server and begins the broker loop.
func (h *HTTPSink) Start(ctx context.Context) error {
// Register expiry callback
h.sessionManager.RegisterExpiryCallback("http_sink", func(sessionID, remoteAddr string) {
h.handleSessionExpiry(sessionID, remoteAddr)
h.sessionManager.RegisterExpiryCallback("http_sink", func(sessionID, remoteAddrStr string) {
h.handleSessionExpiry(sessionID, remoteAddrStr)
})
// Start central broker goroutine
@ -183,7 +185,7 @@ func (h *HTTPSink) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
if h.server != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
shutdownCtx, cancel := context.WithTimeout(context.Background(), core.HttpServerShutdownTimeout)
defer cancel()
_ = h.server.ShutdownWithContext(shutdownCtx)
}
@ -193,7 +195,7 @@ func (h *HTTPSink) Start(ctx context.Context) error {
select {
case err := <-errChan:
return err
case <-time.After(100 * time.Millisecond):
case <-time.After(core.HttpServerStartTimeout):
// Server started successfully
return nil
}
@ -431,16 +433,16 @@ func (h *HTTPSink) brokerLoop(ctx context.Context) {
// requestHandler is the main entry point for all incoming HTTP requests.
func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
remoteAddr := ctx.RemoteAddr().String()
remoteAddrStr := ctx.RemoteAddr().String()
// Check net limit
if h.netLimiter != nil {
if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddr); !allowed {
if allowed, statusCode, message := h.netLimiter.CheckHTTP(remoteAddrStr); !allowed {
ctx.SetStatusCode(int(statusCode))
ctx.SetContentType("application/json")
h.logger.Warn("msg", "Net limited",
"component", "http_sink",
"remote_addr", remoteAddr,
"remote_addr", remoteAddrStr,
"status_code", statusCode,
"error", message)
json.NewEncoder(ctx).Encode(map[string]any{
@ -459,7 +461,7 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
}
// Create anonymous session for all connections
sess := h.sessionManager.CreateSession(remoteAddr, "http_sink", map[string]any{
sess := h.sessionManager.CreateSession(remoteAddrStr, "http_sink", map[string]any{
"tls": ctx.IsTLS() || h.tlsManager != nil,
})
@ -478,11 +480,11 @@ func (h *HTTPSink) requestHandler(ctx *fasthttp.RequestCtx) {
// handleStream manages a client's Server-Sent Events (SSE) stream.
func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, sess *session.Session) {
remoteAddrStr := ctx.RemoteAddr().String()
// Track connection for net limiting
remoteAddr := ctx.RemoteAddr().String()
if h.netLimiter != nil {
h.netLimiter.AddConnection(remoteAddr)
defer h.netLimiter.RemoveConnection(remoteAddr)
h.netLimiter.RegisterConnection(remoteAddrStr)
defer h.netLimiter.ReleaseConnection(remoteAddrStr)
}
// Set SSE headers
@ -510,7 +512,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, sess *session.Session)
connectCount := h.activeClients.Add(1)
h.logger.Debug("msg", "HTTP client connected",
"component", "http_sink",
"remote_addr", remoteAddr,
"remote_addr", remoteAddrStr,
"session_id", sess.ID,
"client_id", clientID,
"active_clients", connectCount)
@ -523,7 +525,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx, sess *session.Session)
disconnectCount := h.activeClients.Add(-1)
h.logger.Debug("msg", "HTTP client disconnected",
"component", "http_sink",
"remote_addr", remoteAddr,
"remote_addr", remoteAddrStr,
"session_id", sess.ID,
"client_id", clientID,
"active_clients", disconnectCount)
@ -679,7 +681,7 @@ func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) {
}
// handleSessionExpiry is the callback for cleaning up expired sessions.
func (h *HTTPSink) handleSessionExpiry(sessionID, remoteAddr string) {
func (h *HTTPSink) handleSessionExpiry(sessionID, remoteAddrStr string) {
h.sessionsMu.RLock()
defer h.sessionsMu.RUnlock()
@ -690,7 +692,7 @@ func (h *HTTPSink) handleSessionExpiry(sessionID, remoteAddr string) {
"component", "http_sink",
"session_id", sessionID,
"client_id", clientID,
"remote_addr", remoteAddr)
"remote_addr", remoteAddrStr)
// Signal broker to unregister
select {
@ -733,9 +735,6 @@ func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry core.LogEntry) error
return err
}
// Remove trailing newline if present (SSE adds its own)
formatted = bytes.TrimSuffix(formatted, []byte{'\n'})
// Multi-line content handler
lines := bytes.Split(formatted, []byte{'\n'})
for _, line := range lines {

View File

@ -25,19 +25,30 @@ import (
// TODO: add heartbeat
// HTTPClientSink forwards log entries to a remote HTTP endpoint.
type HTTPClientSink struct {
input chan core.LogEntry
config *config.HTTPClientSinkOptions
client *fasthttp.Client
batch []core.LogEntry
batchMu sync.Mutex
done chan struct{}
wg sync.WaitGroup
startTime time.Time
logger *log.Logger
formatter format.Formatter
// Configuration
config *config.HTTPClientSinkOptions
// Network
client *fasthttp.Client
tlsManager *ltls.ClientManager
// Application
input chan core.LogEntry
formatter format.Formatter
logger *log.Logger
// Runtime
done chan struct{}
wg sync.WaitGroup
startTime time.Time
// Batching
batch []core.LogEntry
batchMu sync.Mutex
// Security & Session
sessionID string
sessionManager *session.Manager
tlsManager *ltls.ClientManager
// Statistics
totalProcessed atomic.Uint64

View File

@ -14,7 +14,7 @@ import (
"logwisp/src/internal/config"
"logwisp/src/internal/core"
"logwisp/src/internal/format"
"logwisp/src/internal/limit"
"logwisp/src/internal/network"
"logwisp/src/internal/session"
"github.com/lixenwraith/log"
@ -24,25 +24,34 @@ import (
// TCPSink streams log entries to connected TCP clients.
type TCPSink struct {
input chan core.LogEntry
config *config.TCPSinkOptions
server *tcpServer
done chan struct{}
activeConns atomic.Int64
startTime time.Time
engine *gnet.Engine
engineMu sync.Mutex
wg sync.WaitGroup
netLimiter *limit.NetLimiter
logger *log.Logger
formatter format.Formatter
// Configuration
config *config.TCPSinkOptions
// Network
server *tcpServer
engine *gnet.Engine
engineMu sync.Mutex
netLimiter *network.NetLimiter
// Application
input chan core.LogEntry
formatter format.Formatter
logger *log.Logger
// Runtime
done chan struct{}
wg sync.WaitGroup
startTime time.Time
// Security & Session
sessionManager *session.Manager
// Statistics
activeConns atomic.Int64
totalProcessed atomic.Uint64
lastProcessed atomic.Value // time.Time
// Write error tracking
// Error tracking
writeErrors atomic.Uint64
consecutiveWriteErrors map[gnet.Conn]int
errorMu sync.Mutex
@ -54,7 +63,7 @@ type TCPConfig struct {
Port int64
BufferSize int64
Heartbeat *config.HeartbeatConfig
NetLimit *config.NetLimitConfig
ACL *config.ACLConfig
}
// NewTCPSink creates a new TCP streaming sink.
@ -76,10 +85,10 @@ func NewTCPSink(opts *config.TCPSinkOptions, logger *log.Logger, formatter forma
t.lastProcessed.Store(time.Time{})
// Initialize net limiter with pointer
if opts.NetLimit != nil && (opts.NetLimit.Enabled ||
len(opts.NetLimit.IPWhitelist) > 0 ||
len(opts.NetLimit.IPBlacklist) > 0) {
t.netLimiter = limit.NewNetLimiter(opts.NetLimit, logger)
if opts.ACL != nil && (opts.ACL.Enabled ||
len(opts.ACL.IPWhitelist) > 0 ||
len(opts.ACL.IPBlacklist) > 0) {
t.netLimiter = network.NewNetLimiter(opts.ACL, logger)
}
return t, nil
@ -311,7 +320,8 @@ func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action {
// OnOpen is called when a new connection is established.
func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
remoteAddr := c.RemoteAddr()
s.sink.logger.Debug("msg", "TCP connection attempt", "remote_addr", remoteAddr)
remoteAddrStr := remoteAddr.String()
s.sink.logger.Debug("msg", "TCP connection attempt", "remote_addr", remoteAddrStr)
// Reject IPv6 connections
if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok {
@ -322,27 +332,26 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
// Check net limit
if s.sink.netLimiter != nil {
remoteStr := c.RemoteAddr().String()
tcpAddr, err := net.ResolveTCPAddr("tcp", remoteStr)
tcpAddr, err := net.ResolveTCPAddr("tcp", remoteAddrStr)
if err != nil {
s.sink.logger.Warn("msg", "Failed to parse TCP address",
"remote_addr", remoteAddr,
"remote_addr", remoteAddrStr,
"error", err)
return nil, gnet.Close
}
if !s.sink.netLimiter.CheckTCP(tcpAddr) {
s.sink.logger.Warn("msg", "TCP connection net limited",
"remote_addr", remoteAddr)
"remote_addr", remoteAddrStr)
return nil, gnet.Close
}
// Track connection
s.sink.netLimiter.AddConnection(remoteStr)
// Register connection post-establishment
s.sink.netLimiter.RegisterConnection(remoteAddrStr)
}
// Create session for tracking
sess := s.sink.sessionManager.CreateSession(c.RemoteAddr().String(), "tcp_sink", nil)
sess := s.sink.sessionManager.CreateSession(remoteAddrStr, "tcp_sink", nil)
// TCP Sink accepts all connections without authentication
client := &tcpClient{
@ -366,7 +375,7 @@ func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
// OnClose is called when a connection is closed.
func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
remoteAddr := c.RemoteAddr().String()
remoteAddrStr := c.RemoteAddr().String()
// Get client to retrieve session ID
s.mu.RLock()
@ -379,7 +388,7 @@ func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
s.sink.logger.Debug("msg", "Session removed",
"component", "tcp_sink",
"session_id", client.sessionID,
"remote_addr", remoteAddr)
"remote_addr", remoteAddrStr)
}
// Remove client state
@ -392,14 +401,14 @@ func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action {
delete(s.sink.consecutiveWriteErrors, c)
s.sink.errorMu.Unlock()
// Remove connection tracking
// Release connection
if s.sink.netLimiter != nil {
s.sink.netLimiter.RemoveConnection(remoteAddr)
s.sink.netLimiter.ReleaseConnection(remoteAddrStr)
}
newCount := s.sink.activeConns.Add(-1)
s.sink.logger.Debug("msg", "TCP connection closed",
"remote_addr", remoteAddr,
"remote_addr", remoteAddrStr,
"active_connections", newCount,
"error", err)
return gnet.None
@ -482,6 +491,8 @@ func (t *TCPSink) broadcastData(data []byte) {
// handleWriteError manages errors during async writes, closing faulty connections.
func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
remoteAddrStr := c.RemoteAddr().String()
t.errorMu.Lock()
defer t.errorMu.Unlock()
@ -495,7 +506,7 @@ func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
t.logger.Debug("msg", "AsyncWrite error",
"component", "tcp_sink",
"remote_addr", c.RemoteAddr(),
"remote_addr", remoteAddrStr,
"error", err,
"consecutive_errors", errorCount)
@ -503,7 +514,7 @@ func (t *TCPSink) handleWriteError(c gnet.Conn, err error) {
if errorCount >= 3 {
t.logger.Warn("msg", "Closing connection due to repeated write errors",
"component", "tcp_sink",
"remote_addr", c.RemoteAddr(),
"remote_addr", remoteAddrStr,
"error_count", errorCount)
delete(t.consecutiveWriteErrors, c)
c.Close()
@ -539,7 +550,7 @@ func (t *TCPSink) cleanupStaleConnections(staleConns []gnet.Conn) {
for _, conn := range staleConns {
t.logger.Info("msg", "Closing stale connection",
"component", "tcp_sink",
"remote_addr", conn.RemoteAddr())
"remote_addr", conn.RemoteAddr().String())
conn.Close()
}
}

View File

@ -22,24 +22,33 @@ import (
// TODO: add heartbeat
// TCPClientSink forwards log entries to a remote TCP endpoint.
type TCPClientSink struct {
// Configuration
config *config.TCPClientSinkOptions
address string // computed from host:port
// Network
conn net.Conn
connMu sync.RWMutex
// Application
input chan core.LogEntry
config *config.TCPClientSinkOptions
address string
conn net.Conn
connMu sync.RWMutex
formatter format.Formatter
logger *log.Logger
// Runtime
done chan struct{}
wg sync.WaitGroup
startTime time.Time
logger *log.Logger
formatter format.Formatter
// Connection
sessionID string
sessionManager *session.Manager
// Connection state
reconnecting atomic.Bool
lastConnectErr error
connectTime time.Time
// Security & Session
sessionID string
sessionManager *session.Manager
// Statistics
totalProcessed atomic.Uint64
totalFailed atomic.Uint64