Files
logwisp/src/internal/transport/tcpstreamer.go

191 lines
4.1 KiB
Go

// FILE: src/internal/transport/tcpstreamer.go
package transport
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"logwisp/src/internal/config"
"logwisp/src/internal/monitor"
"logwisp/src/internal/ratelimit"
"github.com/lixenwraith/log"
"github.com/panjf2000/gnet/v2"
)
// TCPStreamer reads log entries from a channel and writes each one, as a
// newline-terminated JSON document, to every TCP connection tracked by its
// gnet-backed server.
type TCPStreamer struct {
	// logChan is the source of entries consumed by broadcastLoop.
	logChan chan monitor.LogEntry
	// config carries the listen port plus heartbeat and rate-limit settings.
	config config.TCPConfig
	// server is the gnet event handler; it is created in Start.
	server *tcpServer
	// done is closed to tell broadcastLoop to exit.
	done chan struct{}
	// activeConns is the value reported by GetActiveConnections and by
	// heartbeat statistics.
	// NOTE(review): presumably updated by the server's connection callbacks,
	// which are outside this block — confirm.
	activeConns atomic.Int32
	// startTime is captured at construction and is the basis for the
	// heartbeat's uptime_seconds.
	startTime time.Time
	// engine is the running gnet engine, read by Stop under engineMu.
	// NOTE(review): presumably assigned when the server boots — confirm.
	engine *gnet.Engine
	// engineMu guards engine.
	engineMu sync.Mutex
	// wg tracks the broadcastLoop goroutine so shutdown can wait for it.
	wg sync.WaitGroup
	// rateLimiter is non-nil only when rate limiting is enabled in config.
	rateLimiter *ratelimit.Limiter
	// logger receives structured key/value log records.
	logger *log.Logger
}
// NewTCPStreamer builds a TCPStreamer that will consume entries from logChan
// using the given configuration and logger. The start time is recorded here,
// and a rate limiter is attached only when cfg.RateLimit is set and enabled.
// The returned streamer is idle until Start is called.
func NewTCPStreamer(logChan chan monitor.LogEntry, cfg config.TCPConfig, logger *log.Logger) *TCPStreamer {
	streamer := new(TCPStreamer)
	streamer.logChan = logChan
	streamer.config = cfg
	streamer.done = make(chan struct{})
	streamer.startTime = time.Now()
	streamer.logger = logger

	// Rate limiting is optional: a nil or disabled section leaves the
	// limiter unset.
	if rl := cfg.RateLimit; rl != nil && rl.Enabled {
		streamer.rateLimiter = ratelimit.New(*rl)
	}

	return streamer
}
// Start launches the broadcast loop and the gnet TCP server on the configured
// port.
//
// gnet.Run blocks for the lifetime of the server, so it runs in its own
// goroutine. Start waits 100ms: if Run returns inside that window its result
// is returned after the broadcast loop has been shut down; otherwise the
// server is assumed to be up and nil is returned. Failures after the window
// are only logged.
func (t *TCPStreamer) Start() error {
	t.server = &tcpServer{streamer: t}

	// The broadcast loop is tracked by wg so shutdown can wait for it.
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		t.broadcastLoop()
	}()

	listenAddr := fmt.Sprintf("tcp://:%d", t.config.Port)

	// serve blocks until the gnet engine exits and reports why it did.
	serve := func() error {
		t.logger.Info("msg", "Starting TCP server",
			"component", "tcp_streamer",
			"port", t.config.Port)

		runErr := gnet.Run(t.server, listenAddr,
			gnet.WithLogger(noopLogger{}),
			gnet.WithMulticore(true),
			gnet.WithReusePort(true),
		)
		if runErr != nil {
			t.logger.Error("msg", "TCP server failed",
				"component", "tcp_streamer",
				"port", t.config.Port,
				"error", runErr)
		}
		return runErr
	}

	// Buffered so the serving goroutine can always deliver its result and
	// exit, even when nobody is listening any more.
	result := make(chan error, 1)
	go func() {
		result <- serve()
	}()

	startupWindow := time.NewTimer(100 * time.Millisecond)
	defer startupWindow.Stop()

	select {
	case <-startupWindow.C:
		// No early exit observed: treat the server as started.
		t.logger.Info("msg", "TCP server started", "port", t.config.Port)
		return nil
	case err := <-result:
		// The server returned right away: stop the broadcast loop first.
		close(t.done)
		t.wg.Wait()
		return err
	}
}
// Stop shuts the streamer down: it signals the broadcast loop to exit, stops
// the gnet engine (bounded by a two second timeout) and waits for the
// broadcast loop to finish.
//
// Stop is safe to call more than once and after a failed Start. Start closes
// the done channel itself when the server fails to come up, so an
// unconditional close here would panic with "close of closed channel".
func (t *TCPStreamer) Stop() {
	t.logger.Info("msg", "Stopping TCP server")

	// Signal the broadcast loop to stop, but only if nobody has done so yet.
	// Holding engineMu serializes concurrent Stop calls around the
	// check-and-close and the engine read.
	t.engineMu.Lock()
	alreadyStopped := false
	select {
	case <-t.done:
		alreadyStopped = true
	default:
		close(t.done)
	}
	engine := t.engine
	t.engineMu.Unlock()

	// Stop the gnet engine if one is running. When done was already closed,
	// either an earlier Stop handled the engine or Start saw gnet.Run
	// return, so there is nothing left to stop.
	if engine != nil && !alreadyStopped {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		if err := engine.Stop(ctx); err != nil {
			t.logger.Error("msg", "Failed to stop TCP engine",
				"component", "tcp_streamer",
				"error", err)
		}
	}

	// Wait for the broadcast loop to finish.
	t.wg.Wait()
	t.logger.Info("msg", "TCP server stopped")
}
// broadcastLoop forwards log entries and optional heartbeats to every
// connection tracked by the server. It runs until logChan is closed or the
// done channel is closed.
//
// Each log entry is JSON-encoded and terminated with a newline; entries that
// fail to encode are logged and skipped. When heartbeats are enabled, a
// heartbeat line is sent on every tick of the configured interval.
func (t *TCPStreamer) broadcastLoop() {
	// A nil channel never becomes ready in a select, so the heartbeat case
	// stays inert unless a ticker is installed below.
	var tickerChan <-chan time.Time
	if t.config.Heartbeat.Enabled {
		interval := time.Duration(t.config.Heartbeat.IntervalSeconds) * time.Second
		if interval > 0 {
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			tickerChan = ticker.C
		} else {
			// time.NewTicker panics on a non-positive interval, which would
			// take the whole process down from this goroutine. Run without
			// heartbeats instead and report the misconfiguration.
			t.logger.Error("msg", "Heartbeat interval must be positive, heartbeats disabled",
				"component", "tcp_streamer",
				"interval_seconds", t.config.Heartbeat.IntervalSeconds)
		}
	}

	// broadcast queues payload on every tracked connection.
	broadcast := func(payload []byte) {
		t.server.connections.Range(func(key, _ any) bool {
			conn := key.(gnet.Conn)
			// Best effort: a write that cannot be queued for one connection
			// must not stop delivery to the others, so the error is
			// deliberately dropped.
			_ = conn.AsyncWrite(payload, nil)
			return true
		})
	}

	for {
		select {
		case entry, ok := <-t.logChan:
			if !ok {
				// The producer closed the channel: nothing more to send.
				return
			}
			data, err := json.Marshal(entry)
			if err != nil {
				t.logger.Error("msg", "Failed to marshal log entry",
					"component", "tcp_streamer",
					"error", err,
					"entry_source", entry.Source)
				continue
			}
			broadcast(append(data, '\n'))
		case <-tickerChan:
			if heartbeat := t.formatHeartbeat(); heartbeat != nil {
				broadcast(heartbeat)
			}
		case <-t.done:
			return
		}
	}
}
// formatHeartbeat renders a heartbeat as a newline-terminated JSON object, or
// returns nil when heartbeats are disabled.
//
// The object always contains "type": "heartbeat". A "time" field (UTC,
// RFC 3339 with nanoseconds) is added when IncludeTimestamp is set, and
// "active_connections" plus "uptime_seconds" are added when IncludeStats is
// set.
func (t *TCPStreamer) formatHeartbeat() []byte {
	hb := t.config.Heartbeat
	if !hb.Enabled {
		return nil
	}

	payload := map[string]any{"type": "heartbeat"}
	if hb.IncludeTimestamp {
		payload["time"] = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if hb.IncludeStats {
		payload["uptime_seconds"] = int(time.Since(t.startTime).Seconds())
		payload["active_connections"] = t.activeConns.Load()
	}

	// TCP clients always get JSON. The map holds only strings and integers,
	// so Marshal is not expected to fail and its error is discarded.
	encoded, _ := json.Marshal(payload)
	return append(encoded, '\n')
}
// GetActiveConnections returns the current value of the active-connection
// counter. The read is atomic, so it may be called from any goroutine.
func (t *TCPStreamer) GetActiveConnections() int32 {
	current := t.activeConns.Load()
	return current
}