OlliTut/sensor.py

72 lines
2.0 KiB
Python

import numpy as np
from datetime import datetime
from threading import Thread, Event
from time import sleep, time
from queue import Queue
from random import random
class SensorWorker(Thread):
""" Communicates with the measuring hardware. Here we only produce random data. """
def __init__(self, message_queue):
super().__init__(name="Measure")
self.message_queue = message_queue
self.produceData = Event()
self.exit_request = Event()
self.file = open("temp.csv", "w")
def run(self):
""" Worker method of a python Thread. Called when the Thread is started. """
while not self.exit_request.is_set():
if self.produceData.is_set():
temp = random()
ts = datetime.now()
self.message_queue.put((ts, temp))
self.file.write(f"{ts} - {temp}\n")
else:
self.file.flush()
sleep(1)
class Sensor(object):
"""docstring for Model."""
def __init__(self):
super(Sensor, self).__init__()
self.data = []
self.t = []
self.measureQueue = Queue()
self.measureThread = SensorWorker(self.measureQueue)
self.measureThread.start()
def start_measuring(self):
self.measureThread.produceData.set()
print("I started meas")
def save_measuring(self, path="temp_temp.csv"):
print("I saved meas")
np.savetxt(path, np.array(self.data))
def stop_measuring(self):
self.measureThread.produceData.clear()
print("I stopped meas")
def clear(self):
self.data = []
self.t = []
print("I cleared meas")
def exit(self):
self.stop_measuring()
self.measureThread.exit_request.set()
@property
def get_data(self):
while not self.measureQueue.empty():
t, data = self.measureQueue.get()
self.data.append(data)
self.t.append(t)
print(self.data)
return self.t, self.data