working python

This commit is contained in:
2023-04-07 01:40:42 -07:00
parent deb145a7ce
commit 97bda2c8a8
13 changed files with 684 additions and 245 deletions

View File

@@ -1 +1,3 @@
from . import influx
influx

View File

@@ -5,39 +5,100 @@ from pyproj import Transformer
from datetime import datetime
import json
import os
from .threading_utils import call_slow_function, has_call_finished, get_call_value
settings = None
host = settings["influxdb"]["host"]
token = settings["influxdb"]["token"]
org = settings["influxdb"]["org"]
bucket = settings["influxdb"]["bucket"]
refreshRateMs = settings["arma3"]["refreshRateMs"]
transformer = Transformer.from_crs("epsg:3857", "epsg:4326")
DBCLIENT = influxdb_client.InfluxDBClient(
url=host, token=token, org=org, enable_gzip=True
from .threading_utils import (
call_slow_function,
has_call_finished,
get_call_value,
THREADS,
THREAD_ID,
)
WRITE_API = DBCLIENT.write_api(write_options=SYNCHRONOUS)
# get parent of parent directory (mod dir)
MOD_DIR = (
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
.lstrip("\\")
.lstrip("?")
.lstrip("\\")
)
SETTINGS_FILE = ""
SETTINGS = None
DBCLIENT = None
WRITE_API = None
PROCESS_LOG = MOD_DIR + "\\rangermetrics_process.log"
ERROR_LOG = MOD_DIR + "\\rangermetrics_error.log"
DATA_LOG = MOD_DIR + "\\rangermetrics_data.log"
# TRANSFORMER = Transformer.from_crs("epsg:3857", "epsg:4326")
def get_dir():
# get current dir
return [
os.path.dirname(os.path.realpath(__file__)) + "\\" + os.path.basename(__file__)
]
# get current dir without leading or trailing slashes
this_path = (
os.path.dirname(os.path.realpath(__file__))
.lstrip("\\")
.lstrip("?")
.lstrip("\\")
)
return [0, "Current directory", this_path, PROCESS_LOG]
def load_settings():
# check if settings.json exists in MOD_DIR
if not (os.path.isfile(os.path.join(MOD_DIR, "settings.json"))):
return [1, "settings.json not found in mod directory", MOD_DIR]
global SETTINGS_FILE
SETTINGS_FILE = os.path.join(MOD_DIR, "settings.json")
# import settings from settings.json
global settings
with open("settings.json", "r") as f:
settings = json.load(f)
# get path to arma3 directory
global arma3_path
global SETTINGS
with open(SETTINGS_FILE, "r") as f:
SETTINGS = json.load(f)
settings_validation = [
["influxdb", "host"],
["influxdb", "token"],
["influxdb", "org"],
["influxdb", "defaultBucket"],
["arma3", "refreshRateMs"],
]
for setting in settings_validation:
if not (setting[0] in SETTINGS and setting[1] in SETTINGS[setting[0]]):
return [1, f"Missing setting: {setting[0]} {setting[1]}"]
# prep settings out to hashMap style list for A3
# [[key, [subkey, subvalue], [subkey, subvalue]]]
settings_out = []
for key, value in SETTINGS.items():
if isinstance(value, dict):
this_values = []
for subkey, subvalue in value.items():
this_values.append([subkey, subvalue])
settings_out.append([key, this_values])
else:
settings_out.append([key, value])
return [0, "Settings loaded", settings_out]
def connect_to_influx():
global DBCLIENT
DBCLIENT = influxdb_client.InfluxDBClient(
url=SETTINGS["influxdb"]["host"],
token=SETTINGS["influxdb"]["token"],
org=SETTINGS["influxdb"]["org"],
enable_gzip=True,
)
if DBCLIENT is None:
return [1, "Error connecting to InfluxDB"]
global WRITE_API
WRITE_API = DBCLIENT.write_api(write_options=SYNCHRONOUS)
if WRITE_API is None:
return [1, "Error connecting to InfluxDB"]
return [0, "Connected to InfluxDB"]
def test_data(data):
@@ -57,39 +118,45 @@ def test_data(data):
return [data, dict(data[1])]
def log_to_file(data):
# threaded, write backup to file
with open("influxdb_data.log", "a") as f:
f.write(f"{data}\n")
def log_process(line):
# log the process to a file
with open(PROCESS_LOG, "a+") as f:
f.write(f"{datetime.now()}: {line}\n")
return True
def write_data(data):
def log_error(line):
# log errors to a file
with open(ERROR_LOG, "a+") as f:
f.write(f"{datetime.now()}: {line}\n")
return True
def write_influx(data):
# thread the write to influxdb
# t = threading.Thread(target=write_points_async, args=(data,), daemon=True)
# t.start()
thread_id = call_slow_function(write_points_async, data)
thread_id = call_slow_function(write_influx_async, (data,))
return [thread_id]
def write_points_async(data):
def write_influx_async(data):
processed = []
timestamp = f" {int(datetime.now().timestamp() * 1e9)}"
# return [data]
target_bucket = data[0] or SETTINGS["influxdb"]["defaultBucket"]
log_process(f"Writing to bucket {target_bucket}")
process_log = open("influxdb_process.log", "a")
log_process(f"Processing {len(data)} data points")
for point in data[1]:
# process_log.write(f"{datetime.now()}: Processing {len(data)} data points\n")
# process_log.write(f"{datetime.now()}: {data[0]}\n")
for point in data:
measurement = point[0]
tag_set = point[1]
field_set = point[2]
if len(point) > 3:
position = point[3]
value_type = point[1]
tag_dict = dict(point[2])
field_dict = dict(point[3])
tag_dict = dict(tag_set)
field_dict = dict(field_set)
if value_type == "int":
field_dict["value"] = int(field_dict["value"])
elif value_type == "float":
field_dict["value"] = float(field_dict["value"])
point_dict = {
"measurement": measurement,
@@ -97,37 +164,26 @@ def write_points_async(data):
"fields": field_dict,
}
if position is not None:
# convert position to lat/lon
lat, lon = transformer.transform(
position[0],
position[1],
)
point_dict["fields"]["lat"] = lat
point_dict["fields"]["lon"] = lon
point_dict["fields"]["alt"] = position[2]
processed.append(point_dict)
# process_log.write(f"{datetime.now()}: Writing {len(processed)} data points\n")
# process_log.write(f"{datetime.now()}: {json.dumps(processed, indent=2)}\n")
log_process(f"Writing {len(processed)} data points")
try:
result = WRITE_API.write(bucket, org, processed)
process_log.write(f"{datetime.now()}: Success\n")
result = WRITE_API.write(target_bucket, SETTINGS["influxdb"]["org"], processed)
if result is not None:
log_process(f"Wrote {len(processed)} data points")
except Exception as e:
# write to file
with open("influxdb_error.log", "a") as f:
f.write(f"{datetime.now()}: {e}\n")
log_error(f"Error writing to influxdb: {e}")
return [1, f"Error writing to influxdb: {e}"]
success_count = len(processed)
# free up memory
del data
del processed
del transformer
del timestamp
del process_log
return ("OK", 200)
return [0, f"Wrote {success_count} data points successfully"]
has_call_finished # noqa imported functions

View File

@@ -1,16 +1,36 @@
import sys
import threading
# https://stackoverflow.com/a/65447493/6543759
class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
def __init__(
self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
):
self.exc = None
if not kwargs:
kwargs = {}
def function():
self.result = target(*args, **kwargs)
self.exc = None
try:
self.result = target(*args, **kwargs)
except: # noqa
# Save details of the exception thrown but don't rethrow,
# just complete the function
self.exc = sys.exc_info()
super().__init__(group=group, target=function, name=name, daemon=daemon)
# https://stackoverflow.com/a/12223550/6543759
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.with_traceback(self.exc[2])
THREADS = {}
THREAD_ID = 0
@@ -44,9 +64,17 @@ def get_call_value(thread_id):
thread = THREADS[thread_id]
if thread.is_alive():
# Thread is still working
raise ValueError('Thread is still running!')
raise ValueError("Thread is still running!")
# Thread has finished, we can return its value now
thread.join()
del THREADS[thread_id]
return thread.result
try:
thread.join()
finally:
del THREADS[thread_id]
try:
return thread.result
except AttributeError:
raise RuntimeError(
'The thread does not have the "result" attribute. An unhandled error occurred inside your Thread'
)