MAJOR rework
- improves a lot on the Go side including better config and logging libraries (including log rotation), better internal package distribution, and new a3go functionality to make data transfer more performant - SQF side preprocessing of capture data is now minimal - arrays in hashmap format are sent directly to the extension and parsed there to minimize game impact - CBA custom events are implemented in a better fashion - README update - license change - with performance improvements, the deep customization of integrated metric gathering is removed in return to a single refreshRateMs, defining the interval at which core metrics are captured - peeled back the list of core metrics to the core information used in troubleshooting and benchmarking
This commit is contained in:
123
extension/internal/influx/influx.go
Normal file
123
extension/internal/influx/influx.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package influx
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/indig0fox/IFXMetrics/internal/logger"
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
"github.com/influxdata/influxdb-client-go/v2/api"
|
||||
"github.com/influxdata/influxdb-client-go/v2/domain"
|
||||
"github.com/kataras/iris/v12/x/errors"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var InfluxClient influxdb2.Client
|
||||
var Connected bool = false
|
||||
var activeSettings *viper.Viper
|
||||
|
||||
var writeAPIs = map[string]api.WriteAPI{}
|
||||
var writeAPIErrorChannels = map[string]<-chan error{}
|
||||
|
||||
func Setup(
|
||||
settings *viper.Viper,
|
||||
) error {
|
||||
activeSettings = settings
|
||||
// create influx client
|
||||
InfluxClient = influxdb2.NewClientWithOptions(
|
||||
activeSettings.GetString("influxdb.host"),
|
||||
activeSettings.GetString("influxdb.token"),
|
||||
influxdb2.DefaultOptions().SetFlushInterval(1000),
|
||||
)
|
||||
|
||||
_, err := InfluxClient.Ping(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Connected = true
|
||||
|
||||
// start error handler
|
||||
go func() {
|
||||
for {
|
||||
for bucket, err := range writeAPIErrorChannels {
|
||||
thisLog := logger.FileOnly.With().
|
||||
Str("component", "influx").
|
||||
Str("bucket", bucket).
|
||||
Logger()
|
||||
|
||||
select {
|
||||
case e := <-err:
|
||||
thisLog.Error().Msg(e.Error())
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func WriteLine(
|
||||
bucket string,
|
||||
measurement string,
|
||||
tags map[string]string,
|
||||
fields map[string]interface{},
|
||||
) error {
|
||||
|
||||
if !Connected {
|
||||
return errors.New("influxdb not connected")
|
||||
}
|
||||
|
||||
if InfluxClient == nil {
|
||||
return errors.New("influxdb client not initialized")
|
||||
}
|
||||
|
||||
// check if bucket exists
|
||||
b := InfluxClient.BucketsAPI()
|
||||
_, err := b.FindBucketByName(context.Background(), bucket)
|
||||
if err != nil {
|
||||
// get organization
|
||||
org, err := InfluxClient.OrganizationsAPI().FindOrganizationByName(
|
||||
context.Background(),
|
||||
activeSettings.GetString("influxdb.org"),
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
// create bucket
|
||||
bucket, err := b.CreateBucketWithName(
|
||||
context.Background(),
|
||||
org,
|
||||
bucket,
|
||||
domain.RetentionRule{EverySeconds: 0},
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
logger.Log.Info().Msgf("created bucket %s", bucket.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
if writeAPIs[bucket] == nil {
|
||||
writeAPIs[bucket] = InfluxClient.WriteAPI(
|
||||
activeSettings.GetString("influxdb.org"),
|
||||
bucket,
|
||||
)
|
||||
writeAPIErrorChannels[bucket] = writeAPIs[bucket].Errors()
|
||||
}
|
||||
|
||||
p := influxdb2.NewPoint(
|
||||
measurement,
|
||||
tags,
|
||||
fields,
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
writeAPIs[bucket].WritePoint(p)
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user