Introduced logging, ABC, applied black

This commit is contained in:
Jacob Holder 2021-09-28 23:03:18 +02:00
parent d19472ce45
commit a17630ad57
Signed by: jacob
GPG Key ID: 2194FC747048A7FD
10 changed files with 425 additions and 115 deletions

BIN
Diagramm1.dia Normal file

Binary file not shown.

47
ExampleDriver.py Normal file
View File

@ -0,0 +1,47 @@
from abc import abstractmethod
from threading import Thread, Event
from time import sleep
from queue import Queue
from driver import Driver
import logging
LOGGER = logging.getLogger()
class ExampleDriverWorker(Thread):
"""Communicates with the measuring hardware. Here we only produce random data."""
def __init__(self, message_queue):
super().__init__(name="Driver")
self.exit_request = Event()
def run(self):
"""Worker method of a python Thread. Called when the Thread is started."""
while not self.exit_request.is_set():
sleep(1)
class ExampleDriver(Driver):
"""docstring for Model."""
def __init__(self):
super(Driver, self).__init__()
self.measureQueue = Queue()
self.measureThread = ExampleDriverWorker(self.measureQueue)
self.measureThread.start()
self.measureThread.produceData.set()
self.speed = 10
self.running = False
def exit(self):
self.measureThread.exit_request.set()
def set_config(self, config):
self.speed = config["speed"]
def get_status(self):
return {"running": self.running}
@abstractmethod
def get_config(self):
pass

65
ExampleSensor.py Normal file
View File

@ -0,0 +1,65 @@
from datetime import datetime
from threading import Thread, Event
from time import sleep
from queue import Queue
from random import random
from sensor import Sensor
import logging
LOGGER = logging.getLogger()
class ExampleSensorWorker(Thread):
"""Communicates with the measuring hardware. Here we only produce random data."""
def __init__(self, message_queue):
super().__init__(name="Measure")
self.message_queue = message_queue
self.produceData = Event()
self.exit_request = Event()
self.prev = 0
def run(self):
"""Worker method of a python Thread. Called when the Thread is started."""
while not self.exit_request.is_set():
if self.produceData.is_set():
temp = self.prev + random() / 100.0
self.prev = temp
ts = datetime.utcnow()
self.message_queue.put((ts, temp))
else:
pass
sleep(1)
class ExampleSensor(Sensor):
"""docstring for Model."""
def __init__(self):
super(Sensor, self).__init__()
self.measureQueue = Queue()
self.measureThread = ExampleSensorWorker(self.measureQueue)
self.measureThread.start()
self.measureThread.produceData.set()
def start_measuring(self):
self.measureThread.produceData.set()
logging.info("I started meas")
def stop_measuring(self):
self.measureThread.produceData.clear()
logging.info("I stopped meas")
def exit(self):
self.stop_measuring()
self.measureThread.exit_request.set()
@property
def get_data(self):
time = []
val = []
while not self.measureQueue.empty():
t, data = self.measureQueue.get()
time.append(t)
val.append(data)
return time, val

212
FemtoDLPVA100F.py Normal file
View File

@ -0,0 +1,212 @@
#!/usr/bin/env python3
# from driver.Luci10 import Luci10
import logging
from numpy import floor
from time import sleep
from threading import Thread, Event
from sensor import Sensor
from driver import Driver
LOGGER = logging.getLogger()
class FemtoDLPVA100FWorker(Thread):
def __init__(
self,
name="FemtoDLPVA100F Worker",
index=1,
delay=0.2,
):
super().__init__()
self.exit_request = Event()
self.dc = Event()
self.low_noise = Event()
self.lsbA = Event()
self.msbA = Event()
self.lsbB = Event()
self.msbB = Event()
self.overload_a = False
self.overload_b = False
self.name = name
# self.luci = Luci10(printer=False)
self.index = index
self.delay = delay
def run(self):
while not self.exit_request.is_set():
byte_a, byte_b = 0, 0
if self.lsbA.is_set():
byte_a |= 0b00000010 # Set 2nd bit to 1
if self.msbA.is_set():
byte_a += 4
if self.lsbB.is_set():
byte_b += 2
if self.msbB.is_set():
byte_b += 4
if self.dc.is_set():
byte_a += 8
byte_b += 8
if self.low_noise.is_set():
byte_a += 16
byte_b += 16
# self.luci.led_on()
# self.luci.write_bytes(self.index, byte_a, byte_b)
# self.luci.led_off()
LOGGER.debug(f"({self.name}) write bytes A, B: {byte_a:b}, {byte_b:b}")
sleep(self.delay)
class FemtoDLPVA100F(Driver):
def __init__(self, name="FemtoDLPVA100F", index=1):
self.name = name
# self.luci = Luci10(printer=False)
self.index = index
self.overload_A = False
self.overload_B = False
self.check_overload()
self.femtoThread = FemtoDLPVA100FWorker()
self.femtoThread.dc.set()
self.femtoThread.low_noise.set()
self.femtoThread.lsbA.clear()
self.femtoThread.msbA.clear()
self.femtoThread.lsbB.clear()
self.femtoThread.msbB.clear()
self.femtoThread.start()
self.dc = True
self.low_noise = True
self.exp_a = 1
self.exp_b = 1
def check_overload(self):
overload_A = self.check_overload_a()
overload_B = self.check_overload_b()
return overload_A, overload_B
def get_status(self):
a, b = self.check_overload()
return {"overload_a": a, "overload_b": b}
def check_overload_a(self):
return False
# self.luci.led_on()
# self.overload_A = not self.luci.get_status_pin5(self.index)
# self.luci.led_off()
LOGGER.info(f"({self.name:s}) overload A: {self.overload_A}")
return self.overload_A
def check_overload_b(self):
return True
# self.luci.led_on()
# self.overload_B = not self.luci.get_status_pin6(self.index)
# self.luci.led_off()
LOGGER.info(f"({self.name:s}) overload B: {self.overload_B}")
return self.overload_B
def exit(self):
self.femtoThread.exit_request.set()
LOGGER.info("(%s) Thread closed." % self.name)
def get_config(self):
config = {
"dc": self.dc,
"low_noise": self.low_noise,
"exp_a": self.exp_a,
"exp_b": self.exp_b,
}
LOGGER.info("(%s) config: %s" % (self.name, config))
return config
def set_config(self, config):
# TODO proper dict parsing
exp_a = 1
exp_b = 1
dc = True
low_noise = True
self.dc = dc
if dc:
self.set_dc()
else:
self.reset_dc()
# TODO reduce spam
if low_noise:
self.set_low_noise()
self.low_noise = True
else:
self.reset_low_noise()
self.low_noise = False
self.set_exponent_a(exp_a=exp_a)
self.exp_a = exp_a
self.set_exponent_b(exp_b=exp_b)
self.exp_b = exp_b
def set_dc(self):
self.femtoThread.dc.set()
LOGGER.info("(%s) dc is set to %s" % (self.name, True))
def reset_dc(self):
self.femtoThread.dc.clear()
LOGGER.info("(%s) dc is set to %s" % (self.name, False))
def set_low_noise(self):
self.femtoThread.low_noise.set()
LOGGER.info("(%s) low noise is set to %s" % (self.name, True))
def reset_low_noise(self):
self.femtoThread.low_noise.clear()
LOGGER.info("(%s) low noise is set to %s" % (self.name, False))
def set_exponent_a(
self,
exp_a=1,
):
if exp_a < 1:
exp_a = 1
if exp_a > 4:
exp_a = 4
if (exp_a % 2) == 0:
self.femtoThread.lsbA.set()
else:
self.femtoThread.lsbA.clear()
if (floor((exp_a - 1) / 2) - 1) == 0:
self.femtoThread.msbA.set()
else:
self.femtoThread.msbA.clear()
LOGGER.debug("(%s) exponent a = %i" % (self.name, exp_a))
def set_exponent_b(
self,
exp_b=1,
):
if exp_b < 1:
exp_b = 1
if exp_b > 4:
exp_b = 4
if (exp_b % 2) == 0:
self.femtoThread.lsbB.set()
else:
self.femtoThread.lsbB.clear()
if (floor((exp_b - 1) / 2) - 1) == 0:
self.femtoThread.msbB.set()
else:
self.femtoThread.msbB.clear()
LOGGER.debug("(%s) exponent a = %i" % (self.name, exp_b))

21
driver.py Normal file
View File

@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
class Driver(ABC):
"""docstring for Model."""
@abstractmethod
def exit(self):
pass
@abstractmethod
def set_config(self, config):
pass
@abstractmethod
def get_status(self):
pass
@abstractmethod
def get_config(self):
pass

View File

@ -18,9 +18,13 @@ class Measurement:
data: list of tuples: (timestamp, value)
"""
if sensor_name not in self.data_files:
self.data_files[sensor_name] = open(os.path.join(self.path, f"{sensor_name}_data.csv"), "w")
self.data_files[sensor_name] = open(
os.path.join(self.path, f"{sensor_name}_data.csv"), "w"
)
for timestamp, value in zip(data[0], data[1]):
self.data_files[sensor_name].write(f"{timestamp.isoformat()}, {timestamp.timestamp()}, {value}\n")
self.data_files[sensor_name].write(
f"{timestamp.isoformat()}, {timestamp.timestamp()}, {value}\n"
)
def save_version_text(self, comment):
with open(os.path.join(self.path, "version.txt"), "w") as file:
@ -28,4 +32,3 @@ class Measurement:
if comment:
file.write(comment + "\n")
file.write("More descriptions of relevant parameters\n")

View File

@ -8,9 +8,11 @@ Data:
from datetime import datetime
from threading import Event, Thread
from time import sleep
from sensor import Sensor
from FemtoDLPVA100F import FemtoDLPVA100F
from measurement import Measurement
from plot_data import PlotData
from ExampleSensor import ExampleSensor
class DataGrabber(Thread):
def __init__(self, sensors, drivers, plot_data):
@ -25,7 +27,7 @@ class DataGrabber(Thread):
while not self.exit_request.is_set():
for name, sens in self.sensors.items():
dat = sens.get_data
self.plot_data.append_data(name,dat)
self.plot_data.append_data(name, dat)
if self.measurement is not None:
self.measurement.append_data(name, dat)
sleep(1)
@ -34,7 +36,8 @@ class DataGrabber(Thread):
class Model:
def __init__(self):
self.plot_data = PlotData()
self.sensors = {"temp": Sensor(), "keks": Sensor()}
a = ExampleSensor()
self.sensors = {"temp": a, "multi": ExampleSensor()}
self.drivers = []
self.measurement = None
self.data_grabber = DataGrabber(self.sensors, self.drivers, self.plot_data)
@ -74,4 +77,4 @@ class Model:
"""
"""

View File

@ -2,12 +2,12 @@ import numpy as np
from datetime import datetime, timedelta
class PlotData():
class PlotData:
def __init__(self):
#resolution * timeout must be at least 2-3 times higher then the slowest refresh rate
# resolution * timeout must be at least 2-3 times higher then the slowest refresh rate
self.resolution = 100 # ms
self.timeout = 100 #cycles
self.timeout = 100 # cycles
self.data = {"time": np.array([])}
self.start_time = datetime.utcnow()
self.queues = {}
@ -22,48 +22,51 @@ class PlotData():
return self.data.get(key, np.full_like(self.data["time"], np.nan, dtype=np.double))
def append_data(self, sensor_name, data):
"""
"""
""" """
if sensor_name not in self.data:
self.queues[sensor_name] = ([],[])
self.queues[sensor_name] = ([], [])
self.data[sensor_name] = np.full_like(self.data["time"], np.nan, dtype=np.double)
for time, dat in zip(data[0], data[1]):
self.queues[sensor_name][0].append((time-self.start_time)/ timedelta(milliseconds=self.resolution))
self.queues[sensor_name][0].append(
(time - self.start_time) / timedelta(milliseconds=self.resolution)
)
self.queues[sensor_name][1].append(dat)
self.extend_timeline()
self.sort_in_queue()
self.drop_old()
def extend_timeline(self):
#extend numpy arrays with passed time
#unknown values are filled with nans
end = (datetime.utcnow()-self.start_time)/timedelta(milliseconds=self.resolution)
# extend numpy arrays with passed time
# unknown values are filled with nans
end = (datetime.utcnow() - self.start_time) / timedelta(milliseconds=self.resolution)
self.data["time"] = np.arange(0, end, 1).astype(np.double)
for key in self.data:
if key != "time":
pad_length = self.data["time"].size-self.data[key].size
self.data[key] = np.pad(self.data[key], (0,pad_length), mode="constant", constant_values=np.nan)
pad_length = self.data["time"].size - self.data[key].size
self.data[key] = np.pad(
self.data[key], (0, pad_length), mode="constant", constant_values=np.nan
)
def sort_in_queue(self):
#linear interpolation and adding it to the array
# linear interpolation and adding it to the array
for key in self.queues:
time = np.array(self.queues[key][0])
data = np.array(self.queues[key][1])
if time.size > 1:
inter = np.interp(self.data["time"], time, data,left=np.nan,right=np.nan)
inter = np.interp(self.data["time"], time, data, left=np.nan, right=np.nan)
self.data[key] = np.where(np.isnan(inter), self.data[key], inter)
def drop_old(self):
for key in self.queues:
time = np.array(self.queues[key][0])
old = self.data["time"][-1]-(self.timeout)
if time.size>2:
drop_index = (np.where(time < old)[0])
old = self.data["time"][-1] - (self.timeout)
if time.size > 2:
drop_index = np.where(time < old)[0]
if len(drop_index) is not 0:
drop_index = drop_index[-1]
self.queues[key] = (self.queues[key][0][drop_index:],self.queues[key][1][drop_index:])
self.queues[key] = (
self.queues[key][0][drop_index:],
self.queues[key][1][drop_index:],
)
print(drop_index)

View File

@ -1,77 +1,21 @@
import numpy as np
from datetime import datetime
from threading import Thread, Event
from time import sleep, time
from queue import Queue
from random import random
from abc import ABC, abstractmethod
class SensorWorker(Thread):
""" Communicates with the measuring hardware. Here we only produce random data. """
def __init__(self, message_queue):
super().__init__(name="Measure")
self.message_queue = message_queue
self.produceData = Event()
self.exit_request = Event()
self.prev= 0
def run(self):
""" Worker method of a python Thread. Called when the Thread is started. """
while not self.exit_request.is_set():
if self.produceData.is_set():
temp = self.prev + random() / 100.0
self.prev = temp
ts = datetime.utcnow()
self.message_queue.put((ts, temp))
else:
pass
sleep(1)
class Sensor(object):
class Sensor(ABC):
"""docstring for Model."""
def __init__(self):
super(Sensor, self).__init__()
self.measureQueue = Queue()
self.measureThread = SensorWorker(self.measureQueue)
self.measureThread.start()
self.measureThread.produceData.set()
@abstractmethod
def start_measuring(self):
self.measureThread.produceData.set()
print("I started meas")
pass
@abstractmethod
def stop_measuring(self):
self.measureThread.produceData.clear()
print("I stopped meas")
pass
@abstractmethod
def exit(self):
self.stop_measuring()
self.measureThread.exit_request.set()
pass
@property
def get_data(self):
time = []
val = []
while not self.measureQueue.empty():
t, data = self.measureQueue.get()
time.append(t)
val.append(data)
return time, val
# def clear(self):
# self.data = []
# self.t = []
# print("I cleared meas")
#
# def save_measuring(self, path="temp_temp.csv"):
# print("I saved meas")
# np.savetxt(path, np.array(self.data))
pass

56
view.py
View File

@ -5,6 +5,7 @@
#
import importlib.machinery
import logging
from threading import Thread
import wx
from model import Model
@ -21,7 +22,10 @@ from matplotlib.backends.backend_wxagg import (
NavigationToolbar2WxAgg as NavigationToolbar,
)
from matplotlib.figure import Figure
import numpy as np
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
# begin wxGlade: dependencies
# end wxGlade
@ -54,7 +58,7 @@ class PlotPanel(wx.Panel):
self.ax = self.fig.add_subplot()
self.fig.legend()
dat = self.model.plot_data
(self.im,) = self.ax.plot(dat.get("keks"), dat.get("temp"), "-o",label="temp")
(self.im,) = self.ax.plot(dat.get("keks"), dat.get("temp"), "-o", label="temp")
self.toolbar.update() # Not sure why this is needed - ADS
def get_toolbar(self):
@ -113,7 +117,7 @@ class MyFrame(wx.Frame):
sizer_status = wx.BoxSizer(wx.VERTICAL)
grid_sizer_main.Add(sizer_status, (0, 0), (2, 1), wx.EXPAND, 0)
#TODO test
# TODO test
label_1 = wx.StaticText(self.panel_1, wx.ID_ANY, "Text1")
sizer_status.Add(label_1, 0, 0, 0)
label_2 = wx.StaticText(self.panel_1, wx.ID_ANY, u"Für")
@ -135,7 +139,7 @@ class MyFrame(wx.Frame):
grid_sizer_measurement = wx.GridBagSizer(0, 0)
grid_sizer_3.Add(grid_sizer_measurement, (0, 0), (1, 1), wx.EXPAND, 0)
self.b_start = wx.Button(self.panel_1, wx.ID_ANY, "Start")
grid_sizer_measurement.Add(self.b_start, (0, 0), (1, 1), 0, 0)
@ -144,7 +148,6 @@ class MyFrame(wx.Frame):
self.b_run = wx.Button(self.panel_1, wx.ID_ANY, "Run")
grid_sizer_measurement.Add(self.b_run, (2, 0), (1, 1), 0, 0)
grid_sizer_5 = wx.GridBagSizer(0, 0)
grid_sizer_3.Add(grid_sizer_5, (1, 0), (1, 1), wx.EXPAND, 0)
@ -181,9 +184,10 @@ class MyFrame(wx.Frame):
self.Bind(wx.EVT_TIMER, self.update, self.timer)
self.timer.Start(100, False)
self.runner_thread = None
def on_close_window(self, event):
self.model.exit()
print("Closing")
LOGGER.info("Closing")
wx.Exit()
def update(self, event):
@ -202,12 +206,12 @@ class MyFrame(wx.Frame):
self.b_start.Enable()
def on_b_start(self, event): # wxGlade: MyFrame.<event_handler>
print("Event handler 'on_b_start'")
LOGGER.info("Event handler 'on_b_start'")
self.model.start_measuring()
event.Skip()
def on_b_save(self, event): # wxGlade: MyFrame.<event_handler>
print("Event handler 'on_b_save'")
LOGGER.info("Event handler 'on_b_save'")
with wx.FileDialog(
self,
"Save csv file",
@ -224,32 +228,39 @@ class MyFrame(wx.Frame):
event.Skip()
def on_b_run(self, event): # wxGlade: MyFrame.<event_handler>
print("Event handler 'on_b_run'")
with wx.FileDialog(self, "Open PY file", wildcard="PY files (*.py)|*.py",
style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as fileDialog:
LOGGER.info("Event handler 'on_b_run'")
with wx.FileDialog(
self,
"Open PY file",
wildcard="PY files (*.py)|*.py",
style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST,
) as fileDialog:
if fileDialog.ShowModal() == wx.ID_CANCEL:
return # the user changed their mind
if fileDialog.ShowModal() == wx.ID_CANCEL:
return # the user changed their mind
# Proceed loading the file chosen by the user
pathname = fileDialog.GetPath()
script = importlib.machinery.SourceFileLoader("ghf", pathname).load_module()
self.runner_thread = Thread(target=script.main, args=(self.model,))
self.lock()
self.runner_thread.start()
event.Skip()
# Proceed loading the file chosen by the user
pathname = fileDialog.GetPath()
script = importlib.machinery.SourceFileLoader("ghf", pathname).load_module()
self.runner_thread = Thread(target=script.main, args=(self.model,))
self.lock()
self.runner_thread.start()
event.Skip()
def on_b_stop(self, event): # wxGlade: MyFrame.<event_handler>
self.model.stop_measuring()
print("Event handler 'on_b_stop'")
LOGGER.info("Event handler 'on_b_stop'")
event.Skip()
def on_b_clear(self, event): # wxGlade: MyFrame.<event_handler>
self.model.clear()
print("Event handler 'on_b_clear'")
LOGGER.info("Event handler 'on_b_clear'")
event.Skip()
# end of class MyFrame
class MyApp(wx.App):
def OnInit(self):
self.frame = MyFrame(None, wx.ID_ANY, "")
@ -257,6 +268,7 @@ class MyApp(wx.App):
self.frame.Show()
return True
# end of class MyApp
if __name__ == "__main__":