v0.4.3 refactor and minor improvements

This commit is contained in:
2025-09-25 03:18:08 -04:00
parent 94b5f3111e
commit 5ea4dc8f5f
11 changed files with 346 additions and 228 deletions

View File

@ -63,7 +63,7 @@ type HTTPConfig struct {
StreamPath string
StatusPath string
Heartbeat *config.HeartbeatConfig
SSL *config.SSLConfig
TLS *config.TLSConfig
NetLimit *config.NetLimitConfig
}
@ -104,29 +104,29 @@ func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Fo
}
}
// Extract SSL config
if ssl, ok := options["ssl"].(map[string]any); ok {
cfg.SSL = &config.SSLConfig{}
cfg.SSL.Enabled, _ = ssl["enabled"].(bool)
if certFile, ok := ssl["cert_file"].(string); ok {
cfg.SSL.CertFile = certFile
// Extract TLS config
if tc, ok := options["tls"].(map[string]any); ok {
cfg.TLS = &config.TLSConfig{}
cfg.TLS.Enabled, _ = tc["enabled"].(bool)
if certFile, ok := tc["cert_file"].(string); ok {
cfg.TLS.CertFile = certFile
}
if keyFile, ok := ssl["key_file"].(string); ok {
cfg.SSL.KeyFile = keyFile
if keyFile, ok := tc["key_file"].(string); ok {
cfg.TLS.KeyFile = keyFile
}
cfg.SSL.ClientAuth, _ = ssl["client_auth"].(bool)
if caFile, ok := ssl["client_ca_file"].(string); ok {
cfg.SSL.ClientCAFile = caFile
cfg.TLS.ClientAuth, _ = tc["client_auth"].(bool)
if caFile, ok := tc["client_ca_file"].(string); ok {
cfg.TLS.ClientCAFile = caFile
}
cfg.SSL.VerifyClientCert, _ = ssl["verify_client_cert"].(bool)
if minVer, ok := ssl["min_version"].(string); ok {
cfg.SSL.MinVersion = minVer
cfg.TLS.VerifyClientCert, _ = tc["verify_client_cert"].(bool)
if minVer, ok := tc["min_version"].(string); ok {
cfg.TLS.MinVersion = minVer
}
if maxVer, ok := ssl["max_version"].(string); ok {
cfg.SSL.MaxVersion = maxVer
if maxVer, ok := tc["max_version"].(string); ok {
cfg.TLS.MaxVersion = maxVer
}
if ciphers, ok := ssl["cipher_suites"].(string); ok {
cfg.SSL.CipherSuites = ciphers
if ciphers, ok := tc["cipher_suites"].(string); ok {
cfg.TLS.CipherSuites = ciphers
}
}
@ -231,7 +231,7 @@ func (h *HTTPSink) Start(ctx context.Context) error {
var err error
if h.tlsManager != nil {
// HTTPS server
err = h.server.ListenAndServeTLS(addr, h.config.SSL.CertFile, h.config.SSL.KeyFile)
err = h.server.ListenAndServeTLS(addr, h.config.TLS.CertFile, h.config.TLS.KeyFile)
} else {
// HTTP server
err = h.server.ListenAndServe(addr)

View File

@ -138,25 +138,25 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter for
cfg.CAFile = caFile
}
// Extract client certificate options from SSL config
if ssl, ok := options["ssl"].(map[string]any); ok {
if enabled, _ := ssl["enabled"].(bool); enabled {
// Extract client certificate options from TLS config
if tc, ok := options["tls"].(map[string]any); ok {
if enabled, _ := tc["enabled"].(bool); enabled {
// Extract client certificate files for mTLS
if certFile, ok := ssl["cert_file"].(string); ok && certFile != "" {
if keyFile, ok := ssl["key_file"].(string); ok && keyFile != "" {
if certFile, ok := tc["cert_file"].(string); ok && certFile != "" {
if keyFile, ok := tc["key_file"].(string); ok && keyFile != "" {
// These will be used below when configuring TLS
cfg.CertFile = certFile // Need to add these fields to HTTPClientConfig
cfg.KeyFile = keyFile
}
}
// Extract CA file from ssl config if not already set
// Extract CA file from TLS config if not already set
if cfg.CAFile == "" {
if caFile, ok := ssl["ca_file"].(string); ok {
if caFile, ok := tc["ca_file"].(string); ok {
cfg.CAFile = caFile
}
}
// Extract insecure skip verify from ssl config
if insecure, ok := ssl["insecure_skip_verify"].(bool); ok {
// Extract insecure skip verify from TLS config
if insecure, ok := tc["insecure_skip_verify"].(bool); ok {
cfg.InsecureSkipVerify = insecure
}
}

View File

@ -61,7 +61,7 @@ type TCPConfig struct {
Port int64
BufferSize int64
Heartbeat *config.HeartbeatConfig
SSL *config.SSLConfig
TLS *config.TLSConfig
NetLimit *config.NetLimitConfig
}
@ -94,29 +94,29 @@ func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.For
}
}
// Extract SSL config
if ssl, ok := options["ssl"].(map[string]any); ok {
cfg.SSL = &config.SSLConfig{}
cfg.SSL.Enabled, _ = ssl["enabled"].(bool)
if certFile, ok := ssl["cert_file"].(string); ok {
cfg.SSL.CertFile = certFile
// Extract TLS config
if tc, ok := options["tls"].(map[string]any); ok {
cfg.TLS = &config.TLSConfig{}
cfg.TLS.Enabled, _ = tc["enabled"].(bool)
if certFile, ok := tc["cert_file"].(string); ok {
cfg.TLS.CertFile = certFile
}
if keyFile, ok := ssl["key_file"].(string); ok {
cfg.SSL.KeyFile = keyFile
if keyFile, ok := tc["key_file"].(string); ok {
cfg.TLS.KeyFile = keyFile
}
cfg.SSL.ClientAuth, _ = ssl["client_auth"].(bool)
if caFile, ok := ssl["client_ca_file"].(string); ok {
cfg.SSL.ClientCAFile = caFile
cfg.TLS.ClientAuth, _ = tc["client_auth"].(bool)
if caFile, ok := tc["client_ca_file"].(string); ok {
cfg.TLS.ClientCAFile = caFile
}
cfg.SSL.VerifyClientCert, _ = ssl["verify_client_cert"].(bool)
if minVer, ok := ssl["min_version"].(string); ok {
cfg.SSL.MinVersion = minVer
cfg.TLS.VerifyClientCert, _ = tc["verify_client_cert"].(bool)
if minVer, ok := tc["min_version"].(string); ok {
cfg.TLS.MinVersion = minVer
}
if maxVer, ok := ssl["max_version"].(string); ok {
cfg.SSL.MaxVersion = maxVer
if maxVer, ok := tc["max_version"].(string); ok {
cfg.TLS.MaxVersion = maxVer
}
if ciphers, ok := ssl["cipher_suites"].(string); ok {
cfg.SSL.CipherSuites = ciphers
if ciphers, ok := tc["cipher_suites"].(string); ok {
cfg.TLS.CipherSuites = ciphers
}
}
@ -627,19 +627,6 @@ func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action {
return gnet.Close
}
// // Check auth timeout
// if !client.authenticated && time.Now().After(client.authTimeout) {
// s.sink.logger.Warn("msg", "Authentication timeout",
// "component", "tcp_sink",
// "remote_addr", c.RemoteAddr().String())
// if client.tlsBridge != nil && client.tlsBridge.IsHandshakeDone() {
// client.tlsBridge.Write([]byte("AUTH TIMEOUT\n"))
// } else if client.tlsBridge == nil {
// c.AsyncWrite([]byte("AUTH TIMEOUT\n"), nil)
// }
// return gnet.Close
// }
// Read all available data
data, err := c.Next(-1)
if err != nil {
@ -801,9 +788,9 @@ func (t *TCPSink) SetAuthConfig(authCfg *config.AuthConfig) {
}
t.authenticator = authenticator
// Initialize TLS manager if SSL is configured
if t.config.SSL != nil && t.config.SSL.Enabled {
tlsManager, err := tls.NewManager(t.config.SSL, t.logger)
// Initialize TLS manager if TLS is configured
if t.config.TLS != nil && t.config.TLS.Enabled {
tlsManager, err := tls.NewManager(t.config.TLS, t.logger)
if err != nil {
t.logger.Error("msg", "Failed to create TLS manager",
"component", "tcp_sink",

View File

@ -66,7 +66,7 @@ type TCPClientConfig struct {
ReconnectBackoff float64
// TLS config
SSL *config.SSLConfig
TLS *config.TLSConfig
}
// NewTCPClientSink creates a new TCP client sink
@ -121,25 +121,25 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
cfg.ReconnectBackoff = backoff
}
// Extract SSL config
if ssl, ok := options["ssl"].(map[string]any); ok {
cfg.SSL = &config.SSLConfig{}
cfg.SSL.Enabled, _ = ssl["enabled"].(bool)
if certFile, ok := ssl["cert_file"].(string); ok {
cfg.SSL.CertFile = certFile
// Extract TLS config
if tc, ok := options["tls"].(map[string]any); ok {
cfg.TLS = &config.TLSConfig{}
cfg.TLS.Enabled, _ = tc["enabled"].(bool)
if certFile, ok := tc["cert_file"].(string); ok {
cfg.TLS.CertFile = certFile
}
if keyFile, ok := ssl["key_file"].(string); ok {
cfg.SSL.KeyFile = keyFile
if keyFile, ok := tc["key_file"].(string); ok {
cfg.TLS.KeyFile = keyFile
}
cfg.SSL.ClientAuth, _ = ssl["client_auth"].(bool)
if caFile, ok := ssl["client_ca_file"].(string); ok {
cfg.SSL.ClientCAFile = caFile
cfg.TLS.ClientAuth, _ = tc["client_auth"].(bool)
if caFile, ok := tc["client_ca_file"].(string); ok {
cfg.TLS.ClientCAFile = caFile
}
if insecure, ok := ssl["insecure_skip_verify"].(bool); ok {
cfg.SSL.InsecureSkipVerify = insecure
if insecure, ok := tc["insecure_skip_verify"].(bool); ok {
cfg.TLS.InsecureSkipVerify = insecure
}
if caFile, ok := ssl["ca_file"].(string); ok {
cfg.SSL.CAFile = caFile
if caFile, ok := tc["ca_file"].(string); ok {
cfg.TLS.CAFile = caFile
}
}
@ -154,11 +154,11 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
t.lastProcessed.Store(time.Time{})
t.connectionUptime.Store(time.Duration(0))
// Initialize TLS manager if SSL is configured
if cfg.SSL != nil && cfg.SSL.Enabled {
// Initialize TLS manager if TLS is configured
if cfg.TLS != nil && cfg.TLS.Enabled {
// Build custom TLS config for client
t.tlsConfig = &tls.Config{
InsecureSkipVerify: cfg.SSL.InsecureSkipVerify,
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
}
// Extract server name from address for SNI
@ -169,36 +169,36 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
t.tlsConfig.ServerName = host
// Load custom CA for server verification
if cfg.SSL.CAFile != "" {
caCert, err := os.ReadFile(cfg.SSL.CAFile)
if cfg.TLS.CAFile != "" {
caCert, err := os.ReadFile(cfg.TLS.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file '%s': %w", cfg.SSL.CAFile, err)
return nil, fmt.Errorf("failed to read CA file '%s': %w", cfg.TLS.CAFile, err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate from '%s'", cfg.SSL.CAFile)
return nil, fmt.Errorf("failed to parse CA certificate from '%s'", cfg.TLS.CAFile)
}
t.tlsConfig.RootCAs = caCertPool
logger.Debug("msg", "Custom CA loaded for server verification",
"component", "tcp_client_sink",
"ca_file", cfg.SSL.CAFile)
"ca_file", cfg.TLS.CAFile)
}
// Load client certificate for mTLS
if cfg.SSL.CertFile != "" && cfg.SSL.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(cfg.SSL.CertFile, cfg.SSL.KeyFile)
if cfg.TLS.CertFile != "" && cfg.TLS.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(cfg.TLS.CertFile, cfg.TLS.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
t.tlsConfig.Certificates = []tls.Certificate{cert}
logger.Info("msg", "Client certificate loaded for mTLS",
"component", "tcp_client_sink",
"cert_file", cfg.SSL.CertFile)
"cert_file", cfg.TLS.CertFile)
}
// Set minimum TLS version if configured
if cfg.SSL.MinVersion != "" {
t.tlsConfig.MinVersion = parseTLSVersion(cfg.SSL.MinVersion, tls.VersionTLS12)
if cfg.TLS.MinVersion != "" {
t.tlsConfig.MinVersion = parseTLSVersion(cfg.TLS.MinVersion, tls.VersionTLS12)
} else {
t.tlsConfig.MinVersion = tls.VersionTLS12 // Default minimum
}
@ -207,8 +207,8 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter form
"component", "tcp_client_sink",
"address", cfg.Address,
"server_name", host,
"insecure", cfg.SSL.InsecureSkipVerify,
"mtls", cfg.SSL.CertFile != "")
"insecure", cfg.TLS.InsecureSkipVerify,
"mtls", cfg.TLS.CertFile != "")
}
return t, nil
}