diff --git a/.gitignore b/.gitignore index 7428187..42112e1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ extension/RangerMetrics.h extension/RangerMetrics_x64.h + +\@RangerMetrics/settings.json diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf new file mode 100644 index 0000000..c8f6ca9 --- /dev/null +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf @@ -0,0 +1,13 @@ + +private _threadsToCheck = RangerMetrics_activeThreads; +{ + private _finished = ["RangerMetrics.influx.has_call_finished", [_threadId]] call py3_fnc_callExtension; + if (_finished) then { + _threadsToCheck = _threadsToCheck - [_threadId]; + if (missionNamespace getVariable ["RangerMetrics_debug",false]) then { + private _return = ["RangerMetrics.influx.get_call_value", [_threadId]] call py3_fnc_callExtension; + [format ["Thread result: %1", _extSend], "DEBUG"] call RangerMetrics_fnc_log; + }; + }; +} forEach _threadsToCheck; + diff --git a/@RangerMetrics/python/$PYTHIA$ b/@RangerMetrics/python/$PYTHIA$ new file mode 100644 index 0000000..79fca4e --- /dev/null +++ b/@RangerMetrics/python/$PYTHIA$ @@ -0,0 +1 @@ +RangerMetrics \ No newline at end of file diff --git a/@RangerMetrics/python/__init__.py b/@RangerMetrics/python/__init__.py new file mode 100644 index 0000000..3e7bf1b --- /dev/null +++ b/@RangerMetrics/python/__init__.py @@ -0,0 +1 @@ +from . import influx diff --git a/@RangerMetrics/python/influx.py b/@RangerMetrics/python/influx.py new file mode 100644 index 0000000..066c197 --- /dev/null +++ b/@RangerMetrics/python/influx.py @@ -0,0 +1,134 @@ +import influxdb_client +from influxdb_client.client.write_api import SYNCHRONOUS +import threading +from pyproj import Transformer +from datetime import datetime +import json +import os +from .threading_utils import call_slow_function, has_call_finished, get_call_value + +settings = None + + +host = settings["influxdb"]["host"] +token = settings["influxdb"]["token"] +org = settings["influxdb"]["org"] +bucket = settings["influxdb"]["bucket"] +refreshRateMs = settings["arma3"]["refreshRateMs"] + +transformer = Transformer.from_crs("epsg:3857", "epsg:4326") + +DBCLIENT = influxdb_client.InfluxDBClient( + url=host, token=token, org=org, enable_gzip=True +) +WRITE_API = DBCLIENT.write_api(write_options=SYNCHRONOUS) + + +def get_dir(): + # get current dir + return [ + os.path.dirname(os.path.realpath(__file__)) + "\\" + os.path.basename(__file__) + ] + + +def load_settings(): + # import settings from settings.json + global settings + with open("settings.json", "r") as f: + settings = json.load(f) + # get path to arma3 directory + global arma3_path + + +def test_data(data): + with open("influxdb_data.log", "a") as f: + f.write(str(data) + "\n") + f.write(f"{datetime.now()}: {data[2]}\n") + # convert to dict from list of key, value pairs + # format [[key, value], [key, value]] to {key: value, key: value} + measurement, tag_set, field_set, position = data + tag_dict = dict(tag_set) + field_dict = dict(field_set) + f.write( + f"{datetime.now()}: {measurement}, {json.dumps(tag_dict, indent=2)}, {json.dumps(field_dict, indent=2)}, {position}\n" + ) + + # thread the write to influxdb + return [data, dict(data[1])] + + +def log_to_file(data): + # threaded, write backup to file + with open("influxdb_data.log", "a") as f: + f.write(f"{data}\n") + return True + + +def write_data(data): + # thread the write to influxdb + # t = threading.Thread(target=write_points_async, args=(data,), daemon=True) + # t.start() + thread_id = call_slow_function(write_points_async, data) + return [thread_id] + + +def write_points_async(data): + processed = [] + timestamp = f" {int(datetime.now().timestamp() * 1e9)}" + + process_log = open("influxdb_process.log", "a") + + # process_log.write(f"{datetime.now()}: Processing {len(data)} data points\n") + # process_log.write(f"{datetime.now()}: {data[0]}\n") + + for point in data: + measurement = point[0] + tag_set = point[1] + field_set = point[2] + if len(point) > 3: + position = point[3] + + tag_dict = dict(tag_set) + field_dict = dict(field_set) + + point_dict = { + "measurement": measurement, + "tags": tag_dict, + "fields": field_dict, + } + + if position is not None: + + # convert position to lat/lon + lat, lon = transformer.transform( + position[0], + position[1], + ) + point_dict["fields"]["lat"] = lat + point_dict["fields"]["lon"] = lon + point_dict["fields"]["alt"] = position[2] + + processed.append(point_dict) + + # process_log.write(f"{datetime.now()}: Writing {len(processed)} data points\n") + # process_log.write(f"{datetime.now()}: {json.dumps(processed, indent=2)}\n") + + try: + result = WRITE_API.write(bucket, org, processed) + process_log.write(f"{datetime.now()}: Success\n") + except Exception as e: + # write to file + with open("influxdb_error.log", "a") as f: + f.write(f"{datetime.now()}: {e}\n") + + # free up memory + del processed + del transformer + del timestamp + del process_log + + return ("OK", 200) + + +has_call_finished # noqa imported functions +get_call_value # noqa imported functions diff --git a/@RangerMetrics/python/requirements.txt b/@RangerMetrics/python/requirements.txt new file mode 100644 index 0000000..373dcde --- /dev/null +++ b/@RangerMetrics/python/requirements.txt @@ -0,0 +1 @@ +influxdb-client \ No newline at end of file diff --git a/@RangerMetrics/python/threading_utils.py b/@RangerMetrics/python/threading_utils.py new file mode 100644 index 0000000..bb738dc --- /dev/null +++ b/@RangerMetrics/python/threading_utils.py @@ -0,0 +1,52 @@ +import threading + + +# https://stackoverflow.com/a/65447493/6543759 +class ThreadWithResult(threading.Thread): + def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): + if not kwargs: + kwargs = {} + + def function(): + self.result = target(*args, **kwargs) + super().__init__(group=group, target=function, name=name, daemon=daemon) + + +THREADS = {} +THREAD_ID = 0 + + +def call_slow_function(function, args): + global THREADS, THREAD_ID + thread = ThreadWithResult(target=function, args=args, daemon=True) + THREAD_ID += 1 + THREADS[THREAD_ID] = thread + thread.start() + + return THREAD_ID + + +def has_call_finished(thread_id): + global THREADS + + thread = THREADS[thread_id] + if thread.is_alive(): + # Thread is still working + return False + + # Thread has finished, we can return its value using get_call_value() + return True + + +def get_call_value(thread_id): + global THREADS + + thread = THREADS[thread_id] + if thread.is_alive(): + # Thread is still working + raise ValueError('Thread is still running!') + + # Thread has finished, we can return its value now + thread.join() + del THREADS[thread_id] + return thread.result \ No newline at end of file diff --git a/@RangerMetrics/settings.json.example b/@RangerMetrics/settings.json.example new file mode 100644 index 0000000..4a3c403 --- /dev/null +++ b/@RangerMetrics/settings.json.example @@ -0,0 +1,6 @@ +{ + "host" : "http://INFLUX_URL:8086", + "token": "XXXXXXXXXXXXXXXXXXXXXXXXXXXX_AUTH_TOKEN_XXXXXXXXXXXXXXXXXXXXXXXXXXX", + "org" : "ORG_NAME", + "bucket" : "BUCKET_NAME", +} \ No newline at end of file