some minor adjustments to log write errors better & finalize str sanitiz

This commit is contained in:
2023-04-12 22:45:26 -07:00
parent ac5869e63b
commit fc16dbb42d

62
arma.go
View File

@@ -34,6 +34,9 @@ var ADDON_FOLDER string = "./@RangerMetrics"
var LOG_FILE string = ADDON_FOLDER + "/rangermetrics.log" var LOG_FILE string = ADDON_FOLDER + "/rangermetrics.log"
var SETTINGS_FILE string = ADDON_FOLDER + "/settings.json" var SETTINGS_FILE string = ADDON_FOLDER + "/settings.json"
// var BACKUP_FILE_PATH string = ADDON_FOLDER + "/local_backup.log.gzip"
// var BACKUP_WRITER *gzip.Writer
// configure log output // configure log output
func init() { func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile) log.SetFlags(log.LstdFlags | log.Lshortfile)
@@ -68,7 +71,19 @@ type settingsJson struct {
func connectToInflux() string { func connectToInflux() string {
if influxConnectionSettings.Host == "" { if influxConnectionSettings.Host == "" {
logLine("connectToInflux", `["influxConnectionSettings.Host is empty", "ERROR"]`) logLine("connectToInflux", `["influxConnectionSettings.Host is empty", "ERROR"]`)
return "Error connecting to Influx" // logLine("connectToInflux", `["Creating backup file", "INFO"]`)
// file, err := os.Open(BACKUP_FILE_PATH)
// if err != nil {
// log.Fatal(err)
// logLine("connectToInflux", `["Error opening backup file", "ERROR"]`)
// }
// BACKUP_WRITER = gzip.NewWriter(file)
// if err != nil {
// log.Fatal(err)
// logLine("connectToInflux", `["Error creating gzip writer", "ERROR"]`)
// }
// return "Error connecting to Influx. Using local backup"
return "Error connecting to Influx."
} }
DB_CLIENT = influxdb2.NewClientWithOptions(influxConnectionSettings.Host, influxConnectionSettings.Token, influxdb2.DefaultOptions().SetBatchSize(500).SetFlushInterval(2000)) DB_CLIENT = influxdb2.NewClientWithOptions(influxConnectionSettings.Host, influxConnectionSettings.Token, influxdb2.DefaultOptions().SetBatchSize(500).SetFlushInterval(2000))
@@ -195,7 +210,16 @@ func goRVExtensionArgs(output *C.char, outputsize C.size_t, input *C.char, argv
out = append(out, C.GoString(*argv)) out = append(out, C.GoString(*argv))
argv = (**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(argv)) + offset)) argv = (**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(argv)) + offset))
} }
temp := fmt.Sprintf("Function: %s nb params: %d params: %s!", C.GoString(input), argc, out)
var temp string
temp = fmt.Sprintf("Function: %s nb params: %d params: %s!", C.GoString(input), argc, out)
if C.GoString(input) == "sendToInflux" {
// start a goroutine to send the data to influx
// param string is argv[0] which is the data to send to influx
go sendToInflux(&out)
temp = fmt.Sprintf("Function: %s nb params: %d", C.GoString(input), argc)
}
// Return a result to Arma // Return a result to Arma
result := C.CString(temp) result := C.CString(temp)
@@ -205,13 +229,6 @@ func goRVExtensionArgs(output *C.char, outputsize C.size_t, input *C.char, argv
size = outputsize size = outputsize
} }
if C.GoString(input) == "sendToInflux" {
// start a goroutine to send the data to influx
// param string is argv[0] which is the data to send to influx
// go sendToInflux(out)
go sendToInflux(&out)
}
C.memmove(unsafe.Pointer(output), unsafe.Pointer(result), size) C.memmove(unsafe.Pointer(output), unsafe.Pointer(result), size)
} }
@@ -249,8 +266,6 @@ func sendToInflux(a3DataRaw *[]string) string {
// convert to string array // convert to string array
a3Data := *a3DataRaw a3Data := *a3DataRaw
var err error
logLine("sendToInflux", fmt.Sprintf(`["Received %d params", "DEBUG"]`, len(a3Data))) logLine("sendToInflux", fmt.Sprintf(`["Received %d params", "DEBUG"]`, len(a3Data)))
MIN_PARAMS_COUNT := 1 MIN_PARAMS_COUNT := 1
@@ -265,8 +280,9 @@ func sendToInflux(a3DataRaw *[]string) string {
} }
// use custom bucket or default // use custom bucket or default
var bucket string = trimQuotes(string(a3Data[0])) var bucket string = fixEscapeQuotes(trimQuotes(string(a3Data[0])))
// Get non-blocking write client
WRITE_API := DB_CLIENT.WriteAPI(influxConnectionSettings.Org, bucket) WRITE_API := DB_CLIENT.WriteAPI(influxConnectionSettings.Org, bucket)
if WRITE_API == nil { if WRITE_API == nil {
@@ -275,17 +291,29 @@ func sendToInflux(a3DataRaw *[]string) string {
return logData return logData
} }
// Get errors channel
errorsCh := WRITE_API.Errors()
go func() {
for writeErr := range errorsCh {
logData = fmt.Sprintf(`["Error parsing line protocol: %s", "ERROR"]`, strings.Replace(writeErr.Error(), `"`, `'`, -1))
logLine(functionName, logData)
}
}()
// now we have our write client, we'll go through the rest of the receive array items in line protocol format and write them to influx // now we have our write client, we'll go through the rest of the receive array items in line protocol format and write them to influx
for i := 1; i < len(a3Data); i++ { for i := 1; i < len(a3Data); i++ {
var p string = fixEscapeQuotes(trimQuotes(string(a3Data[i]))) var p string = fixEscapeQuotes(trimQuotes(string(a3Data[i])))
// write the line to influx
WRITE_API.WriteRecord(p) WRITE_API.WriteRecord(p)
if err != nil { // TODO: Add backup writer
logData = fmt.Sprintf(`["Error parsing line protocol: %s", "ERROR"]`, err.Error()) // // append backup line to file if BACKUP_WRITER is set
logLine(functionName, logData) // //
return logData // if BACKUP_WRITER != nil {
} // _, err = BACKUP_WRITER.Write([]byte(p + "\n"))
// }
} }
// schedule cleanup // schedule cleanup