From 97bda2c8a83a9a7b22e1f2aee84c971ea6210631 Mon Sep 17 00:00:00 2001 From: IndigoFox Date: Fri, 7 Apr 2023 01:40:42 -0700 Subject: [PATCH] working python --- .../addons/RangerMetrics/config.cpp | 8 +- .../functions/fn_checkResults.sqf | 21 +- .../RangerMetrics/functions/fn_encodeJSON.sqf | 107 ++++++++++ .../RangerMetrics/functions/fn_gather.sqf | 83 ++++++++ .../addons/RangerMetrics/functions/fn_log.sqf | 6 +- .../RangerMetrics/functions/fn_postInit.sqf | 95 +++++---- .../RangerMetrics/functions/fn_queue.sqf | 42 ++++ .../addons/RangerMetrics/functions/fn_run.sqf | 80 -------- .../RangerMetrics/functions/fn_send.sqf | 95 +++++---- .../helpers/fn_getMissionConfig.sqf | 164 +++++++++++++++ @RangerMetrics/python/__init__.py | 2 + @RangerMetrics/python/influx.py | 186 ++++++++++++------ @RangerMetrics/python/threading_utils.py | 40 +++- 13 files changed, 684 insertions(+), 245 deletions(-) create mode 100644 @RangerMetrics/addons/RangerMetrics/functions/fn_encodeJSON.sqf create mode 100644 @RangerMetrics/addons/RangerMetrics/functions/fn_gather.sqf create mode 100644 @RangerMetrics/addons/RangerMetrics/functions/fn_queue.sqf delete mode 100644 @RangerMetrics/addons/RangerMetrics/functions/fn_run.sqf create mode 100644 @RangerMetrics/addons/RangerMetrics/helpers/fn_getMissionConfig.sqf diff --git a/@RangerMetrics/addons/RangerMetrics/config.cpp b/@RangerMetrics/addons/RangerMetrics/config.cpp index 63cc5a9..03e16ee 100644 --- a/@RangerMetrics/addons/RangerMetrics/config.cpp +++ b/@RangerMetrics/addons/RangerMetrics/config.cpp @@ -4,7 +4,7 @@ class CfgPatches { weapons[] = {}; requiredVersion = 0.1; requiredAddons[] = {}; - author[] = {"EagleTrooper and Gary"}; + author[] = {"EagleTrooper","Gary","IndigoFox"}; authorUrl = "http://example.com"; }; }; @@ -14,9 +14,11 @@ class CfgFunctions { class Common { file = "\RangerMetrics\functions"; class postInit { postInit = 1;}; - class log {}; + class gather {}; + class queue {}; class send {}; - class run {}; + class checkResults {}; + class log {}; }; }; }; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf index c8f6ca9..ed0fea5 100644 --- a/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_checkResults.sqf @@ -1,13 +1,22 @@ - -private _threadsToCheck = RangerMetrics_activeThreads; { + private _threadId = _x; private _finished = ["RangerMetrics.influx.has_call_finished", [_threadId]] call py3_fnc_callExtension; - if (_finished) then { - _threadsToCheck = _threadsToCheck - [_threadId]; + // systemChat str _finished; + if (isNil "_finished") exitWith { + RangerMetrics_activeThreads = RangerMetrics_activeThreads - [_threadId]; + [format ["[%1]: Thread %2 not found", RangerMetrics_logPrefix, _threadId], "WARN"] call RangerMetrics_fnc_log; + }; + if (_finished isEqualTo []) exitWith { + RangerMetrics_activeThreads = RangerMetrics_activeThreads - [_threadId]; + [format ["[%1]: Thread %2 not found", RangerMetrics_logPrefix, _threadId], "WARN"] call RangerMetrics_fnc_log; + }; + + if (_finished isEqualTo true) then { + RangerMetrics_activeThreads = RangerMetrics_activeThreads - [_threadId]; if (missionNamespace getVariable ["RangerMetrics_debug",false]) then { private _return = ["RangerMetrics.influx.get_call_value", [_threadId]] call py3_fnc_callExtension; - [format ["Thread result: %1", _extSend], "DEBUG"] call RangerMetrics_fnc_log; + [format ["%1", _return], "DEBUG"] call 
RangerMetrics_fnc_log; }; }; -} forEach _threadsToCheck; +} forEach RangerMetrics_activeThreads; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_encodeJSON.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_encodeJSON.sqf new file mode 100644 index 0000000..8847ecc --- /dev/null +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_encodeJSON.sqf @@ -0,0 +1,107 @@ +/* ---------------------------------------------------------------------------- +Function: CBA_fnc_encodeJSON + +Description: + Serializes input to a JSON string. Can handle + - ARRAY + - BOOL + - CONTROL + - GROUP + - LOCATION + - NAMESPACE + - NIL (ANY) + - NUMBER + - OBJECT + - STRING + - TASK + - TEAM_MEMBER + - HASHMAP + - Everything else will simply be stringified. + +Parameters: + _object - Object to serialize. + +Returns: + _json - JSON string containing serialized object. + +Examples: + (begin example) + private _settings = call CBA_fnc_createNamespace; + _settings setVariable ["enabled", true]; + private _json = [_settings] call CBA_fnc_encodeJSON; + (end) + +Author: + BaerMitUmlaut +---------------------------------------------------------------------------- */ +params ["_object"]; + +if (isNil "_object") exitWith { "null" }; + +switch (typeName _object) do { + case "SCALAR"; + case "BOOL": { + str _object; + }; + + case "STRING": { + { + _object = [_object, _x#0, _x#1] call CBA_fnc_replace; + } forEach [ + ["\", "\\"], + ["""", "\"""], + [toString [8], "\b"], + [toString [12], "\f"], + [endl, "\n"], + [toString [10], "\n"], + [toString [13], "\r"], + [toString [9], "\t"] + ]; + // Stringify without escaping inter string quote marks. + """" + _object + """" + }; + + case "ARRAY": { + if ([_object] call CBA_fnc_isHash) then { + private _json = (([_object] call CBA_fnc_hashKeys) apply { + private _name = _x; + private _value = [_object, _name] call CBA_fnc_hashGet; + + format ["%1: %2", [_name] call CBA_fnc_encodeJSON, [_value] call CBA_fnc_encodeJSON] + }) joinString ", "; + "{" + _json + "}" + } else { + private _json = (_object apply {[_x] call CBA_fnc_encodeJSON}) joinString ", "; + "[" + _json + "]" + }; + }; + + case "HASHMAP": { + private _json = ((_object toArray false) apply { + _x params ["_key", ["_value", objNull]]; + + if !(_key isEqualType "") then { + _key = str _key; + }; + + format ["%1: %2", [_key] call CBA_fnc_encodeJSON, [_value] call CBA_fnc_encodeJSON] + }) joinString ", "; + "{" + _json + "}" + }; + + default { + if !(typeName _object in (supportInfo "u:allVariables*" apply {_x splitString " " select 1})) exitWith { + [str _object] call CBA_fnc_encodeJSON + }; + + if (isNull _object) exitWith { "null" }; + + private _json = ((allVariables _object) apply { + private _name = _x; + private _value = _object getVariable [_name, objNull]; + + format ["%1: %2", [_name] call CBA_fnc_encodeJSON, [_value] call CBA_fnc_encodeJSON] + }) joinString ", "; + "{" + _json + "}" + }; +}; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_gather.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_gather.sqf new file mode 100644 index 0000000..9a34c81 --- /dev/null +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_gather.sqf @@ -0,0 +1,83 @@ + +// function adapted from YAINA by MartinCo at http://yaina.eu + +params [["_cba",false,[true]]]; + +if(missionNamespace getVariable ["RangerMetrics_run",false]) then { + private _startTime = diag_tickTime; + + // Mission name + ["server", "mission_name", [["source", "onLoadName"]], nil, "string", getMissionConfigValue ["onLoadName", ""]] 
call RangerMetrics_fnc_queue; + ["server", "mission_name", [["source", "missionName"]], nil, "string", missionName] call RangerMetrics_fnc_queue; + ["server", "mission_name", [["source", "missionNameSource"]], nil, "string", missionNameSource] call RangerMetrics_fnc_queue; + ["server", "mission_name", [["source", "briefingName"]], nil, "string", briefingName] call RangerMetrics_fnc_queue; + + ["server", "server_uptime", nil, nil, "float", diag_tickTime toFixed 2] call RangerMetrics_fnc_queue; + + // Number of local units + ["simulation", "entity_count", [["entity_type", "unit"], ["only_local", true]], nil, "int", { local _x } count allUnits] call RangerMetrics_fnc_queue; + ["simulation", "entity_count", [["entity_type", "group"], ["only_local", true]], nil, "int", { local _x } count allGroups] call RangerMetrics_fnc_queue; + ["simulation", "entity_count", [["entity_type", "vehicles"], ["only_local", true]], nil, "int", { local _x} count vehicles] call RangerMetrics_fnc_queue; + + // Server Stats + ["simulation", "fps", [["metric", "avg"]], nil, "float", diag_fps toFixed 2] call RangerMetrics_fnc_queue; + ["simulation", "fps", [["metric", "avg_min"]], nil, "float", diag_fpsMin toFixed 2] call RangerMetrics_fnc_queue; + ["simulation", "mission_time", nil, nil, "float", time toFixed 2] call RangerMetrics_fnc_queue; + + // Scripts + private _activeScripts = diag_activeScripts; + ["simulation", "script_count", [["execution", "spawn"]], nil, "int", _activeScripts select 0] call RangerMetrics_fnc_queue; + ["simulation", "script_count", [["execution", "execVM"]], nil, "int", _activeScripts select 1] call RangerMetrics_fnc_queue; + ["simulation", "script_count", [["execution", "exec"]], nil, "int", _activeScripts select 2] call RangerMetrics_fnc_queue; + ["simulation", "script_count", [["execution", "execFSM"]], nil, "int", _activeScripts select 3] call RangerMetrics_fnc_queue; + + private _pfhCount = if(_cba) then {count CBA_common_perFrameHandlerArray} else {0}; + ["simulation", "script_count", [["execution", "pfh"]], nil, "int", _pfhCount] call RangerMetrics_fnc_queue; + + // Globals if server + if (isServer) then { + // Number of global units + ["simulation", "entity_count", [["entity_type", "unit"], ["only_local", false]], nil, "int", count allUnits] call RangerMetrics_fnc_queue; + ["simulation", "entity_count", [["entity_type", "group"], ["only_local", false]], nil, "int", count allGroups] call RangerMetrics_fnc_queue; + ["simulation", "entity_count", [["entity_type", "vehicle"], ["only_local", false]], nil, "int", count vehicles] call RangerMetrics_fnc_queue; + ["simulation", "entity_count", [["entity_type", "player"], ["only_local", false]], nil, "int", count allPlayers] call RangerMetrics_fnc_queue; + }; + + + private _headlessClients = entities "HeadlessClient_F"; + { + { + private _stats_fps = diag_fps toFixed 2; + private _stats_fps_min = diag_fpsMin toFixed 2; + ["simulation", "fps_hc", [["metric", "avg"]], nil, "float", _stats_fps] remoteExec ["RangerMetrics_fnc_queue", 2]; + ["simulation", "fps_hc", [["metric", "avg_min"]], nil, "float", _stats_fps_min] remoteExec ["RangerMetrics_fnc_queue", 2]; + + } remoteExecCall ["bis_fnc_call", owner _x]; + } foreach _headlessClients; + + + + +/** WORKING HEADLESS CODE COMMENTED OUT TO TRY SOMETHING DIFFERNT + + // Headless Clients FPS + // Thanks to CPL.Brostrom.A + private _headlessClients = entities "HeadlessClient_F"; + { + { + private _stats_fps = round diag_fps; + ["stats.HCfps", _stats_fps] remoteExec ["RangerMetrics_fnc_queue", 2]; + } 
remoteExecCall ["bis_fnc_call", owner _x]; + } foreach _headlessClients; + + +*/ + + + + // log the runtime and switch off debug so it doesn't flood the log + if(missionNamespace getVariable ["RangerMetrics_debug",false]) then { + [format ["Run time: %1", diag_tickTime - _startTime], "DEBUG"] call RangerMetrics_fnc_log; + // missionNamespace setVariable ["RangerMetrics_debug",false]; + }; +}; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_log.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_log.sqf index 4c5f030..5cce686 100644 --- a/@RangerMetrics/addons/RangerMetrics/functions/fn_log.sqf +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_log.sqf @@ -1,5 +1,9 @@ params [["_text","Log text invalid",[""]], ["_type","INFO",[""]]]; -private _textFormatted = format ["[RangerMetrics] %1: %2", _type, _text]; +private _textFormatted = format [ + "[%1] %2: %3", + RangerMetrics_logPrefix, + _type, + _text]; if(isServer) then { diag_log text _textFormatted; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_postInit.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_postInit.sqf index aef74b6..e3e06b5 100644 --- a/@RangerMetrics/addons/RangerMetrics/functions/fn_postInit.sqf +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_postInit.sqf @@ -1,47 +1,74 @@ - -// function adapted from YAINA by MartinCo at http://yaina.eu - // if (!isServer) exitWith {}; _cba = (isClass(configFile >> "CfgPatches" >> "cba_main")); +RangerMetrics_logPrefix = "RangerMetrics"; RangerMetrics_debug = true; +RangerMetrics_activeThreads = []; +RangerMetrics_messageQueue = createHashMap; [format ["Instance name: %1", profileName]] call RangerMetrics_fnc_log; [format ["CBA detected: %1", _cba]] call RangerMetrics_fnc_log; ["Initializing v1.1"] call RangerMetrics_fnc_log; -// _extData = "RangerMetrics" callExtension "loadSettings"; -// if (_extData == "0") exitWith { -// ["Extension not found, disabling"] call RangerMetrics_fnc_log; -// RangerMetrics_run = false; -// }; - -// _extData = parseSimpleArray _extData; -// RangerMetrics_settingsDir = _extData select 0; -// RangerMetrics_settingsLoaded = _extData select 1; -// RangerMetrics_influxURL = _extData select 2; - -// [format["InfluxDB URL: %1", RangerMetrics_influxURL]] call RangerMetrics_fnc_log; -// _extVersion = "RangerMetrics" callExtension "version"; -// ["Extension version: " + _extVersion] call RangerMetrics_fnc_log; - -addMissionEventHandler ["ExtensionCallback", { - params ["_name", "_function", "_data"]; - if (_name == "RangerMetrics") then { - [parseSimpleArray _data] call RangerMetrics_fnc_log; +private _settingsLoaded = ["RangerMetrics.influx.load_settings", []] call py3_fnc_callExtension; +if (isNil "_settingsLoaded") exitWith { + ["Extension not found, disabling"] call RangerMetrics_fnc_log; + RangerMetrics_run = false; +}; +if (_settingsLoaded isEqualTo []) then { + if (count _settingsLoaded == 0) exitWith { + ["Settings not loaded, disabling"] call RangerMetrics_fnc_log; + RangerMetrics_run = false; }; -}]; + if (_settingsLoaded#0 isEqualTo 1) exitWith { + [ + format["Settings not loaded, disabling. 
%1", _settingsLoaded#1], + "ERROR" + ] call RangerMetrics_fnc_log; + RangerMetrics_run = false; + }; +}; +format["Settings loaded: %1", _settingsLoaded#2] call RangerMetrics_fnc_log; +RangerMetrics_settings = _settingsLoaded#2; +// RangerMetrics_settings = createHashMap; +// private _top = createHashMapFromArray _settingsLoaded#2; +// RangerMetrics_settings set [ +// "influxDB", +// createHashMapFromArray (_top get "influxDB") +// ]; +// RangerMetrics_settings set [ +// "arma3", +// createHashMapFromArray (_top get "refreshRateMs") +// ]; +["RangerMetrics.influx.connect_to_influx", []] call py3_fnc_callExtension; -// RangerMetrics_run = true; +RangerMetrics_run = true; -// if(_cba) then { // CBA is running, use PFH -// [RangerMetrics_fnc_run, 10, [_cba]] call CBA_fnc_addPerFrameHandler; -// } else { // CBA isn't running, use sleep -// [_cba] spawn { -// params ["_cba"]; -// while{true} do { -// [[_cba]] call RangerMetrics_fnc_run; // nested to match CBA PFH signature -// sleep 10; -// }; +// addMissionEventHandler ["ExtensionCallback", { +// params ["_name", "_function", "_data"]; +// if (_name == "RangerMetrics") then { +// [parseSimpleArray _data] call RangerMetrics_fnc_log; // }; -// }; +// }]; + +if(_cba) then { // CBA is running, use PFH + [{ + params ["_args", "_idPFH"]; + _args params [["_cba", false]]; + [_cba] call RangerMetrics_fnc_gather; + call RangerMetrics_fnc_checkResults; + call RangerMetrics_fnc_send; + // }, (RangerMetrics_settings get "arma3" get "refreshRateMs"), [_cba]] call CBA_fnc_addPerFrameHandler; + }, 1, [_cba]] call CBA_fnc_addPerFrameHandler; +} else { // CBA isn't running, use sleep + [_cba] spawn { + params ["_cba"]; + while {true} do { + [_cba] call RangerMetrics_fnc_gather; // nested to match CBA PFH signature + call RangerMetrics_fnc_checkResults; + call RangerMetrics_fnc_send; + // sleep (RangerMetrics_settings get "arma3" get "refreshRateMs"); + sleep 1; + }; + }; +}; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_queue.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_queue.sqf new file mode 100644 index 0000000..bc9f527 --- /dev/null +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_queue.sqf @@ -0,0 +1,42 @@ +params [ + ["_bucket", "default", [""]], + "_measurement", + ["_tags", nil, [[], nil]], + ["_fields", nil, [[], nil]], + "_valueType", + "_value" +]; + +private _profileName = profileName; +private _prefix = "Arma3"; + +private _extSend = [ + _measurement, // metric name + _valueType, // float or int + [ // tags + ["profile", _profileName], + ["world", toLower worldName] + ], + [ // fields + ["server", serverName], + ["mission", missionName], + ["value", _value] + ] +]; + +if (!isNil "_tags") then { + { + (_extSend select 2) pushBack [_x#0, _x#1]; + } forEach _tags; +}; + +if (!isNil "_fields") then { + { + (_extSend select 3) pushBack [_x#0, _x#1]; + } forEach _fields; +}; + +// add to queue +(RangerMetrics_messageQueue getOrDefault [_bucket, [], true]) pushBack _extSend; + +true diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_run.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_run.sqf deleted file mode 100644 index 26e736a..0000000 --- a/@RangerMetrics/addons/RangerMetrics/functions/fn_run.sqf +++ /dev/null @@ -1,80 +0,0 @@ - -// function adapted from YAINA by MartinCo at http://yaina.eu - -params ["_args"]; -_args params [["_cba",false,[true]]]; - -if(missionNamespace getVariable ["RangerMetrics_run",false]) then { - private _startTime = diag_tickTime; - - // Number of local units - ["count.units", 
"int", { local _x } count allUnits] call RangerMetrics_fnc_send; - ["count.groups", "int", { local _x } count allGroups] call RangerMetrics_fnc_send; - ["count.vehicles", "int", { local _x} count vehicles] call RangerMetrics_fnc_send; - - // Server Stats - ["stats.fps", "float", diag_fps toFixed 2] call RangerMetrics_fnc_send; - ["stats.fpsMin", "float", diag_fpsMin toFixed 2] call RangerMetrics_fnc_send; - ["stats.uptime", "float", diag_tickTime toFixed 2] call RangerMetrics_fnc_send; - ["stats.missionTime", "float", time toFixed 2] call RangerMetrics_fnc_send; - - // Scripts - private _activeScripts = diag_activeScripts; - ["scripts.spawn", "int", _activeScripts select 0] call RangerMetrics_fnc_send; - ["scripts.execVM", "int", _activeScripts select 1] call RangerMetrics_fnc_send; - ["scripts.exec", "int", _activeScripts select 2] call RangerMetrics_fnc_send; - ["scripts.execFSM", "int", _activeScripts select 3] call RangerMetrics_fnc_send; - - private _pfhCount = if(_cba) then {count CBA_common_perFrameHandlerArray} else {0}; - ["scripts.pfh", "int", _pfhCount] call RangerMetrics_fnc_send; - - // Globals if server - if (isServer) then { - // Number of local units - ["count.units", "float", count allUnits, true] call RangerMetrics_fnc_send; - ["count.groups", "float", count allGroups, true] call RangerMetrics_fnc_send; - ["count.vehicles", "float", count vehicles, true] call RangerMetrics_fnc_send; - ["count.players", "float", count allPlayers, true] call RangerMetrics_fnc_send; - }; - - - - - - - - private _headlessClients = entities "HeadlessClient_F"; - { - { - private _stats_fps = diag_fps; - ["stats.HCfps", "float", _stats_fps] remoteExec ["RangerMetrics_fnc_send", 2]; - - } remoteExecCall ["bis_fnc_call", owner _x]; - } foreach _headlessClients; - - - - -/** WORKING HEADLESS CODE COMMENTED OUT TO TRY SOMETHING DIFFERNT - - // Headless Clients FPS - // Thanks to CPL.Brostrom.A - private _headlessClients = entities "HeadlessClient_F"; - { - { - private _stats_fps = round diag_fps; - ["stats.HCfps", _stats_fps] remoteExec ["RangerMetrics_fnc_send", 2]; - } remoteExecCall ["bis_fnc_call", owner _x]; - } foreach _headlessClients; - - -*/ - - - - // log the runtime and switch off debug so it doesn't flood the log - if(missionNamespace getVariable ["RangerMetrics_debug",false]) then { - [format ["Run time: %1", diag_tickTime - _startTime], "DEBUG"] call RangerMetrics_fnc_log; - missionNamespace setVariable ["RangerMetrics_debug",false]; - }; -}; diff --git a/@RangerMetrics/addons/RangerMetrics/functions/fn_send.sqf b/@RangerMetrics/addons/RangerMetrics/functions/fn_send.sqf index 90286f1..ec10e49 100644 --- a/@RangerMetrics/addons/RangerMetrics/functions/fn_send.sqf +++ b/@RangerMetrics/addons/RangerMetrics/functions/fn_send.sqf @@ -1,55 +1,50 @@ -params ["_metric", "_valueType", "_value", ["_global", false]]; - -private _profileName = profileName; -private _prefix = "Arma3"; -private _locality = [profileName, "global"] select _global; - -// InfluxDB settings -// private _connection = "http://indifox.info:8086"; -// private _token = "BwOzapPBLZ-lhtrcs3PC2Jk2p7plCC0UckHKxe8AxulYkk9St1q2aloXMW2rDD4X2ufIkx3fwSbEe6ZeJo8ljg=="; -// private _org = "ranger-metrics"; -// private _bucket = "ranger-metrics"; - -// private _extSend = format["%1,%2", format["%1,%2,%3,%4,%5,%6", _connection, _token, _org, _bucket, _metricPath, _metric], _value]; -private _extSend = [ - // _connection, - // _token, - // _org, - // _bucket, - _profileName, - _locality, - missionName, - worldName, - serverName, - 
_metric, - _valueType, - _value -]; - -if(missionNamespace getVariable ["RangerMetrics_debug",false]) then { - [format ["Sending a3influx data: %1", _extSend], "DEBUG"] call RangerMetrics_fnc_log; -}; - // send the data -private _return = "RangerMetrics" callExtension ["sendToInflux", _extSend]; +[{ + if(missionNamespace getVariable ["RangerMetrics_debug",false]) then { + [format ["Sending a3influx data: %1", RangerMetrics_messageQueue], "DEBUG"] call RangerMetrics_fnc_log; + }; -// shouldn't be possible, the extension should always return even if error -if(isNil "_return") exitWith { - [format ["return was nil (%1)", _extSend], "ERROR"] call RangerMetrics_fnc_log; - false -}; + // duplicate the message queue so we can clear it before sending the data + private _extSend = + RangerMetrics_messageQueue; + RangerMetrics_messageQueue = createHashMap; -// extension error codes -// if(_return in ["invalid metric value","malformed, could not find separator"] ) exitWith { -// [format ["%1 (%2)", _return, _extSend], "ERROR"] call RangerMetrics_fnc_log; -// false -// }; + { + // for each bucket, send data to extension + private _bucketName = _x; + private _bucketData = _y; + // if (true) exitWith { + [format ["bucketName: %1", _bucketName], "DEBUG"] call RangerMetrics_fnc_log; + [format ["bucketData: %1", _bucketData], "DEBUG"] call RangerMetrics_fnc_log; + // }; + private _return = ["RangerMetrics.influx.write_influx", [[_bucketName, _bucketData]]] call py3_fnc_callExtension; -// success, only show if debug is set -if(missionNamespace getVariable ["RangerMetrics_debug",false]) then { - // _returnArgs = _return splitString (toString [10,32]); - _returnArgs = parseSimpleArray _return; - [format ["a3influx return data: %1",_returnArgs], "DEBUG"] call RangerMetrics_fnc_log; -}; + // shouldn't be possible, the extension should always return even if error + if(isNil "_return") exitWith { + [format ["return was nil (%1)", _extSend], "ERROR"] call RangerMetrics_fnc_log; + false + }; -true + if (typeName _return != "ARRAY") exitWith { + [format ["return was not an array (%1)", _extSend], "ERROR"] call RangerMetrics_fnc_log; + false + }; + + if (count _return == 0) exitWith { + [format ["return was empty (%1)", _extSend], "ERROR"] call RangerMetrics_fnc_log; + false + }; + + if (count _return == 2) exitWith { + [format ["return was error (%1)", _extSend], "ERROR"] call RangerMetrics_fnc_log; + false + }; + + // success, add to list of active threads + RangerMetrics_activeThreads pushBack (_return select 0); + + // success, only show if debug is set + if (missionNamespace getVariable ["RangerMetrics_debug",false]) then { + [format ["a3influx threadId: %1", _return], "DEBUG"] call RangerMetrics_fnc_log; + }; + } forEach _extSend; +}] call CBA_fnc_execNextFrame; \ No newline at end of file diff --git a/@RangerMetrics/addons/RangerMetrics/helpers/fn_getMissionConfig.sqf b/@RangerMetrics/addons/RangerMetrics/helpers/fn_getMissionConfig.sqf new file mode 100644 index 0000000..8b03339 --- /dev/null +++ b/@RangerMetrics/addons/RangerMetrics/helpers/fn_getMissionConfig.sqf @@ -0,0 +1,164 @@ +// get basic config properties +private _properties = [ + ["settings_mission_info", [ + "author", + "onLoadName", + "onLoadMission", + "loadScreen", + "header", + "onLoadIntro", + "onLoadMissionTime", + "onLoadIntroTime", + "briefingName", + "overviewPicture", + "overviewText", + "overviewTextLocked", + "onBriefingGear", + "onBriefingGroup", + "onBriefingPlan" + ]], + ["settings_respawn", [ + "respawn", + "respawnButton", + 
"respawnDelay", + "respawnVehicleDelay", + "respawnDialog", + "respawnOnStart", + "respawnTemplates", + "respawnWeapons", + "respawnMagazines", + "reviveMode", + "reviveUnconsciousStateMode", + "reviveRequiredTrait", + "reviveRequiredItems", + "reviveRequiredItemsFakConsumed", + "reviveMedicSpeedMultiplier", + "reviveDelay", + "reviveForceRespawnDelay", + "reviveBleedOutDelay", + "enablePlayerAddRespawn" + ]], + ["settings_player_ui", [ + "overrideFeedback", + "showHUD", + "showCompass", + "showGPS", + "showGroupIndicator", + "showMap", + "showNotePad", + "showPad", + "showWatch", + "showUAVFeed", + "showSquadRadar" + ]], + ["settings_corpse_and_wreck", [ + "corpseManagerMode", + "corpseLimit", + "corpseRemovalMinTime", + "corpseRemovalMaxTime", + "wreckManagerMode", + "wreckLimit", + "wreckRemovalMinTime", + "wreckRemovalMaxTime", + "minPlayerDistance" + ]], + ["settings_mission_general", [ + "aiKills", + "briefing", + "debriefing", + "disableChannels", + "disabledAI", + "disableRandomization", + "enableDebugConsole", + "enableItemsDropping", + "enableTeamSwitch", + "forceRotorLibSimulation", + "joinUnassigned", + "minScore", + "avgScore", + "maxScore", + "onCheat", + "onPauseScript", + "saving", + "scriptedPlayer", + "skipLobby", + "HostDoesNotSkipLobby", + "missionGroup" + ] + ] +]; + +private _propertyValues = createHashMap; +{ + private _category = _x#0; + private _values = _x#1; + { + private _property = _x; + private _value = (missionConfigFile >> _property) call BIS_fnc_getCfgData; + hint str [_category, _property, _value]; + if (!isNil "_value") then { + if (typeName _value == "ARRAY") then { + _value = _value joinString ","; + }; + if (isNil {_propertyValues get _category}) then { + _propertyValues set [_category, createHashMap]; + }; + _propertyValues get _category set [_property, _value]; + }; + } forEach _values; +} forEach _properties; + + +// Take the generated hashmap and queue metrics +{ + private _measurementCategory = _x; + private _fields = _y; + ["config", _measurementCategory, nil, _fields, "int", 0] call RangerMetrics_fnc_queue; +} forEach _propertyValues; + + + + +// get all properties in missionConfigFile (recursive) +// private _nextCfgClasses = "true" configClasses (missionConfigFile); +// private _nextCfgProperties = configProperties [missionConfigFile]; +// private _cfgProperties = createHashMap; +// while {count _nextCfgClasses > 0} do { +// { +// private _thisConfig = _x; +// private _thisConfigClasses = "true" configClasses _thisConfig; +// _thisCfgProperties = configProperties [_thisConfig, "!isClass _x"]; +// _saveHash = createHashMap; +// { +// _propertyCfg = _x; +// _saveHash set [configName _propertyCfg, (_propertyCfg) call BIS_fnc_getCfgData]; +// } forEach _thisCfgProperties; +// _hierarchy = (configHierarchy _thisConfig); +// _hierarchy deleteAt 0; +// _hierarchy = _hierarchy apply {configName _x}; +// _hierarchyStr = _hierarchy joinString "."; +// _hierarchyStrParent = (_hierarchy select [0, count _hierarchy - 2]) joinString "."; +// systemChat _hierarchyStrParent; + +// // if (_cfgProperties get _hierarchyStrParent == nil) then { +// // _cfgProperties set [_hierarchyStrParent, createHashMap]; +// // }; +// _cfgProperties set [_hierarchyStr, _saveHash]; + + +// // _cfgProperties set [_hierarchy, _saveHash]; +// _nextCfgClasses append _thisConfigClasses; + +// } forEach _nextCfgClasses; +// _nextCfgClasses = _nextCfgClasses - _cfgClasses; +// }; +// text ([_cfgProperties] call RangerMetrics_fnc_encodeJSON); + + + + + +// iterate through 
_cfgProperties hashmap and queue metrics +// { + +// } forEach _cfgProperties; \ No newline at end of file diff --git a/@RangerMetrics/python/__init__.py b/@RangerMetrics/python/__init__.py index 3e7bf1b..5da1f02 100644 --- a/@RangerMetrics/python/__init__.py +++ b/@RangerMetrics/python/__init__.py @@ -1 +1,3 @@ from . import influx + +influx diff --git a/@RangerMetrics/python/influx.py b/@RangerMetrics/python/influx.py index 066c197..6e0622d 100644 --- a/@RangerMetrics/python/influx.py +++ b/@RangerMetrics/python/influx.py @@ -5,39 +5,100 @@ from pyproj import Transformer from datetime import datetime import json import os -from .threading_utils import call_slow_function, has_call_finished, get_call_value - -settings = None - - -host = settings["influxdb"]["host"] -token = settings["influxdb"]["token"] -org = settings["influxdb"]["org"] -bucket = settings["influxdb"]["bucket"] -refreshRateMs = settings["arma3"]["refreshRateMs"] - -transformer = Transformer.from_crs("epsg:3857", "epsg:4326") - -DBCLIENT = influxdb_client.InfluxDBClient( - url=host, token=token, org=org, enable_gzip=True +from .threading_utils import ( + call_slow_function, + has_call_finished, + get_call_value, + THREADS, + THREAD_ID, ) -WRITE_API = DBCLIENT.write_api(write_options=SYNCHRONOUS) + +# get parent of parent directory (mod dir) +MOD_DIR = ( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + .lstrip("\\") + .lstrip("?") + .lstrip("\\") +) + +SETTINGS_FILE = "" +SETTINGS = None + +DBCLIENT = None +WRITE_API = None + +PROCESS_LOG = MOD_DIR + "\\rangermetrics_process.log" +ERROR_LOG = MOD_DIR + "\\rangermetrics_error.log" +DATA_LOG = MOD_DIR + "\\rangermetrics_data.log" + + +# TRANSFORMER = Transformer.from_crs("epsg:3857", "epsg:4326") def get_dir(): - # get current dir - return [ - os.path.dirname(os.path.realpath(__file__)) + "\\" + os.path.basename(__file__) - ] + # get current dir without leading or trailing slashes + this_path = ( + os.path.dirname(os.path.realpath(__file__)) + .lstrip("\\") + .lstrip("?") + .lstrip("\\") + ) + return [0, "Current directory", this_path, PROCESS_LOG] def load_settings(): + # check if settings.json exists in MOD_DIR + if not (os.path.isfile(os.path.join(MOD_DIR, "settings.json"))): + return [1, "settings.json not found in mod directory", MOD_DIR] + + global SETTINGS_FILE + SETTINGS_FILE = os.path.join(MOD_DIR, "settings.json") + # import settings from settings.json - global settings - with open("settings.json", "r") as f: - settings = json.load(f) - # get path to arma3 directory - global arma3_path + global SETTINGS + with open(SETTINGS_FILE, "r") as f: + SETTINGS = json.load(f) + + settings_validation = [ + ["influxdb", "host"], + ["influxdb", "token"], + ["influxdb", "org"], + ["influxdb", "defaultBucket"], + ["arma3", "refreshRateMs"], + ] + for setting in settings_validation: + if not (setting[0] in SETTINGS and setting[1] in SETTINGS[setting[0]]): + return [1, f"Missing setting: {setting[0]} {setting[1]}"] + + # prep settings out to hashMap style list for A3 + # [[key, [subkey, subvalue], [subkey, subvalue]]] + settings_out = [] + for key, value in SETTINGS.items(): + if isinstance(value, dict): + this_values = [] + for subkey, subvalue in value.items(): + this_values.append([subkey, subvalue]) + settings_out.append([key, this_values]) + else: + settings_out.append([key, value]) + return [0, "Settings loaded", settings_out] + + +def connect_to_influx(): + global DBCLIENT + DBCLIENT = influxdb_client.InfluxDBClient( + url=SETTINGS["influxdb"]["host"], + 
token=SETTINGS["influxdb"]["token"], + org=SETTINGS["influxdb"]["org"], + enable_gzip=True, + ) + if DBCLIENT is None: + return [1, "Error connecting to InfluxDB"] + global WRITE_API + WRITE_API = DBCLIENT.write_api(write_options=SYNCHRONOUS) + if WRITE_API is None: + return [1, "Error connecting to InfluxDB"] + return [0, "Connected to InfluxDB"] def test_data(data): @@ -57,39 +118,45 @@ def test_data(data): return [data, dict(data[1])] -def log_to_file(data): - # threaded, write backup to file - with open("influxdb_data.log", "a") as f: - f.write(f"{data}\n") +def log_process(line): + # log the process to a file + with open(PROCESS_LOG, "a+") as f: + f.write(f"{datetime.now()}: {line}\n") return True -def write_data(data): +def log_error(line): + # log errors to a file + with open(ERROR_LOG, "a+") as f: + f.write(f"{datetime.now()}: {line}\n") + return True + + +def write_influx(data): # thread the write to influxdb - # t = threading.Thread(target=write_points_async, args=(data,), daemon=True) - # t.start() - thread_id = call_slow_function(write_points_async, data) + thread_id = call_slow_function(write_influx_async, (data,)) return [thread_id] -def write_points_async(data): +def write_influx_async(data): processed = [] timestamp = f" {int(datetime.now().timestamp() * 1e9)}" + # return [data] + target_bucket = data[0] or SETTINGS["influxdb"]["defaultBucket"] + log_process(f"Writing to bucket {target_bucket}") - process_log = open("influxdb_process.log", "a") + log_process(f"Processing {len(data)} data points") + for point in data[1]: - # process_log.write(f"{datetime.now()}: Processing {len(data)} data points\n") - # process_log.write(f"{datetime.now()}: {data[0]}\n") - - for point in data: measurement = point[0] - tag_set = point[1] - field_set = point[2] - if len(point) > 3: - position = point[3] + value_type = point[1] + tag_dict = dict(point[2]) + field_dict = dict(point[3]) - tag_dict = dict(tag_set) - field_dict = dict(field_set) + if value_type == "int": + field_dict["value"] = int(field_dict["value"]) + elif value_type == "float": + field_dict["value"] = float(field_dict["value"]) point_dict = { "measurement": measurement, @@ -97,37 +164,26 @@ def write_points_async(data): "fields": field_dict, } - if position is not None: - - # convert position to lat/lon - lat, lon = transformer.transform( - position[0], - position[1], - ) - point_dict["fields"]["lat"] = lat - point_dict["fields"]["lon"] = lon - point_dict["fields"]["alt"] = position[2] - processed.append(point_dict) - # process_log.write(f"{datetime.now()}: Writing {len(processed)} data points\n") - # process_log.write(f"{datetime.now()}: {json.dumps(processed, indent=2)}\n") + log_process(f"Writing {len(processed)} data points") try: - result = WRITE_API.write(bucket, org, processed) - process_log.write(f"{datetime.now()}: Success\n") + result = WRITE_API.write(target_bucket, SETTINGS["influxdb"]["org"], processed) + if result is not None: + log_process(f"Wrote {len(processed)} data points") except Exception as e: # write to file - with open("influxdb_error.log", "a") as f: - f.write(f"{datetime.now()}: {e}\n") + log_error(f"Error writing to influxdb: {e}") + return [1, f"Error writing to influxdb: {e}"] + success_count = len(processed) # free up memory + del data del processed - del transformer del timestamp - del process_log - return ("OK", 200) + return [0, f"Wrote {success_count} data points successfully"] has_call_finished # noqa imported functions diff --git a/@RangerMetrics/python/threading_utils.py 
b/@RangerMetrics/python/threading_utils.py index bb738dc..0dc073b 100644 --- a/@RangerMetrics/python/threading_utils.py +++ b/@RangerMetrics/python/threading_utils.py @@ -1,16 +1,36 @@ +import sys import threading # https://stackoverflow.com/a/65447493/6543759 class ThreadWithResult(threading.Thread): - def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): + def __init__( + self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None + ): + self.exc = None if not kwargs: kwargs = {} def function(): - self.result = target(*args, **kwargs) + self.exc = None + try: + self.result = target(*args, **kwargs) + except: # noqa + # Save details of the exception thrown but don't rethrow, + # just complete the function + self.exc = sys.exc_info() + super().__init__(group=group, target=function, name=name, daemon=daemon) + # https://stackoverflow.com/a/12223550/6543759 + def join(self, *args, **kwargs): + super().join(*args, **kwargs) + + if self.exc: + msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1]) + new_exc = Exception(msg) + raise new_exc.with_traceback(self.exc[2]) + THREADS = {} THREAD_ID = 0 @@ -44,9 +64,17 @@ def get_call_value(thread_id): thread = THREADS[thread_id] if thread.is_alive(): # Thread is still working - raise ValueError('Thread is still running!') + raise ValueError("Thread is still running!") # Thread has finished, we can return its value now - thread.join() - del THREADS[thread_id] - return thread.result \ No newline at end of file + try: + thread.join() + finally: + del THREADS[thread_id] + + try: + return thread.result + except AttributeError: + raise RuntimeError( + 'The thread does not have the "result" attribute. An unhandled error occurred inside your Thread' + )
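
Configuration note: load_settings() in @RangerMetrics/python/influx.py reads settings.json from the mod root (MOD_DIR, i.e. the @RangerMetrics folder containing addons/ and python/) and validates influxdb.host, influxdb.token, influxdb.org, influxdb.defaultBucket and arma3.refreshRateMs. A minimal sketch of that file follows; every value is a placeholder, only the key names come from the settings_validation list in the patch. Note that refreshRateMs is validated but the fn_postInit loop currently hard-codes a 1-second interval (the refreshRateMs lookup is commented out).

    {
        "influxdb": {
            "host": "http://localhost:8086",
            "token": "<influxdb-api-token>",
            "org": "<influxdb-org>",
            "defaultBucket": "<default-bucket>"
        },
        "arma3": {
            "refreshRateMs": 10000
        }
    }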
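
Usage sketch for the metric pipeline this patch adds (not taken from the patch itself; the measurement, tag and field names below are illustrative): RangerMetrics_fnc_queue buffers a point into RangerMetrics_messageQueue, RangerMetrics_fnc_send flushes each bucket to RangerMetrics.influx.write_influx on the next frame, and RangerMetrics_fnc_checkResults reaps the finished worker thread. The bucket name is passed straight through to InfluxDB, so it must exist there; the patch's own calls use "server", "simulation" and "config".

    // Illustrative only -- measurement, tags and fields are made up for this sketch.
    [
        "simulation",                 // _bucket (must exist in InfluxDB)
        "player_count",               // _measurement
        [["side", "blufor"]],         // _tags, appended to the default profile/world tags
        nil,                          // _fields, appended to the default server/mission/value fields
        "int",                        // _valueType: "int", "float" or "string"
        count allPlayers              // _value, written as the "value" field
    ] call RangerMetrics_fnc_queue;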