OlliTut/measurement.py
2021-06-27 15:20:44 +02:00

72 lines
2.0 KiB
Python

import numpy as np
from datetime import datetime
from threading import Thread, Event
from time import sleep, time
from queue import Queue
from random import random
class MeasureWork(Thread):
""" Communicates with the measuring hardware. Here we only produce random data. """
def __init__(self, message_queue):
super().__init__(name="Measure")
self.message_queue = message_queue
self.produceData = Event()
self.exit_request = Event()
self.file = open("temp.csv", "w")
def run(self):
""" Worker method of a python Thread. Called when the Thread is started. """
while not self.exit_request.is_set():
if self.produceData.is_set():
temp = random()
ts = datetime.now()
self.message_queue.put((ts, temp))
self.file.write(f"{ts} - {temp}\n")
else:
self.file.flush()
sleep(1)
class Measurement(object):
"""docstring for Model."""
def __init__(self):
super(Measurement, self).__init__()
self.data = []
self.t = []
self.measureQueue = Queue()
self.measureThread = MeasureWork(self.measureQueue)
self.measureThread.start()
def start_measuring(self):
self.measureThread.produceData.set()
print("I started meas")
def save_measuring(self, path="temp_temp.csv"):
print("I saved meas")
np.savetxt(path, np.array(self.data))
def stop_measuring(self):
self.measureThread.produceData.clear()
print("I stopped meas")
def clear(self):
self.data = []
self.t = []
print("I cleared meas")
def exit(self):
self.stop_measuring()
self.measureThread.exit_request.set()
@property
def get_data(self):
while not self.measureQueue.empty():
t, data = self.measureQueue.get()
self.data.append(data)
self.t.append(t)
print(self.data)
return self.t, self.data