OlliTut/CoreLibrary/ExampleDeviceScalar.py

80 lines
2.3 KiB
Python

import logging
from datetime import datetime
from queue import Queue
from random import random
from threading import Thread, Event
from time import sleep
from Device import Device
# Module-level logger used by the classes in this file; it is looked up under
# the fixed name "my_logger" rather than the module's own name.
LOGGER = logging.getLogger("my_logger")
class ExampleDeviceWorker(Thread):
    """Background thread standing in for real measuring hardware.

    Instead of talking to a device it generates a slowly increasing random
    value and pushes ``(timestamp, value)`` tuples onto a queue.

    Attributes:
        refresh_rate: Pause between two loop iterations, in seconds. It may
            be reassigned while the thread is running; the new value is used
            from the next iteration on.
        message_queue: Queue that receives the ``(timestamp, value)`` tuples.
        produceData: Event gating data generation; samples are only produced
            while it is set.
        exit_request: Event that makes ``run`` return once it is set.
        prev: Last generated value; the next sample is built on top of it.
    """

    def __init__(self, message_queue, refresh_rate):
        """Create the worker without starting it.

        Args:
            message_queue: Queue-like object with a ``put`` method.
            refresh_rate: Pause between two loop iterations, in seconds.
        """
        super().__init__(name="Measure")
        self.refresh_rate = refresh_rate
        self.message_queue = message_queue
        self.produceData = Event()
        self.exit_request = Event()
        self.prev = 0

    def run(self):
        """Worker method of a python Thread. Called when the Thread is started.

        Loops until ``exit_request`` is set. On every iteration in which
        ``produceData`` is set, one new sample is queued.
        """
        while not self.exit_request.is_set():
            if self.produceData.is_set():
                # Each sample adds a random increment in [0, 0.01) to the
                # previous one, so the series never decreases.
                temp = self.prev + random() / 100.0
                self.prev = temp
                # Naive UTC timestamp, kept as-is so existing consumers of the
                # queue keep receiving the same kind of datetime object.
                ts = datetime.utcnow()
                self.message_queue.put((ts, temp))
            # Pace the loop by waiting on the exit event instead of sleeping:
            # same delay when nothing happens, but an exit request wakes the
            # thread immediately rather than after a full refresh period.
            self.exit_request.wait(self.refresh_rate)
class ExampleDeviceScalar(Device):
    """Example device delivering one scalar value per sample.

    The samples are generated by an ``ExampleDeviceWorker`` thread and handed
    over through a queue; ``get_data`` drains that queue.

    Attributes:
        refresh_rate: Pause between two worker iterations, in seconds.
        measureQueue: Queue filled by the worker with ``(timestamp, value)``.
        measureThread: The worker thread producing the samples.
        running: True while the worker is asked to produce data.
    """

    def __init__(self):
        """Start the worker thread and begin producing data right away."""
        # super(Device, self) would start the lookup *after* Device and skip
        # Device.__init__ entirely; the zero-argument form runs it.
        # NOTE(review): assumes Device.__init__ takes no required arguments.
        super().__init__()
        self.refresh_rate = 0.2
        self.measureQueue = Queue()
        self.measureThread = ExampleDeviceWorker(self.measureQueue, self.refresh_rate)
        self.measureThread.start()
        # Acquisition is switched on at construction time, so the status flag
        # has to say so as well.
        self.measureThread.produceData.set()
        self.running = True

    def exit(self):
        """Stop producing data and ask the worker thread to terminate."""
        self.stop_measuring()
        self.measureThread.exit_request.set()

    def set_config(self, config: dict):
        """Apply a configuration.

        Args:
            config: Mapping that must contain the key ``"refresh_rate"``.

        Raises:
            KeyError: If ``"refresh_rate"`` is missing from ``config``.
        """
        self.refresh_rate = config["refresh_rate"]
        # Forward the value so the running worker picks it up as well.
        self.measureThread.refresh_rate = self.refresh_rate

    def get_config(self):
        """Return the current configuration as a dict."""
        return {"refresh_rate": self.refresh_rate}

    def get_status(self):
        """Return a dict telling whether data production is switched on."""
        return {"running": self.running}

    def start_measuring(self):
        """Let the worker produce data."""
        self.measureThread.produceData.set()
        self.running = True
        LOGGER.info("I started meas")

    def stop_measuring(self):
        """Pause data production; the worker thread itself stays alive."""
        self.measureThread.produceData.clear()
        self.running = False
        LOGGER.info("I stopped meas")

    @property
    def get_data(self):
        """Drain the queue and return everything collected so far.

        This is a property despite its name, so it is read without
        parentheses (``device.get_data``).

        Returns:
            A tuple ``(timestamps, values)`` of two equally long lists, in
            the order the samples were queued. Both are empty when no new
            samples are available.
        """
        timestamps = []
        values = []
        while not self.measureQueue.empty():
            stamp, sample = self.measureQueue.get()
            timestamps.append(stamp)
            values.append(sample)
        return timestamps, values