//go:generate ../../../tools/readme_config_includer/generator
package kafka

import (
	"bytes"
	_ "embed"
	"errors"
	"fmt"
	"slices"
	"strings"
	"text/template"
	"time"

	"github.com/IBM/sarama"
	"github.com/gofrs/uuid/v5"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/common/kafka"
	"github.com/influxdata/telegraf/plugins/common/proxy"
	"github.com/influxdata/telegraf/plugins/outputs"
)

// sampleConfig holds the contents of sample.conf, embedded at build time and
// returned verbatim by SampleConfig.
//
//go:embed sample.conf
var sampleConfig string

// zeroTime is the Unix epoch. Write only forwards a metric's timestamp to
// Kafka when it is not before this instant.
var zeroTime = time.Unix(0, 0)

// Kafka is the Telegraf output plugin that publishes serialized metrics to
// Kafka topics through a sarama synchronous producer.
//
// Configuration overview (see Init, Write, getTopicName and routingKey):
//   - Brokers are handed to the producer factory on Connect.
//   - Topic is the base topic; when TopicTag names a tag present on a metric,
//     that tag's value replaces the topic, and ExcludeTopicTag additionally
//     strips the tag from a copy of the metric before serialization.
//   - TopicSuffix optionally appends the measurement name or tag values.
//   - RoutingTag / RoutingKey select the message key; the special routing key
//     "random" yields a fresh UUID per message.
//   - ProducerTimestamp is "metric" (default when empty) or "now". Only with
//     "metric" does the plugin set the message timestamp itself, and then only
//     for metric times that are not before the Unix epoch.
//   - Headers maps a header name to a text/template expression executed
//     against each metric; MetricNameHeader is the deprecated shortcut for a
//     header carrying "{{ .Name }}".
type Kafka struct {
	Brokers           []string          `toml:"brokers"`
	Topic             string            `toml:"topic"`
	TopicTag          string            `toml:"topic_tag"`
	ExcludeTopicTag   bool              `toml:"exclude_topic_tag"`
	TopicSuffix       TopicSuffix       `toml:"topic_suffix"`
	RoutingTag        string            `toml:"routing_tag"`
	RoutingKey        string            `toml:"routing_key"`
	ProducerTimestamp string            `toml:"producer_timestamp"`
	MetricNameHeader  string            `toml:"metric_name_header" deprecated:"1.39.0;1.45.0;please use 'headers' instead"`
	Headers           map[string]string `toml:"headers"`
	Log               telegraf.Logger   `toml:"-"`
	proxy.Socks5ProxyConfig
	kafka.WriteConfig

	// Legacy TLS config options. Init copies all three onto the TLS settings
	// (TLSCert, TLSKey, TLSCA) only when Certificate is non-empty.
	Certificate string `toml:"certificate" deprecated:"1.36.0;1.40.0;please use 'tls_cert' instead"`
	Key         string `toml:"key" deprecated:"1.36.0;1.40.0;please use 'tls_key' instead"`
	CA          string `toml:"ca" deprecated:"1.36.0;1.40.0;please use 'tls_ca' instead"`

	// Runtime state: saramaConfig and headerTmpl are prepared by Init, producer
	// is created by Connect via producerFunc (replaceable for testing).
	saramaConfig *sarama.Config
	producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
	producer     sarama.SyncProducer
	headerTmpl   map[string]*template.Template

	// serializer is injected through SetSerializer and used by Write.
	serializer telegraf.Serializer
}

// TopicSuffix configures how a per-metric suffix is appended to the topic.
//
// Method selects the strategy: "" appends nothing, "measurement" appends the
// metric name, and "tags" appends the non-empty values of the tags listed in
// Keys, in that order. Separator is placed between the topic and every
// appended component.
type TopicSuffix struct {
	Method    string   `toml:"method"`
	Keys      []string `toml:"keys"`
	Separator string   `toml:"separator"`
}

// SampleConfig returns the plugin's sample configuration as embedded from
// sample.conf.
func (*Kafka) SampleConfig() string {
	return sampleConfig
}

// SetSerializer stores the serializer that Write uses to encode each metric
// into the message value.
func (k *Kafka) SetSerializer(serializer telegraf.Serializer) {
	k.serializer = serializer
}

// Init validates the plugin settings and prepares everything Connect and
// Write rely on: the sarama configuration (including the optional SOCKS5
// dialer), the effective producer timestamp mode and the parsed header
// templates. Legacy options (certificate/key/ca and metric_name_header) are
// folded into their current counterparts first.
//
// It returns an error for an unknown topic suffix method or producer
// timestamp mode, when the sarama configuration or proxy dialer cannot be set
// up, or when a header template fails to parse.
func (k *Kafka) Init() error {
	kafka.SetLogger(k.Log.Level())

	// Only "", "measurement" and "tags" are accepted as suffix methods.
	if method := k.TopicSuffix.Method; method != "" && method != "measurement" && method != "tags" {
		return fmt.Errorf("unknown topic suffix method provided: %s", method)
	}

	// Legacy TLS options are taken over as a set, keyed on the certificate.
	if k.Certificate != "" {
		k.TLSCert, k.TLSKey, k.TLSCA = k.Certificate, k.Key, k.CA
	}

	// The legacy metric_name_header becomes a regular header template.
	if legacyHeader := k.MetricNameHeader; legacyHeader != "" {
		if k.Headers == nil {
			k.Headers = make(map[string]string, 1)
		}
		k.Headers[legacyHeader] = "{{ .Name }}"
	}

	// Build the sarama configuration from the embedded write settings.
	cfg := sarama.NewConfig()
	if err := k.SetConfig(cfg, k.Log); err != nil {
		return err
	}

	if k.Socks5ProxyEnabled {
		proxyDialer, err := k.Socks5ProxyConfig.GetDialer()
		if err != nil {
			return fmt.Errorf("connecting to proxy server failed: %w", err)
		}
		cfg.Net.Proxy.Enable = true
		cfg.Net.Proxy.Dialer = proxyDialer
	}
	k.saramaConfig = cfg

	// An empty timestamp mode defaults to "metric"; beyond that only
	// "metric" and "now" are valid.
	if k.ProducerTimestamp == "" {
		k.ProducerTimestamp = "metric"
	}
	if k.ProducerTimestamp != "metric" && k.ProducerTimestamp != "now" {
		return fmt.Errorf("unknown producer_timestamp option: %s", k.ProducerTimestamp)
	}

	// Parse every header expression once so Write only has to execute them.
	k.headerTmpl = make(map[string]*template.Template, len(k.Headers))
	for header, expression := range k.Headers {
		parsed, err := template.New(header).Parse(expression)
		if err != nil {
			return fmt.Errorf("creating template for header %q failed: %w", header, err)
		}
		k.headerTmpl[header] = parsed
	}

	return nil
}

// Connect creates the producer for the configured brokers using the sarama
// configuration prepared by Init. A failure is reported as a retryable
// internal.StartupError and leaves the current producer untouched.
func (k *Kafka) Connect() error {
	created, err := k.producerFunc(k.Brokers, k.saramaConfig)
	if err != nil {
		return &internal.StartupError{
			Err:   err,
			Retry: true,
		}
	}

	k.producer = created
	return nil
}

// Close shuts down the producer, if one was created, and returns the
// producer's close error. Without a producer it is a no-op.
func (k *Kafka) Close() error {
	if p := k.producer; p != nil {
		return p.Close()
	}
	return nil
}

// Write converts every metric into a Kafka message (topic, serialized value,
// templated headers, optional timestamp and routing key) and sends the whole
// batch through the producer in one call.
//
// Metrics that fail to serialize are skipped with a debug log entry, and a
// header whose template fails to execute is left out with an error log
// entry. A failure to generate a routing key aborts the write with an error.
//
// If sending fails with producer errors, only the first one is considered:
// "message too large" and "invalid timestamp" are logged and nil is returned
// (the log message states the batch is dropped); any other first error is
// returned. Errors that are not producer errors are returned unchanged.
func (k *Kafka) Write(metrics []telegraf.Metric) error {
	batch := make([]*sarama.ProducerMessage, 0, len(metrics))
	for _, incoming := range metrics {
		// The returned metric may be a copy with the topic tag removed.
		current, topic := k.getTopicName(incoming)

		payload, err := k.serializer.Serialize(current)
		if err != nil {
			k.Log.Debugf("Could not serialize metric: %v", err)
			continue
		}

		msg := &sarama.ProducerMessage{
			Topic:   topic,
			Value:   sarama.ByteEncoder(payload),
			Headers: make([]sarama.RecordHeader, 0, len(k.headerTmpl)),
		}

		// Render each header template; the buffer is reused between headers,
		// so the rendered bytes are cloned before being stored.
		var rendered bytes.Buffer
		for header, tmpl := range k.headerTmpl {
			rendered.Reset()
			if err := tmpl.Execute(&rendered, current); err != nil {
				k.Log.Errorf("adding header %q failed: %v", header, err)
				continue
			}
			record := sarama.RecordHeader{
				Key:   []byte(header),
				Value: slices.Clone(rendered.Bytes()),
			}
			msg.Headers = append(msg.Headers, record)
		}

		// Negative timestamps are not allowed by the Kafka protocol, so
		// pre-epoch metric times are never forwarded.
		if k.ProducerTimestamp == "metric" {
			if ts := current.Time(); !ts.Before(zeroTime) {
				msg.Timestamp = ts
			}
		}

		// Attach the routing key when one is configured.
		routing, err := k.routingKey(current)
		if err != nil {
			return fmt.Errorf("could not generate routing key: %w", err)
		}
		if routing != "" {
			msg.Key = sarama.StringEncoder(routing)
		}

		batch = append(batch, msg)
	}

	sendErr := k.producer.SendMessages(batch)
	if sendErr == nil {
		return nil
	}

	// There may be many producer errors; only the first one decides the result.
	var producerErrs sarama.ProducerErrors
	if !errors.As(sendErr, &producerErrs) || len(producerErrs) == 0 {
		return sendErr
	}

	first := producerErrs[0]
	switch {
	case errors.Is(first.Err, sarama.ErrMessageSizeTooLarge):
		k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
		return nil
	case errors.Is(first.Err, sarama.ErrInvalidTimestamp):
		k.Log.Error(
			"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
				"dropping batch",
		)
		return nil
	}
	return first
}

// getTopicName determines the topic a metric is published to and returns it
// together with the metric that should be serialized.
//
// The base topic is k.Topic unless TopicTag is configured and present on the
// metric, in which case the tag's value is used. With ExcludeTopicTag set,
// the topic tag is removed from a copy of the metric and that copy is
// returned, so the caller's metric stays unmodified; otherwise the input
// metric is returned as is.
//
// The configured TopicSuffix is then applied: "measurement" appends the
// separator and the metric name, "tags" joins the topic with the non-empty
// values of the configured tag keys using the separator, and any other method
// leaves the topic unchanged.
func (k *Kafka) getTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
	topic := k.Topic
	if k.TopicTag != "" {
		if t, ok := metric.GetTag(k.TopicTag); ok {
			topic = t

			// If excluding the topic tag, a copy is required to avoid modifying
			// the metric buffer.
			// NOTE(review): Accept is called on the copy right away, presumably
			// so the copy itself carries no pending delivery state; confirm
			// against the telegraf.Metric contract.
			if k.ExcludeTopicTag {
				metric = metric.Copy()
				metric.Accept()
				metric.RemoveTag(k.TopicTag)
			}
		}
	}

	switch k.TopicSuffix.Method {
	case "measurement":
		return metric, topic + k.TopicSuffix.Separator + metric.Name()
	case "tags":
		components := make([]string, 0, len(k.TopicSuffix.Keys)+1)
		components = append(components, topic)
		for _, key := range k.TopicSuffix.Keys {
			// Look up each key directly instead of materializing the full
			// tag map once per key; missing and empty tags are both skipped.
			if value, ok := metric.GetTag(key); ok && value != "" {
				components = append(components, value)
			}
		}
		return metric, strings.Join(components, k.TopicSuffix.Separator)
	default:
		return metric, topic
	}
}

// routingKey returns the message key for a metric. The value of the
// configured routing tag wins when the metric carries that tag. Otherwise the
// configured routing key is returned, except that the special value "random"
// produces a newly generated version 4 UUID on every call. An error is only
// returned when UUID generation fails.
func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) {
	if k.RoutingTag != "" {
		if value, found := metric.GetTag(k.RoutingTag); found {
			return value, nil
		}
	}

	// Anything but "random" is used literally (including the empty string).
	if k.RoutingKey != "random" {
		return k.RoutingKey, nil
	}

	id, err := uuid.NewV4()
	if err != nil {
		return "", err
	}
	return id.String(), nil
}

func init() {
	outputs.Add("kafka", func() telegraf.Output {
		return &Kafka{
			WriteConfig: kafka.WriteConfig{
				MaxRetry:     3,
				RequiredAcks: -1,
			},
			producerFunc: sarama.NewSyncProducer,
		}
	})
}
