213 lines
5.5 KiB
Python
213 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# from driver.Luci10 import Luci10
|
|
import logging
|
|
from numpy import floor
|
|
from time import sleep
|
|
from threading import Thread, Event
|
|
from sensor import Sensor
|
|
from driver import Driver
|
|
|
|
LOGGER = logging.getLogger()
|
|
|
|
|
|
class FemtoDLPVA100FWorker(Thread):
|
|
def __init__(
|
|
self,
|
|
name="FemtoDLPVA100F Worker",
|
|
index=1,
|
|
delay=0.2,
|
|
):
|
|
super().__init__()
|
|
self.exit_request = Event()
|
|
self.dc = Event()
|
|
self.low_noise = Event()
|
|
self.lsbA = Event()
|
|
self.msbA = Event()
|
|
self.lsbB = Event()
|
|
self.msbB = Event()
|
|
|
|
self.overload_a = False
|
|
self.overload_b = False
|
|
|
|
self.name = name
|
|
# self.luci = Luci10(printer=False)
|
|
self.index = index
|
|
self.delay = delay
|
|
|
|
def run(self):
|
|
while not self.exit_request.is_set():
|
|
|
|
byte_a, byte_b = 0, 0
|
|
if self.lsbA.is_set():
|
|
byte_a |= 0b00000010 # Set 2nd bit to 1
|
|
if self.msbA.is_set():
|
|
byte_a += 4
|
|
if self.lsbB.is_set():
|
|
byte_b += 2
|
|
if self.msbB.is_set():
|
|
byte_b += 4
|
|
if self.dc.is_set():
|
|
byte_a += 8
|
|
byte_b += 8
|
|
if self.low_noise.is_set():
|
|
byte_a += 16
|
|
byte_b += 16
|
|
|
|
# self.luci.led_on()
|
|
# self.luci.write_bytes(self.index, byte_a, byte_b)
|
|
# self.luci.led_off()
|
|
|
|
LOGGER.debug(f"({self.name}) write bytes A, B: {byte_a:b}, {byte_b:b}")
|
|
|
|
sleep(self.delay)
|
|
|
|
|
|
class FemtoDLPVA100F(Driver):
|
|
def __init__(self, name="FemtoDLPVA100F", index=1):
|
|
|
|
self.name = name
|
|
# self.luci = Luci10(printer=False)
|
|
self.index = index
|
|
|
|
self.overload_A = False
|
|
self.overload_B = False
|
|
self.check_overload()
|
|
|
|
self.femtoThread = FemtoDLPVA100FWorker()
|
|
self.femtoThread.dc.set()
|
|
self.femtoThread.low_noise.set()
|
|
self.femtoThread.lsbA.clear()
|
|
self.femtoThread.msbA.clear()
|
|
self.femtoThread.lsbB.clear()
|
|
self.femtoThread.msbB.clear()
|
|
|
|
self.femtoThread.start()
|
|
|
|
self.dc = True
|
|
self.low_noise = True
|
|
self.exp_a = 1
|
|
self.exp_b = 1
|
|
|
|
def check_overload(self):
|
|
overload_A = self.check_overload_a()
|
|
overload_B = self.check_overload_b()
|
|
return overload_A, overload_B
|
|
|
|
def get_status(self):
|
|
a, b = self.check_overload()
|
|
return {"overload_a": a, "overload_b": b}
|
|
|
|
def check_overload_a(self):
|
|
return False
|
|
# self.luci.led_on()
|
|
# self.overload_A = not self.luci.get_status_pin5(self.index)
|
|
# self.luci.led_off()
|
|
LOGGER.info(f"({self.name:s}) overload A: {self.overload_A}")
|
|
return self.overload_A
|
|
|
|
def check_overload_b(self):
|
|
return True
|
|
# self.luci.led_on()
|
|
# self.overload_B = not self.luci.get_status_pin6(self.index)
|
|
# self.luci.led_off()
|
|
LOGGER.info(f"({self.name:s}) overload B: {self.overload_B}")
|
|
return self.overload_B
|
|
|
|
def exit(self):
|
|
self.femtoThread.exit_request.set()
|
|
LOGGER.info("(%s) Thread closed." % self.name)
|
|
|
|
def get_config(self):
|
|
config = {
|
|
"dc": self.dc,
|
|
"low_noise": self.low_noise,
|
|
"exp_a": self.exp_a,
|
|
"exp_b": self.exp_b,
|
|
}
|
|
LOGGER.info("(%s) config: %s" % (self.name, config))
|
|
|
|
return config
|
|
|
|
def set_config(self, config):
|
|
# TODO proper dict parsing
|
|
exp_a = 1
|
|
exp_b = 1
|
|
dc = True
|
|
low_noise = True
|
|
self.dc = dc
|
|
if dc:
|
|
self.set_dc()
|
|
else:
|
|
self.reset_dc()
|
|
# TODO reduce spam
|
|
if low_noise:
|
|
self.set_low_noise()
|
|
self.low_noise = True
|
|
else:
|
|
self.reset_low_noise()
|
|
self.low_noise = False
|
|
|
|
self.set_exponent_a(exp_a=exp_a)
|
|
self.exp_a = exp_a
|
|
self.set_exponent_b(exp_b=exp_b)
|
|
self.exp_b = exp_b
|
|
|
|
def set_dc(self):
|
|
self.femtoThread.dc.set()
|
|
LOGGER.info("(%s) dc is set to %s" % (self.name, True))
|
|
|
|
def reset_dc(self):
|
|
self.femtoThread.dc.clear()
|
|
LOGGER.info("(%s) dc is set to %s" % (self.name, False))
|
|
|
|
def set_low_noise(self):
|
|
self.femtoThread.low_noise.set()
|
|
LOGGER.info("(%s) low noise is set to %s" % (self.name, True))
|
|
|
|
def reset_low_noise(self):
|
|
self.femtoThread.low_noise.clear()
|
|
LOGGER.info("(%s) low noise is set to %s" % (self.name, False))
|
|
|
|
def set_exponent_a(
|
|
self,
|
|
exp_a=1,
|
|
):
|
|
if exp_a < 1:
|
|
exp_a = 1
|
|
if exp_a > 4:
|
|
exp_a = 4
|
|
|
|
if (exp_a % 2) == 0:
|
|
self.femtoThread.lsbA.set()
|
|
else:
|
|
self.femtoThread.lsbA.clear()
|
|
|
|
if (floor((exp_a - 1) / 2) - 1) == 0:
|
|
self.femtoThread.msbA.set()
|
|
else:
|
|
self.femtoThread.msbA.clear()
|
|
|
|
LOGGER.debug("(%s) exponent a = %i" % (self.name, exp_a))
|
|
|
|
def set_exponent_b(
|
|
self,
|
|
exp_b=1,
|
|
):
|
|
if exp_b < 1:
|
|
exp_b = 1
|
|
if exp_b > 4:
|
|
exp_b = 4
|
|
|
|
if (exp_b % 2) == 0:
|
|
self.femtoThread.lsbB.set()
|
|
else:
|
|
self.femtoThread.lsbB.clear()
|
|
|
|
if (floor((exp_b - 1) / 2) - 1) == 0:
|
|
self.femtoThread.msbB.set()
|
|
else:
|
|
self.femtoThread.msbB.clear()
|
|
|
|
LOGGER.debug("(%s) exponent a = %i" % (self.name, exp_b))
|