from abc import abstractmethod
import logging
from queue import Queue
from threading import Thread, Event
from time import sleep

from CoreLibrary.Driver import Driver

LOGGER = logging.getLogger("my_logger")


class ExampleDriverWorker(Thread):
    """Background thread standing in for the hardware communication loop.

    This is a placeholder: ``run`` only idles until ``exit_request`` is set
    and does not put anything on the message queue yet.
    """

    def __init__(self, message_queue):
        """Create the worker thread (named "Driver") without starting it.

        Args:
            message_queue: Queue shared with the owning driver. It is kept on
                the instance so a real implementation can publish to it.
        """
        super().__init__(name="Driver")
        # Previously the queue argument was dropped, leaving the worker with
        # no way to reach the queue it was handed.
        self.message_queue = message_queue
        # Set by the owning driver after start(); not consulted by run() yet.
        self.produceData = Event()
        # Set to ask run() to return.
        self.exit_request = Event()

    def run(self):
        """Thread body: idle in one-second steps until an exit is requested."""
        # Event.wait() returns True as soon as the flag is set, so shutdown is
        # noticed immediately rather than after the rest of a sleep(1).
        while not self.exit_request.wait(timeout=1):
            pass


class ExampleDriver(Driver):
    """Example driver that owns an ExampleDriverWorker thread.

    Constructing the driver creates the measurement queue, starts the worker
    thread and sets its ``produceData`` event. Call ``exit`` to stop the
    worker; the thread is not a daemon, so it keeps the process alive until
    then.
    """

    def __init__(self):
        """Initialise the base driver, then create and start the worker."""
        # super(Driver, self) would begin the MRO lookup *after* Driver and so
        # skip Driver.__init__ entirely; the zero-argument form runs it.
        super().__init__()
        self.measureQueue = Queue()
        self.measureThread = ExampleDriverWorker(self.measureQueue)
        self.measureThread.start()
        self.measureThread.produceData.set()
        self.speed = 10
        self.running = False

    def exit(self) -> None:
        """Ask the worker thread to stop; does not wait for it to finish."""
        self.measureThread.exit_request.set()

    def set_config(self, config: dict) -> None:
        """Apply a configuration.

        Args:
            config: Mapping that must contain the key ``"speed"``.

        Raises:
            KeyError: If ``"speed"`` is missing from ``config``.
        """
        self.speed = config["speed"]

    def get_status(self) -> dict:
        """Return the driver status as ``{"running": <bool>}``."""
        return {"running": self.running}

    def get_config(self) -> dict:
        """Return the current configuration, using the key set_config reads."""
        return {"speed": self.speed}