import numpy as np from datetime import datetime from threading import Thread, Event from time import sleep, time from queue import Queue from random import random class SensorWorker(Thread): """ Communicates with the measuring hardware. Here we only produce random data. """ def __init__(self, message_queue): super().__init__(name="Measure") self.message_queue = message_queue self.produceData = Event() self.exit_request = Event() def run(self): """ Worker method of a python Thread. Called when the Thread is started. """ while not self.exit_request.is_set(): if self.produceData.is_set(): temp = random() ts = datetime.utcnow() self.message_queue.put((ts, temp)) else: pass sleep(1) class Sensor(object): """docstring for Model.""" def __init__(self): super(Sensor, self).__init__() self.measureQueue = Queue() self.measureThread = SensorWorker(self.measureQueue) self.measureThread.start() self.measureThread.produceData.set() def start_measuring(self): self.measureThread.produceData.set() print("I started meas") def stop_measuring(self): self.measureThread.produceData.clear() print("I stopped meas") def exit(self): self.stop_measuring() self.measureThread.exit_request.set() @property def get_data(self): time = [] val = [] while not self.measureQueue.empty(): t, data = self.measureQueue.get() time.append(t) val.append(data) return time, val # def clear(self): # self.data = [] # self.t = [] # print("I cleared meas") # # def save_measuring(self, path="temp_temp.csv"): # print("I saved meas") # np.savetxt(path, np.array(self.data))