import logging
import serial
from .getDevice import get_Device
from time import sleep
[docs]class DAQConnection(object):
"""
DAQ Connection class.
Raises SystemError if serial connection cannot be established.
:param logger: logger object
:type logger: logging.Logger
:param in_queue: input queue
:param out_queue: output queue
:raises: SystemError
"""
def __init__(self, in_queue, out_queue, logger=None):
if logger is None:
logger = logging.getLogger()
self.logger = logger
self.running = 1
self.in_queue = in_queue
self.out_queue = out_queue
try:
self.serial_port = self.get_serial_port()
except serial.SerialException as e:
self.logger.fatal(f"SerialException! Error: {e.message}")
raise SystemError(e)
[docs] def get_serial_port(self):
"""
Check out which device (/dev/tty) is used for DAQ communication.
Raises OSError if binary 'which_tty_daq' cannot be found.
:returns: serial.Serial -- serial connection port
:raises: OSError
"""
connected = False
serial_port = None
while not connected:
dev = "/dev/" + get_Device()
self.logger.info(f"DAQ Card found at {dev}")
self.logger.info("Trying to connect")
try:
serial_port = serial.Serial(port=dev, baudrate=115200,
bytesize=8, parity='N', stopbits=1,
timeout=0.5, xonxoff=True)
connected = True
except serial.SerialException as e:
self.logger.error(e)
self.logger.error("Retrying in 5 seconds")
sleep(5)
self.logger.info("Successfully connected to DAQ card")
return serial_port
[docs] def read(self):
"""
Gets Data from the DAQ card. Read it from the provided queue.
:returns: None
"""
min_sleep_time = 0.01 # seconds
max_sleep_time = 0.2 # seconds
sleep_time = min_sleep_time # seconds
while self.running:
try:
if self.serial_port.inWaiting():
while self.serial_port.inWaiting():
self.out_queue.put(self.serial_port.readline().strip())
sleep_time = max(sleep_time / 2, min_sleep_time)
else:
sleep_time = min(1.5 * sleep_time, max_sleep_time)
sleep(sleep_time)
except (IOError, OSError):
self.logger.error("IOError")
self.serial_port.close()
self.serial_port = self.get_serial_port()
[docs] def write(self):
"""
Writes messages from the in queue to the DAQ card
:returns: None
"""
while self.running:
try:
while self.in_queue.qsize():
try:
out_str = str(self.in_queue.get(0)) + str("\r")
self.serial_port.write(out_str.encode("ascii"))
except (queue.Empty, serial.SerialTimeoutException):
pass
except NotImplementedError:
self.logger.debug("Running macOS version of muonic")
while True:
try:
out_str = str(self.in_queue.get(
timeout=0.01)) + str("\r")
self.serial_port.write(out_str.encode("ascii"))
except (queue.Empty, serial.SerialTimeoutException):
pass
sleep(0.1)