Source code for rewrite.lib.daq.Provider

import logging
import re
import multiprocessing as mp
import queue
from .Connection import DAQConnection

from .Exceptions import DAQIOError, DAQMissingDependencyError


[docs]class DAQProvider(object): """ Class providing the public API and helpers for the communication with the DAQ card """ LINE_PATTERN = re.compile("^[a-zA-Z0-9+-.,:()=$/#?!%_@*|~' ]*[\n\r]*$") def __init__(self, logger=None): if logger is None: logger = logging.getLogger() self.logger = logger self.out_queue = mp.Queue() self.in_queue = mp.Queue() self.daq = DAQConnection(self.in_queue, self.out_queue, self.logger) # Set up the thread to do asynchronous I/O. More can be made if # necessary. Set daemon flag so that the threads finish when the main # app finishes self.read_thread = mp.Process(target=self.daq.read, name="pREADER") self.read_thread.daemon = True self.read_thread.start() self.write_thread = mp.Process(target=self.daq.write, name="pWRITER") self.write_thread.daemon = True self.write_thread.start()
[docs] def get(self, *args): """ Get data from the DAQ card. Raises DAQIOError if the queue is empty. :param args: queue arguments :type args: list :returns: str or None -- next item from the queue :raises: DAQIOError """ try: line = self.out_queue.get(*args) except queue.Empty: raise DAQIOError("Queue is empty") return self.validate_line(line)
[docs] def put(self, *args): """ Senf data to the DAQ card. :param args: queue arguments :type args: list :returns: None """ self.in_queue.put(*args)
[docs] def data_available(self): """ Tests if data is available from the DAQ card. :returns: int or bool """ try: size = self.out_queue.qsize() except NotImplementedError: self.logger.debug("Running macOS version of muonic") size = not self.out_queue.empty() return size
[docs] def validate_line(self, line): """ Validate line againt regex pattern. Returns None if the provided line is invalid or the line if it is valid. :param line: line to validate :type line: str :returns: str or None """ if self.LINE_PATTERN.match(line.decode("ascii")) is None: line = line.rstrip('\r\n') self.logger.warning(f"Got invalid data from the DAQ card: {line}") return None return line