start postgres support for extended data

This commit is contained in:
2023-04-13 12:42:31 -07:00
parent bd9e9f37aa
commit d0802de7ae
7 changed files with 552 additions and 73 deletions

404
arma.go
View File

@@ -9,22 +9,41 @@ package main
import "C" // This is required to import the C code
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"reflect"
"strings"
"time"
"unsafe"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
// declare list of functions available for call
var AVAILABLE_FUNCTIONS = map[string]interface{}{
"loadSettings": loadSettings,
"connectToInflux": connectToInflux,
"writeToInflux": writeToInflux,
"connectToTimescale": connectToTimescale,
"initTimescale": initTimescale,
"writeToTimescale": writeToTimescale,
"deinitialize": deinitialize,
"getDir": getDir,
"sanitizeLineProtocol": sanitizeLineProtocol,
}
var EXTENSION_VERSION string = "0.0.1"
var extensionCallbackFnc C.extensionCallback
var influxConnectionSettings influxSettings
var a3Settings arma3Settings
var timescaleConnectionSettings timescaleSettings
var timescaleDbPool *pgxpool.Pool
// InfluxDB variables
var DB_CLIENT influxdb2.Client
@@ -59,16 +78,21 @@ type influxSettings struct {
Org string `json:"org"`
}
type timescaleSettings struct {
ConnectionUrl string `json:"connectionUrl"`
}
type arma3Settings struct {
RefreshRateMs int `json:"refreshRateMs"`
}
type settingsJson struct {
Influx influxSettings `json:"influxdb"`
Arma3 arma3Settings `json:"arma3"`
Influx influxSettings `json:"influxdb"`
Arma3 arma3Settings `json:"arma3"`
Timescale timescaleSettings `json:"timescaledb"`
}
func connectToInflux() string {
func connectToInflux() {
if influxConnectionSettings.Host == "" {
logLine("connectToInflux", `["influxConnectionSettings.Host is empty", "ERROR"]`)
// logLine("connectToInflux", `["Creating backup file", "INFO"]`)
@@ -83,18 +107,267 @@ func connectToInflux() string {
// logLine("connectToInflux", `["Error creating gzip writer", "ERROR"]`)
// }
// return "Error connecting to Influx. Using local backup"
return "Error connecting to Influx."
}
DB_CLIENT = influxdb2.NewClientWithOptions(influxConnectionSettings.Host, influxConnectionSettings.Token, influxdb2.DefaultOptions().SetBatchSize(500).SetFlushInterval(2000))
logLine("connectToInflux", `["DB_CLIENT created", "INFO"]`)
return "Connected to Influx successfully"
}
//////////////////////////////////
// TIMESCALE
//////////////////////////////////
func connectToTimescale() {
functionName := "connectToTimescale"
var err error
// urlExample := "postgres://username:password@localhost:5432/database_name"
// logLine("connectToTimescale", fmt.Sprintf(`["timescaleConnectionSettings.ConnectionUrl: %s", "INFO"]`, timescaleConnectionSettings.ConnectionUrl))
conn, err := pgx.Connect(context.Background(), timescaleConnectionSettings.ConnectionUrl)
if err != nil {
logLine(
functionName,
fmt.Sprintf(`["Unable to connect to Timescale DB: %v", "ERROR"]`, err.Error()),
)
// return fmt.Sprintf(`["Error connecting to timescale DB: %v\n"]`, err.Error())
}
conn.Exec(context.Background(), "CREATE DATABASE rangermetrics;")
logLine("connectToTimescale", `["Connected to Timescale successfully", "INFO"]`)
}
func initTimescale() {
// create a table
// CREATE TABLE IF NOT EXISTS rangermetrics (time timestamp, line text);
timescaleDbPool.Exec(context.Background(), "CREATE DATABASE rangermetrics;")
// create entities table
timescaleDbPool.Exec(context.Background(), `CREATE TABLE "public.Missions" (
"id" serial NOT NULL UNIQUE,
"world_name" VARCHAR(255) NOT NULL,
"briefing_name" VARCHAR(255) NOT NULL,
"mission_name" VARCHAR(255) NOT NULL,
"mission_author" VARCHAR(255) NOT NULL,
"server_name" VARCHAR(255) NOT NULL,
"server_mods" TEXT,
"ace_medical" BOOLEAN NOT NULL,
"radio_tfar" BOOLEAN NOT NULL,
"radio_acre" BOOLEAN NOT NULL,
"start_game" DATETIME NOT NULL,
"start_utc" DATETIME NOT NULL,
"frame_count" FLOAT NOT NULL,
"capture_delay_s" FLOAT NOT NULL,
"addon_ver_major" integer NOT NULL,
"addon_ver_minor" integer NOT NULL,
"addon_ver_patch" integer NOT NULL,
"extension_ver_major" integer NOT NULL,
"extension_ver_minor" integer NOT NULL,
"extension_ver_patch" integer NOT NULL,
"tags" VARCHAR(255) NOT NULL,
CONSTRAINT "Missions_pk" PRIMARY KEY ("id")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Worlds" (
"world_name" VARCHAR(255) NOT NULL,
"display_name" VARCHAR(255) NOT NULL,
"world_size_m" integer NOT NULL,
CONSTRAINT "Worlds_pk" PRIMARY KEY ("world_name")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Units" (
"mission_id" integer NOT NULL,
"unit_id" integer NOT NULL,
"frame" integer NOT NULL,
"steamid64" varchar(100),
"steam_name" VARCHAR(255) NOT NULL,
"a3_profile_name" VARCHAR(255) NOT NULL,
"is_human" BOOLEAN NOT NULL,
"is_afk" BOOLEAN NOT NULL,
"is_alive" BOOLEAN NOT NULL,
"unit_type" VARCHAR(255) NOT NULL,
"role_description" VARCHAR(255),
"side" integer NOT NULL,
"group_id" varchar(100) NOT NULL,
"name" VARCHAR(255),
"position" VARCHAR(255) NOT NULL,
"direction" FLOAT NOT NULL,
"anim_state" varchar(100),
"stance" VARCHAR(255),
"traits" VARCHAR(255),
"damage" FLOAT NOT NULL,
"is_speaking" integer NOT NULL,
CONSTRAINT "Units_pk" PRIMARY KEY ("mission_id","unit_id","frame")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Vehicles" (
"mission_id" integer NOT NULL,
"frame" integer NOT NULL,
"vehicle_id" integer NOT NULL,
"object_type" VARCHAR(255),
"weapons" varchar(3000),
"customization" varchar(1000),
"position" VARCHAR(255) NOT NULL,
"direction" FLOAT NOT NULL,
"vector_dir" varchar(70) NOT NULL,
"vector_up" varchar(70) NOT NULL,
"is_alive" BOOLEAN NOT NULL,
"damage" FLOAT NOT NULL,
CONSTRAINT "Vehicles_pk" PRIMARY KEY ("mission_id","frame","vehicle_id")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Markers" (
"mission_id" integer NOT NULL,
"frame" integer NOT NULL,
"marker_name" VARCHAR(255) NOT NULL,
"data" VARCHAR(255) NOT NULL,
CONSTRAINT "Markers_pk" PRIMARY KEY ("mission_id","frame","marker_name")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Chat" (
"id" serial NOT NULL,
"mission_id" integer NOT NULL,
"sender_id" integer NOT NULL,
"frame" integer NOT NULL,
"timestamp_utc" TIMESTAMP NOT NULL,
"content" VARCHAR(255) NOT NULL,
"channel" integer NOT NULL,
CONSTRAINT "Chat_pk" PRIMARY KEY ("id")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.UniqueUsers" (
"steamid64" varchar(100) NOT NULL,
CONSTRAINT "UniqueUsers_pk" PRIMARY KEY ("steamid64")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Environment" (
"mission_id" integer NOT NULL,
"frame" integer NOT NULL,
"date_game" DATETIME NOT NULL,
"date_utc" DATETIME NOT NULL,
"overcast" FLOAT NOT NULL,
"rain" FLOAT NOT NULL,
"humidity" FLOAT NOT NULL,
"fog_value" FLOAT NOT NULL,
"fog_decay" FLOAT NOT NULL,
"fog_base" FLOAT NOT NULL,
"wind_vector" VARCHAR(255) NOT NULL,
"gusts" FLOAT NOT NULL,
"waves" FLOAT NOT NULL,
CONSTRAINT "Environment_pk" PRIMARY KEY ("mission_id","frame")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.StaticObjects" (
"mission_id" integer NOT NULL,
"frame" integer NOT NULL,
"building_id" integer NOT NULL,
"position" integer NOT NULL,
"direction" FLOAT NOT NULL,
"vector_dir" varchar(70) NOT NULL,
"vector_up" varchar(70) NOT NULL,
"object_type" VARCHAR(255) NOT NULL,
"classname" VARCHAR(255) NOT NULL,
"simple_object_data" varchar(1500) NOT NULL,
CONSTRAINT "StaticObjects_pk" PRIMARY KEY ("mission_id","frame","building_id")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Campaigns" (
"id" serial NOT NULL,
"name" VARCHAR(255) NOT NULL,
"description" TEXT NOT NULL,
"image" bytea NOT NULL,
CONSTRAINT "Campaigns_pk" PRIMARY KEY ("id")
) WITH (
OIDS=FALSE
);
CREATE TABLE "public.Missions_In_Campaigns" (
"mission" integer NOT NULL,
"campaign" integer NOT NULL,
CONSTRAINT "Missions_In_Campaigns_pk" PRIMARY KEY ("mission","campaign")
) WITH (
OIDS=FALSE
);
ALTER TABLE "Missions" ADD CONSTRAINT "Missions_fk0" FOREIGN KEY ("world_name") REFERENCES "Worlds"("world_name");
ALTER TABLE "Units" ADD CONSTRAINT "Units_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "Units" ADD CONSTRAINT "Units_fk1" FOREIGN KEY ("steamid64") REFERENCES "UniqueUsers"("steamid64");
ALTER TABLE "Vehicles" ADD CONSTRAINT "Vehicles_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "Markers" ADD CONSTRAINT "Markers_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "Chat" ADD CONSTRAINT "Chat_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "Chat" ADD CONSTRAINT "Chat_fk1" FOREIGN KEY ("sender_id") REFERENCES "Units"("unit_id");
ALTER TABLE "Environment" ADD CONSTRAINT "Environment_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "StaticObjects" ADD CONSTRAINT "StaticObjects_fk0" FOREIGN KEY ("mission_id") REFERENCES "Missions"("id");
ALTER TABLE "Missions_In_Campaigns" ADD CONSTRAINT "Missions_In_Campaigns_fk0" FOREIGN KEY ("mission") REFERENCES "Missions"("id");
ALTER TABLE "Missions_In_Campaigns" ADD CONSTRAINT "Missions_In_Campaigns_fk1" FOREIGN KEY ("campaign") REFERENCES "Campaigns"("id");
`)
}
func writeToTimescale(table string, line string) {
// logLine("writeToTimescale", fmt.Sprintf(`["line: %s", "INFO"]`, line))
functionName := "writeToTimescale"
_, err := timescaleDbPool.Exec(context.Background(), "INSERT INTO %1 (time, line) VALUES (NOW(), $2);", table, line)
if err != nil {
logLine(functionName, fmt.Sprintf(`["Error writing to timescale: %v", "ERROR"]`, err.Error()))
}
}
func deinitialize() {
logLine("deinitialize", `["deinitialize called", "INFO"]`)
DB_CLIENT.Close()
timescaleDbPool.Close()
}
// sanitize line protocol for influx
@@ -119,10 +392,11 @@ func getDir() string {
return dir
}
func loadSettings() (dir string, result string, host string) {
logLine("loadSettings", fmt.Sprintf(`["ADDON_FOLDER: %s", "INFO"]`, ADDON_FOLDER))
logLine("loadSettings", fmt.Sprintf(`["LOG_FILE: %s", "INFO"]`, LOG_FILE))
logLine("loadSettings", fmt.Sprintf(`["SETTINGS_FILE: %s", "INFO"]`, SETTINGS_FILE))
func loadSettings() (dir string, result string, influxHost string, timescaleUrl string) {
functionName := "loadSettings"
logLine(functionName, fmt.Sprintf(`["ADDON_FOLDER: %s", "INFO"]`, ADDON_FOLDER))
logLine(functionName, fmt.Sprintf(`["LOG_FILE: %s", "INFO"]`, LOG_FILE))
logLine(functionName, fmt.Sprintf(`["SETTINGS_FILE: %s", "INFO"]`, SETTINGS_FILE))
// print the current working directory
var file *os.File
@@ -148,9 +422,13 @@ func loadSettings() (dir string, result string, host string) {
a3Set := arma3Settings{
RefreshRateMs: 1000,
}
tsSettings := timescaleSettings{
ConnectionUrl: "postgres://username:password@localhost:5432/database_name",
}
defaultSettings := map[string]interface{}{
"influxdb": ifSet,
"arma3": a3Set,
"influxdb": ifSet,
"arma3": a3Set,
"timescale": tsSettings,
}
encoder := json.NewEncoder(file)
err = encoder.Encode(defaultSettings)
@@ -158,8 +436,9 @@ func loadSettings() (dir string, result string, host string) {
log.Fatal(err)
}
result = `["settings.json created - please modify!", "WARN"]`
host = ifSet.Host
return dir, result, host
influxHost = ifSet.Host
timescaleUrl = tsSettings.ConnectionUrl
return dir, result, influxHost, timescaleUrl
} else {
// file exists
log.Println("settings.json exists")
@@ -178,13 +457,15 @@ func loadSettings() (dir string, result string, host string) {
// set the settings
influxConnectionSettings = settings.Influx
a3Settings = settings.Arma3
timescaleConnectionSettings = settings.Timescale
// set the result
result = `["settings.json loaded", "INFO"]`
host = influxConnectionSettings.Host
influxHost = influxConnectionSettings.Host
timescaleUrl = timescaleConnectionSettings.ConnectionUrl
}
return dir, result, host
return dir, result, influxHost, timescaleUrl
}
func runExtensionCallback(name *C.char, function *C.char, data *C.char) C.int {
@@ -217,7 +498,7 @@ func goRVExtensionArgs(output *C.char, outputsize C.size_t, input *C.char, argv
if C.GoString(input) == "sendToInflux" {
// start a goroutine to send the data to influx
// param string is argv[0] which is the data to send to influx
go sendToInflux(&out)
go writeToInflux(&out)
temp = fmt.Sprintf("Function: %s nb params: %d", C.GoString(input), argc)
}
@@ -261,17 +542,17 @@ func fixEscapeQuotes(s string) string {
return strings.Replace(s, `""`, `"`, -1)
}
func sendToInflux(a3DataRaw *[]string) string {
func writeToInflux(a3DataRaw *[]string) string {
// convert to string array
a3Data := *a3DataRaw
logLine("sendToInflux", fmt.Sprintf(`["Received %d params", "DEBUG"]`, len(a3Data)))
logLine("writeToInflux", fmt.Sprintf(`["Received %d params", "DEBUG"]`, len(a3Data)))
MIN_PARAMS_COUNT := 1
var logData string
functionName := "sendToInflux"
functionName := "writeToInflux"
if len(a3Data) < MIN_PARAMS_COUNT {
logData = fmt.Sprintf(`["Not all parameters present (got %d, expected at least %d)", "ERROR"]`, len(a3Data), MIN_PARAMS_COUNT)
@@ -341,43 +622,58 @@ func goRVExtension(output *C.char, outputsize C.size_t, input *C.char) {
var temp string
switch C.GoString(input) {
case "version":
logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
temp = EXTENSION_VERSION
case "getDir":
logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
temp = getDir()
case "loadSettings":
logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
cwd, result, influxHost := loadSettings()
log.Println("CWD:", cwd)
log.Println("RESULT:", result)
log.Println("INFLUX HOST:", influxHost)
if result != "" {
logLine("goRVExtension", result)
temp = fmt.Sprintf(
`["%s", "%s", "%s", "%d"]`,
EXTENSION_VERSION,
influxConnectionSettings.Host,
influxConnectionSettings.Org,
a3Settings.RefreshRateMs,
)
}
case "connectToInflux":
logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
result := connectToInflux()
temp = fmt.Sprintf(`["%s", "INFO"]`, result)
case "getUnixTimeNano":
temp = fmt.Sprintf(`["%d", "INFO"]`, getUnixTimeNano())
case "deinitialize":
logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
deinitialize()
temp = `["Deinitializing", "INFO"]`
default:
temp = fmt.Sprintf(`["Unknown command: %s", "ERR"]`, C.GoString(input))
// check if input is in AVAILABLE_FUNCTIONS
// if not, return error
// if yes, continue
if _, ok := AVAILABLE_FUNCTIONS[C.GoString(input)]; !ok {
temp = fmt.Sprintf("Function: %s not found!", C.GoString(input))
} else {
// call the function by name
reflect.ValueOf(AVAILABLE_FUNCTIONS[C.GoString(input)]).Call([]reflect.Value{})
}
// switch C.GoString(input) {
// case "version":
// logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// temp = EXTENSION_VERSION
// case "getDir":
// logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// temp = getDir()
// case "loadSettings":
// logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// cwd, result, influxHost, timescaleUrl := loadSettings()
// log.Println("CWD:", cwd)
// log.Println("RESULT:", result)
// log.Println("INFLUX HOST:", influxHost)
// log.Println("TIMESCALE URL:", timescaleUrl)
// if result != "" {
// logLine("goRVExtension", result)
// temp = fmt.Sprintf(
// `["%s", "%s", "%s", "%d"]`,
// EXTENSION_VERSION,
// influxConnectionSettings.Host,
// influxConnectionSettings.Org,
// a3Settings.RefreshRateMs,
// )
// }
// case "connectToInflux":
// // logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// go connectToInflux()
// temp = `["Connecting to InfluxDB", "INFO"]`
// case "connectToTimescale":
// // logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// go connectToTimescale()
// temp = `["Connecting to TimescaleDB", "INFO"]`
// case "getUnixTimeNano":
// temp = fmt.Sprintf(`["%d", "INFO"]`, getUnixTimeNano())
// case "deinitialize":
// logLine("goRVExtension", fmt.Sprintf(`["Input: %s", "INFO"]`, C.GoString(input)))
// deinitialize()
// temp = `["Deinitializing", "INFO"]`
// default:
// temp = fmt.Sprintf(`["Unknown command: %s", "ERROR"]`, C.GoString(input))
// }
result := C.CString(temp)
defer C.free(unsafe.Pointer(result))
var size = C.strlen(result) + 1