import logging
from datetime import datetime
from queue import Empty, Queue
from random import random
from threading import Event, Thread
from time import sleep  # noqa: F401  (no longer used here; kept so the name stays importable from this module)

from Device import Device

LOGGER = logging.getLogger("my_logger")


class ExampleDeviceWorker(Thread):
    """Background thread standing in for the measuring hardware.

    While ``produceData`` is set, every cycle pushes one
    ``(timestamp, value)`` tuple onto ``message_queue``. The value is a
    running sum of small random increments; the timestamp comes from
    ``datetime.utcnow()``. Setting ``exit_request`` ends the thread.
    """

    def __init__(self, message_queue, refresh_rate):
        """Prepare the worker; the thread is not started here.

        Args:
            message_queue: Queue that receives ``(timestamp, value)`` tuples.
            refresh_rate: Pause between two cycles, in seconds.
        """
        super().__init__(name="Measure")
        self.refresh_rate = refresh_rate
        self.message_queue = message_queue
        self.produceData = Event()  # set -> samples are generated
        self.exit_request = Event()  # set -> run() leaves its loop
        self.prev = 0  # last generated value (running sum)

    def run(self):
        """Thread body: generate samples until ``exit_request`` is set."""
        while not self.exit_request.is_set():
            if self.produceData.is_set():
                self.prev += random() / 100.0
                # NOTE(review): utcnow() yields a naive datetime and is
                # deprecated since Python 3.12; kept so that consumers keep
                # receiving the same kind of timestamp.
                self.message_queue.put((datetime.utcnow(), self.prev))
            # Wait on the exit event instead of sleeping: same pacing, but
            # the loop wakes up immediately when an exit is requested.
            self.exit_request.wait(self.refresh_rate)


class ExampleDeviceScalar(Device):
    """Example scalar device backed by an ``ExampleDeviceWorker``.

    The worker thread is created and started in ``__init__`` with data
    production switched on; produced samples are buffered in
    ``measureQueue`` until they are fetched through ``get_data``.
    """

    def __init__(self):
        """Create the queue, start the worker and enable data production."""
        # The original called super(Device, self).__init__(), which starts
        # the MRO lookup *after* Device and therefore skipped Device.__init__.
        # NOTE(review): assumes Device.__init__ needs no arguments - confirm.
        super().__init__()
        self.refresh_rate = 0.2
        self.measureQueue = Queue()
        self.measureThread = ExampleDeviceWorker(self.measureQueue, self.refresh_rate)
        self.measureThread.start()
        self.measureThread.produceData.set()
        # Production is switched on right above, so the status flag has to
        # say so (it used to be a constant False).
        self.running = True

    def exit(self):
        """Stop measuring and ask the worker thread to terminate."""
        self.stop_measuring()
        self.measureThread.exit_request.set()

    def set_config(self, config: dict):
        """Apply a configuration.

        Args:
            config: Mapping that must contain the key ``"refresh_rate"``;
                its value is stored and forwarded to the worker thread.

        Raises:
            KeyError: If ``"refresh_rate"`` is missing from ``config``.
        """
        self.refresh_rate = config["refresh_rate"]
        self.measureThread.refresh_rate = self.refresh_rate

    def get_config(self):
        """Return the current configuration as ``{"refresh_rate": ...}``."""
        return {"refresh_rate": self.refresh_rate}

    def get_status(self):
        """Return ``{"running": bool}`` describing whether data is produced."""
        return {"running": self.running}

    def start_measuring(self):
        """Switch data production on."""
        self.measureThread.produceData.set()
        self.running = True
        LOGGER.info("I started meas")

    def stop_measuring(self):
        """Switch data production off; the worker thread keeps running."""
        self.measureThread.produceData.clear()
        self.running = False
        LOGGER.info("I stopped meas")

    @property
    def get_data(self):
        """Drain the queue and return everything buffered so far.

        This is a property, so it is read without parentheses.

        Returns:
            A ``(timestamps, values)`` tuple of two equally long lists, in
            the order the samples were queued. Both lists are empty when
            nothing is buffered.
        """
        timestamps = []
        values = []
        while True:
            try:
                # Non-blocking read: checking empty() and then calling a
                # blocking get() could hang if another consumer emptied the
                # queue in between.
                stamp, value = self.measureQueue.get_nowait()
            except Empty:
                break
            timestamps.append(stamp)
            values.append(value)
        return timestamps, values