from time import sleep
from threading import Thread, Event

from CoreLibrary.ExampleDriver import ExampleDriver
from CoreLibrary.ExampleSensor import ExampleSensor
from CoreLibrary.Measurement import Measurement
from CoreLibrary.Plot_Data import Plot_Data


class ExampleDriverWorker(Thread):
    """Background thread that periodically refreshes a data model.

    Original note: communicates with the measuring hardware; here we only
    produce random data.  From this class's point of view it simply calls
    ``pd.refresh()`` roughly once per second until ``exit_request`` is set.
    """

    def __init__(self, pd):
        """Create the worker.

        Args:
            pd: Object exposing a ``refresh()`` method (e.g. a
                ``Param_Model``); it is polled from the worker thread.
        """
        super().__init__(name="Data_Worker")
        # Set this event from another thread to ask the worker to terminate.
        self.exit_request = Event()
        self.pd = pd

    def run(self):
        """Worker method of a python Thread. Called when the Thread is started."""
        while not self.exit_request.is_set():
            self.pd.refresh()
            # Wait on the event instead of sleeping blindly: same one-second
            # cadence, but the loop wakes up immediately on an exit request.
            self.exit_request.wait(1)


class Param_Model:
    """Holds the drivers and sensors and distributes sensor data.

    Every ``Measurement`` and ``Plot_Data`` handed out by this model is
    remembered and receives the data of all sensors on each ``refresh()``.
    """

    def __init__(self):
        """Set up one example driver, one example sensor and empty sinks."""
        self.drivers = {"treib1": ExampleDriver()}
        self.sensors = {"mess1": ExampleSensor()}
        self.__plot_data = []
        self.__measurement = []

    def new_measurement(self, name=None, writer=None) -> Measurement:
        """Create a measurement, register it for refreshes and return it.

        Args:
            name: Forwarded to ``Measurement(name=...)``.
            writer: Currently unused; kept so existing callers keep working.

        Returns:
            The newly created ``Measurement``.
        """
        mess = Measurement(name=name)
        self.__measurement.append(mess)
        return mess

    def get_plot_data(self):
        """Create a ``Plot_Data`` object, register it for refreshes and return it."""
        plot_data = Plot_Data()
        self.__plot_data.append(plot_data)
        return plot_data

    def refresh(self):
        """Read every sensor once and append its data to all registered sinks."""
        # .items() is required here: iterating the dict directly yields only
        # the keys, and unpacking a key string into (name, sens) fails.
        for name, sens in self.sensors.items():
            reading = sens.get_data
            # NOTE(review): ExampleSensor is not visible from this file.
            # ``get_data`` is presumably a method, so call it; if it turns
            # out to be a property the value is used unchanged.
            data = reading() if callable(reading) else reading
            for meas in self.__measurement:
                meas.append_data(name, data)
            for pd in self.__plot_data:
                pd.append_data(name, data)

    def stop_measuring(self):
        """Call ``stop_measuring()`` on every registered sensor."""
        # Iterate the sensor objects themselves, not (name, sensor) tuples.
        for sens in self.sensors.values():
            sens.stop_measuring()

    def start_measuring(self):
        """Placeholder; currently does nothing."""
        pass

    def exit(self):
        """Placeholder; currently does nothing."""
        pass