diff --git a/arma.go b/arma.go
index a8d5a0b..01bf4c8 100644
--- a/arma.go
+++ b/arma.go
@@ -34,6 +34,9 @@ var ADDON_FOLDER string = "./@RangerMetrics"
 var LOG_FILE string = ADDON_FOLDER + "/rangermetrics.log"
 var SETTINGS_FILE string = ADDON_FOLDER + "/settings.json"
 
+// var BACKUP_FILE_PATH string = ADDON_FOLDER + "/local_backup.log.gzip"
+// var BACKUP_WRITER *gzip.Writer
+
 // configure log output
 func init() {
 	log.SetFlags(log.LstdFlags | log.Lshortfile)
@@ -68,7 +71,19 @@ type settingsJson struct {
 func connectToInflux() string {
 	if influxConnectionSettings.Host == "" {
 		logLine("connectToInflux", `["influxConnectionSettings.Host is empty", "ERROR"]`)
-		return "Error connecting to Influx"
+		// logLine("connectToInflux", `["Creating backup file", "INFO"]`)
+		// file, err := os.Open(BACKUP_FILE_PATH)
+		// if err != nil {
+		// 	log.Fatal(err)
+		// 	logLine("connectToInflux", `["Error opening backup file", "ERROR"]`)
+		// }
+		// BACKUP_WRITER = gzip.NewWriter(file)
+		// if err != nil {
+		// 	log.Fatal(err)
+		// 	logLine("connectToInflux", `["Error creating gzip writer", "ERROR"]`)
+		// }
+		// return "Error connecting to Influx. Using local backup"
+		return "Error connecting to Influx."
 	}
 
 	DB_CLIENT = influxdb2.NewClientWithOptions(influxConnectionSettings.Host, influxConnectionSettings.Token, influxdb2.DefaultOptions().SetBatchSize(500).SetFlushInterval(2000))
@@ -195,7 +210,16 @@ func goRVExtensionArgs(output *C.char, outputsize C.size_t, input *C.char, argv
 		out = append(out, C.GoString(*argv))
 		argv = (**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(argv)) + offset))
 	}
-	temp := fmt.Sprintf("Function: %s nb params: %d params: %s!", C.GoString(input), argc, out)
+
+	var temp string
+	temp = fmt.Sprintf("Function: %s nb params: %d params: %s!", C.GoString(input), argc, out)
+
+	if C.GoString(input) == "sendToInflux" {
+		// start a goroutine to send the data to influx
+		// param string is argv[0] which is the data to send to influx
+		go sendToInflux(&out)
+		temp = fmt.Sprintf("Function: %s nb params: %d", C.GoString(input), argc)
+	}
 
 	// Return a result to Arma
 	result := C.CString(temp)
@@ -205,13 +229,6 @@
 		size = outputsize
 	}
 
-	if C.GoString(input) == "sendToInflux" {
-		// start a goroutine to send the data to influx
-		// param string is argv[0] which is the data to send to influx
-		// go sendToInflux(out)
-		go sendToInflux(&out)
-	}
-
 	C.memmove(unsafe.Pointer(output), unsafe.Pointer(result), size)
 }
 
@@ -249,8 +266,6 @@ func sendToInflux(a3DataRaw *[]string) string {
 	// convert to string array
 	a3Data := *a3DataRaw
 
-	var err error
-
 	logLine("sendToInflux", fmt.Sprintf(`["Received %d params", "DEBUG"]`, len(a3Data)))
 
 	MIN_PARAMS_COUNT := 1
@@ -265,8 +280,9 @@
 	}
 
 	// use custom bucket or default
-	var bucket string = trimQuotes(string(a3Data[0]))
+	var bucket string = fixEscapeQuotes(trimQuotes(string(a3Data[0])))
+
 	// Get non-blocking write client
 	WRITE_API := DB_CLIENT.WriteAPI(influxConnectionSettings.Org, bucket)
 
 	if WRITE_API == nil {
@@ -275,17 +291,29 @@
+	// Get errors channel
+	errorsCh := WRITE_API.Errors()
+	go func() {
+		for writeErr := range errorsCh {
+			logData = fmt.Sprintf(`["Error parsing line protocol: %s", "ERROR"]`, strings.Replace(writeErr.Error(), `"`, `'`, -1))
+			logLine(functionName, logData)
+		}
+	}()
+
 	// now we have our write client, we'll go through the rest of the receive array items in line protocol format and write them to influx
 	for i := 1; i < len(a3Data); i++ {
 		var p string = fixEscapeQuotes(trimQuotes(string(a3Data[i])))
+
+		// write the line to influx
 		WRITE_API.WriteRecord(p)
-		if err != nil {
-			logData = fmt.Sprintf(`["Error parsing line protocol: %s", "ERROR"]`, err.Error())
-			logLine(functionName, logData)
-			return logData
-		}
+
+		// TODO: Add backup writer
+		// // append backup line to file if BACKUP_WRITER is set
+		// //
+		// if BACKUP_WRITER != nil {
+		// 	_, err = BACKUP_WRITER.Write([]byte(p + "\n"))
+		// }
 	}
 
 	// schedule cleanup
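
Note on the commented-out backup path above: before it is enabled, two bugs in the disabled sketch are worth flagging. os.Open opens the file read-only, so writes through the gzip writer would fail at runtime, and gzip.NewWriter does not return an error, so the second err check guards nothing (also, log.Fatal(err) exits before the logLine call after it can ever run). Below is a minimal, untested sketch of how the backup writer could be wired up instead. It reuses BACKUP_FILE_PATH and BACKUP_WRITER from the diff; backupMu, openBackupWriter, and writeBackupLine are hypothetical names, and arma.go would additionally need the compress/gzip, os, and sync imports:

	// backupMu serializes access to the gzip stream: each sendToInflux
	// call runs in its own goroutine, so concurrent Write calls would
	// otherwise race.
	var backupMu sync.Mutex

	// openBackupWriter opens (or creates) the backup file in append mode
	// and wraps it in a gzip writer.
	func openBackupWriter() error {
		// os.Open is read-only; a writable log needs os.OpenFile with
		// create/append flags.
		file, err := os.OpenFile(BACKUP_FILE_PATH, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			logLine("connectToInflux", `["Error opening backup file", "ERROR"]`)
			return err
		}
		// gzip.NewWriter cannot fail, so no err check is needed here.
		BACKUP_WRITER = gzip.NewWriter(file)
		return nil
	}

	// writeBackupLine appends one line-protocol record to the gzip stream.
	func writeBackupLine(p string) {
		if BACKUP_WRITER == nil {
			return
		}
		backupMu.Lock()
		defer backupMu.Unlock()
		if _, err := BACKUP_WRITER.Write([]byte(p + "\n")); err != nil {
			logLine("sendToInflux", `["Error writing backup line", "ERROR"]`)
			return
		}
		// gzip buffers internally; without a Flush here (or a Close on
		// shutdown) buffered records are lost if the process dies.
		BACKUP_WRITER.Flush()
	}

With helpers along these lines, the WriteRecord loop would call writeBackupLine(p) where the TODO block sits today.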