154 lines
4.0 KiB
Python
154 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from fitparse import FitFile
|
|
from datetime import datetime, timedelta
|
|
import os, sys, shutil
|
|
from os import walk
|
|
import csv
|
|
import pandas as pd
|
|
|
|
|
|
path = "/home/manuel/Dropbox/Apps/HealthFitExporter/"
|
|
#path = "/Users/manuel/Projekte/eBike-Influx-Calculation/"
|
|
ext = ('.fit')
|
|
csv_file = "/www/wwwroot/www.manuelw.de/ebike/ebike_data.csv"
|
|
|
|
output = {}
|
|
fitData = {}
|
|
fitData["total_distance"] = 0
|
|
fitData["total_ascent"] = 0
|
|
fitData["avg_speed"] = 0
|
|
fitData["avg_speed_count"] = 0
|
|
fitFilesCount = 0
|
|
|
|
loading = sys.argv[1]
|
|
watt = sys.argv[2]
|
|
row = []
|
|
|
|
##
|
|
### auf neue Files überwachen
|
|
def main():
|
|
f = []
|
|
for base, dirs, files in os.walk(path):
|
|
files = [ fi for fi in files if fi.endswith(".fit") ]
|
|
if files:
|
|
processFIT(files)
|
|
|
|
##
|
|
### parse FIT file und bestimme Handling
|
|
def processFIT(files):
|
|
global fitFilesCount
|
|
|
|
print("Files:", len(files))
|
|
fitFilesCount = len(files)
|
|
for filename in files:
|
|
#print(filename)
|
|
file = FitFile(path+filename)
|
|
for record in file.get_messages("session"):
|
|
## nur wenn Radfahren
|
|
if record.get_value("sport") == "cycling":
|
|
## Verarbeitung starten
|
|
doAll(file)
|
|
|
|
showEnd()
|
|
readCSV()
|
|
deleteFiles()
|
|
|
|
|
|
##
|
|
### todo on all fit files
|
|
def doAll(file):
|
|
for record in file.get_messages("session"):
|
|
fitData["avg_cadence"] = record.get_value("avg_cadence")
|
|
#fitData["avg_speed"] += round(float(record.get_value("avg_speed")) / 0.27777777777778, 1)
|
|
fitData["avg_speed"] += record.get_value("avg_speed")
|
|
fitData["avg_speed_count"] +=1
|
|
#fitData["total_ascent"] = float(record.get_value("total_ascent")) if record.get_value("total_ascent") != None else 0
|
|
fitData["total_distance"] += float(round(record.get_value("total_distance") / 1000, 1))
|
|
|
|
fitData["avg_speed"] = round(float(fitData["avg_speed"] / fitData["avg_speed_count"]), 1)
|
|
doHealthFit(file, record.get_value("min_altitude"))
|
|
|
|
|
|
def doHealthFit(file, min_alt=0.0):
|
|
### Kadenz
|
|
i=0
|
|
cadence=0
|
|
last_elev=min_alt
|
|
ges_elev=0.0
|
|
|
|
for record in file.get_messages("record"):
|
|
## korrekte Kadenz ermitteln
|
|
if record.get_value("cadence") != None and record.get_value("cadence") > 0:
|
|
i+=1
|
|
cadence += record.get_value("cadence")
|
|
|
|
## korrekte Höhenmeter ermitteln
|
|
if record.get_value("altitude") != None:
|
|
if record.get_value("altitude") > last_elev:
|
|
ges_elev += record.get_value("altitude") - last_elev
|
|
|
|
last_elev = record.get_value("altitude")
|
|
|
|
#fitData["avg_cadence"] = int(cadence/i)
|
|
fitData["total_ascent"] += round(float(ges_elev), 1)
|
|
fitData["total_ascent"] = round(float(fitData["total_ascent"]), 1)
|
|
|
|
|
|
def showEnd():
|
|
print("Akku geladen",loading)
|
|
print("Ladung Wh",watt)
|
|
print("Total Range", fitData["total_distance"])
|
|
print("Total Ascent", fitData["total_ascent"])
|
|
print("AVG Speed", fitData["avg_speed"])
|
|
|
|
|
|
def readCSV():
|
|
global fitData, fitFilesCount
|
|
|
|
with open(csv_file, "r", encoding="utf-8", errors="ignore") as scraped:
|
|
row = []
|
|
try:
|
|
row.extend(scraped.readlines()[-1].replace("\n", "").split(","))
|
|
row = row[0:-5]
|
|
dropCSV()
|
|
row.extend([fitData["total_distance"], fitData["total_ascent"], fitData["avg_speed"], watt, fitFilesCount])
|
|
writeCSV(row)
|
|
|
|
except:
|
|
print("EXCEPTION")
|
|
row.extend(["Date", "Loading", "Total Range", "Total Ascent", "AVG Speed", "Loaded Wh", "Fahrten"])
|
|
writeCSV(row)
|
|
|
|
|
|
def writeCSV(row):
|
|
with open(csv_file, 'a', newline='') as csvfile:
|
|
spamwriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
|
|
spamwriter.writerow(row)
|
|
spamwriter.writerow([datetime.today().strftime('%d-%m-%Y'), loading, 0, 0, 0, 0, 0])
|
|
|
|
|
|
def dropCSV():
|
|
df = pd.read_csv(csv_file, index_col='Date', on_bad_lines='skip')
|
|
df = df.iloc[:-1]
|
|
df.to_csv(csv_file, index=True)
|
|
|
|
|
|
def deleteFiles():
|
|
for filename in os.listdir(path):
|
|
file_path = os.path.join(path, filename)
|
|
try:
|
|
if os.path.isfile(file_path) or os.path.islink(file_path):
|
|
os.unlink(file_path)
|
|
#print("lösche... test")
|
|
elif os.path.isdir(file_path):
|
|
shutil.rmtree(file_path)
|
|
except Exception as e:
|
|
print('Failed to delete %s. Reason: %s' % (file_path, e))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|