Source code for rewrite.lib.daq.DAQServer

import logging


from .Provider import DAQProvider
from datetime import datetime
from time import time, sleep
from multiprocessing import Queue
import threading
from ..common.Record import Record, RecordType
from ..common.CountRecord import CountRecord
from ..common.TemperatureRecord import TemperatureRecord
from ..common.PressureRecord import PressureRecord, PressureType
from ..common.DataRecord import DataRecord
from ..common.GPSRecord import GPSRecord
import zmq
import jsonpickle


[docs]class DAQServer(object): def __init__(self): # process incoming data self.countqueue = Queue() self.tempqueue = Queue() self.pressqueue = Queue() self.dataqueue = Queue() # setup zmq socket for the server self.context = zmq.Context() self.socket = self.context.socket(zmq.PUB) self.socket.bind("tcp://*:1234") # package number self.package_number = 0 self.running = False # maximum number of the scalars on the DAQ card self.max_counts = int("FFFFFFFF", 16) # setup the logger self.logger = logging.getLogger() self.logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter( '%(levelname)s:%(process)d:%(module)s:%(funcName)s:%(lineno)d:%(message)s') ch.setFormatter(formatter) self.logger.addHandler(ch) # Connect to the DAQ card self.client = DAQProvider(logger=self.logger) # disable data flow for startup self.stop_reading_data() # disable status messages from the card self.do('ST 0')
[docs] def do(self, msg): """ Send a command to the DAQ card and remove repeated responses from the outqueue if data taking is turned off. Otherwise just send the command to the card. """ if self.running == False: self.client.put(msg) sleep(0.5) self.client.get(0) if msg == 'ST 0' or msg.startswith('WC'): self.client.get(0) else: self.client.put(msg)
[docs] def reset_scalars(self): """ Reset the scalars of all channels. """ self.do('RB') self.logger.debug('Resetted scalars.')
[docs] def start_reading_data(self): """ Start receiving data from the DAQ card and storing it in self.dataqueue. """ self.logger.debug('Start reading data into queues.') self.do('CE') x = threading.Thread(target=self.process_incoming) x.start()
# self.process_incoming()
[docs] def stop_reading_data(self): """ Stop receiving data from the DAQ card. """ self.do('CD') while self.client.data_available(): self.client.get(0) self.logger.debug('Stopped reading data.')
[docs] def read_scalars(self): """ Read the scalars of all channels. If no measurement is running, returns scalar values: ch0, ch1, ch2, ch3, trigger """ if self.running: self.client.put('DS') else: self.do('DS') scalar_msg = self.client.get(0) if scalar_msg.startswith('DS'): self.get_scalars(scalar_msg) return self.counts_ch0, self.counts_ch1, self.counts_ch2, self.counts_ch3, self.counts_trigger else: self.logger.info( "Didn't find scalars in message. %s" % scalar_msg)
[docs] def set_threashold(self, th_0=300, th_1=300, th_2=300, th_3=300): """ Set the threasholds for the channels of the DAQ card. Default value for all channels is 300. """ Th = [th_0, th_1, th_2, th_3] for i in range(4): msg = "TL "+str(i)+" "+str(Th[i]) self.do(msg) self.do('TL') scan = 0 while scan < 10: response_th = self.client.get(0).decode("ascii") if response_th.startswith('TL') and len(response_th) > 9: self.logger.info("Thresholds set to %s" % response_th) break else: self.logger.debug("Haven't found threasholds yet") self.logger.debug(response_th) scan += 1 else: self.logger.error( "Didn't find threashold setting in last 10 messanges coming from DAQ card. Something is wrong!")
[docs] def setup_channel(self, ch0=False, ch1=False, ch2=False, ch3=False, coincidence='single'): """ Enable/Disable channels of the DAQ card and set coincidence settings. """ self.nchannels = 0 channels = [ch3, ch2, ch1, ch0] ch_setting = str("") for i in range(4): if channels[i] == False: ch_setting += '0' elif channels[i] == True: ch_setting += '1' self.nchannels += 1 ch = format(int(ch_setting, 2), 'X') if coincidence == 'single': coinc = 0 elif coincidence == 'twofold': coinc = 1 elif coincidence == 'threefold': coinc = 2 elif coincidence == 'fourfold': coinc = 3 msg = "WC 00 "+str(coinc)+str(ch) self.do(msg) sleep(0.5) self.do('DC') response_ch = self.client.get(0) self.logger.info("Channels set. %s" % response_ch)
[docs] def get_temp_and_pressure(self): """ Read out temperature and pressure data. Pressure data in unit counts and mBar. If no measurement is running returns temperature, pressure, pressure_mbar """ self.temperature = -999.0 self.pressure = -999.0 self.pressure_mbar = -999.0 if self.running: # get temperature from temperature queue if self.tempqueue.qsize(): msg_temp = self.tempqueue.get(0) tmpRec = TemperatureRecord(msg_temp) rec = Record(self.package_number, RecordType.TEMPERATURE, datetime.now().timestamp(), tmpRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) self.temperature = float(msg_temp.split("=")[1]) else: self.logger.info( "Failed to measure the temperature. No element in queue.") # get pressure from pressure queue if self.pressqueue.qsize(): msg_press = self.pressqueue.get(0) msg_press_mbar = self.pressqueue.get(0) self.check_pressure_msg(msg_press) self.check_pressure_msg(msg_press_mbar) else: self.logger.info( "Failed to measure the pressure. No element in queue.") else: self.do('TH') msg_temp = self.client.get(0).decode("ascii") tmpRec = TemperatureRecord(msg_temp) rec = Record(self.package_number, RecordType.TEMPERATURE, datetime.now().timestamp(), tmpRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) print(f"type: {type(msg_temp)}") if msg_temp.startswith('TH'): self.temperature = float(msg_temp.split("=")[1]) self.logger.debug('Measured temperature: %f' % self.temperature) else: self.logger.error("Could not read temperature.") self.do('BA') msg_press = self.client.get(0).decode("ascii") presRec = PressureRecord(msg_press) rec = Record(self.package_number, RecordType.PRESSURE, datetime.now().timestamp(), presRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) if msg_press.startswith('BA'): self.pressure = float(msg_press.split()[1]) self.logger.debug('Measured pressure: %f' % self.pressure) else: self.logger.error("Could not read pressure [counts].") # take out the 'calibrate pressure' message: self.client.get(0) calib_press = self.client.get(0).decode("ascii") if calib_press.startswith('mBar'): self.pressure_mbar = float(calib_press.split()[4]) self.logger.debug('Measured pressure in mBar: %f' % self.pressure_mbar) else: self.logger.error("Could not read pressure [mBar].") return self.temperature, self.pressure, self.pressure_mbar
[docs] def check_pressure_msg(self, msg): """ Check message for pressure information. """ presRec = PressureRecord(msg) rec = Record(self.package_number, RecordType.PRESSURE, datetime.now().timestamp(), presRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) if msg.startswith('BA'): self.pressure = float(msg.split()[1]) elif msg.startswith('mBar'): self.pressure_mbar = float(msg.split()[4]) else: self.logger.info( "Weird element in pressure queue. Could not read pressure.")
[docs] def get_gps_info(self): self.do('DG') Found = False # print("before get") msg = self.client.get(0) # print("after get") while not Found: if msg.startswith(b'Date+Time'): gpsmsg = msg Found = True else: msg = self.client.get(0) GPSDateTime = gpsmsg Status = self.client.get(0) PosFix = self.client.get(0) Latitude = self.client.get(0) Longitude = self.client.get(0) Altitude = self.client.get(0) NSats = self.client.get(0) PPSDelay = self.client.get(0) FPGATime = self.client.get(0) ChkSumErr = self.client.get(0) gpsRecord = GPSRecord(GPSDateTime,Status,PosFix,Latitude,Longitude,Altitude,NSats,PPSDelay,FPGATime,ChkSumErr) rec = Record(self.package_number, RecordType.GPS, datetime.now().timestamp(), jsonpickle.encode(gpsRecord)) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) self.logger.info(GPSDateTime) self.logger.info(Status) self.logger.info(PosFix) self.logger.info(Latitude) self.logger.info(Longitude) self.logger.info(Altitude) self.logger.info(NSats) self.logger.info(PPSDelay) self.logger.info(FPGATime) self.logger.info(ChkSumErr)
[docs] def process_incoming(self): """ Sort messages received from the DAQ card and store them in separate queues. """ while self.running: while self.client.data_available(): try: msg = self.client.get(0).decode("ascii") except: self.logger.debug('Queue empty!') break # print(msg) if msg.startswith('DS'): if len(msg) >= 3: cntRec = CountRecord(msg) rec = Record(self.package_number, RecordType.COUNTER, datetime.now().timestamp(), cntRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) self.countqueue.put(msg) elif msg.startswith('TH'): if len(msg) >= 9: tmpRec = TemperatureRecord(msg) rec = Record(self.package_number, RecordType.TEMPERATURE, datetime.now().timestamp(), tmpRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) self.tempqueue.put(msg) elif msg.startswith('BA') or msg.startswith('mBar'): if len(msg) >= 4: presRec = PressureRecord(msg) rec = Record(self.package_number, RecordType.PRESSURE, datetime.now().timestamp(), presRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) self.pressqueue.put(msg) elif msg.startswith('CD') or msg.startswith('CE'): continue else: dataRec = DataRecord(msg) rec = Record(self.package_number, RecordType.DATA, datetime.now().timestamp(), dataRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) # self.dataqueue.put(msg) else: sleep(0.2)
[docs] def get_scalars(self, msg=None): """ If running=True, read out scalars from the counterqueue. Otherwise, read scalars from given message. Returns the scalar values. """ if msg != None: counter_from_msg = msg.split() else: if self.countqueue.qsize(): count_msg = self.countqueue.get(0) counter_from_msg = count_msg.split() else: self.logger.error( "Failed to get counters. No element in queue.") for item in counter_from_msg: if ("S0" in item) & (len(item) == 11): self.counts_ch0 = int(item[3:], 16) elif ("S1" in item) & (len(item) == 11): self.counts_ch1 = int(item[3:], 16) elif ("S2" in item) & (len(item) == 11): self.counts_ch2 = int(item[3:], 16) elif ("S3" in item) & (len(item) == 11): self.counts_ch3 = int(item[3:], 16) elif ("S4" in item) & (len(item) == 11): self.counts_trigger = int(item[3:], 16) elif ("S5" in item) & (len(item) == 11): counters_time = float(int(item[3:], 16)) cntRec = CountRecord(msg) rec = Record(self.package_number, RecordType.COUNTER, datetime.now().timestamp(), cntRec) self.package_number += 1 self.socket.send_string(jsonpickle.encode(rec)) return self.counts_ch0, self.counts_ch1, self.counts_ch2, self.counts_ch3, self.counts_trigger
# def calculate_rates(self): # """ # Calculate rates during rate measurements. # """ # counts_ch0_start, counts_ch1_start, counts_ch2_start, counts_ch3_start, counts_trigger_start = self.get_scalars() # counts_ch0_end, counts_ch1_end, counts_ch2_end, counts_ch3_end, counts_trigger_end = self.get_scalars() # counters_previous = [counts_ch0_start, counts_ch1_start, # counts_ch2_start, counts_ch3_start, counts_trigger_start] # counters = [counts_ch0_end, counts_ch1_end, # counts_ch2_end, counts_ch3_end, counts_trigger_end] # self.diff_counters = [] # self.rates = [] # for i in range(len(counters)): # if counters[i] >= counters_previous[i]: # self.diff_counters.append(counters[i]-counters_previous[i]) # elif counters[i] < counters_previous[i]: # self.diff_counters.append( # max_counts-counters_previous[i]+counters[i]) # self.rates.append(self.diff_counters[i]/self.delta_time)
[docs] def clear_queues(self): """ Clear all the queues filled in process_incoming(). """ for queue in [self.countqueue, self.pressqueue, self.tempqueue, self.dataqueue]: while queue.qsize(): queue.get(0) self.logger.debug('Finished clearing queues.')
[docs] def run(self): self.running = True self.x = threading.Thread(target=self.start_reading_data) self.x.start()
[docs] def stop(self): self.running = False self.x.stop()
[docs] def setRunning(self, isRunning): self.running = isRunning
[docs] def measure_pulses(self, meastime=None): """ Measure pulses (rising and falling edge times) of trigger events. Using PulseExtractor from muonic. :param meastime: Total measurement time in minutes. Default is None. """ from muonic.analysis import PulseExtractor if isinstance(meastime, float): self.logger.info( 'Starting pulse measurement. Total measurement time: %f' % meastime) if self.running: pass else: self.running = True self.starttime = datetime.now().timestamp().strftime("%Y-%m-%d_%H-%M-%S") x = threading.Thread(target=self.start_reading_data) x.start() filename_pulses = self.starttime+"_P.txt" pe = PulseExtractor(self.logger, filename_pulses) pe.write_pulses(True) t = 0 start_t = time() try: while t < (meastime*60): while self.dataqueue.qsize() != 0: try: msg = self.dataqueue.get() except: self.logger.debug('Dataqueue empty!') break pulses = pe.extract(msg) t = time()-start_t self.logger.info( 'Measurement progress: %f %%' % (100*t/(meastime*60))) self.stop_reading_data() self.logger.info('Measurement is stopping. Please wait!') sleep(5) self.running = False self.logger.info('Measurement stopped!') self.clear_queues() except (KeyboardInterrupt, SystemExit): self.stop_reading_data() self.logger.info('Measurement is stopping. Please wait!') sleep(5) self.running = False self.logger.info('Measurement stopped!') self.clear_queues() elif meastime == None: self.logger.info( 'Starting pulse measurement. No measurement time set.') if self.running: pass else: self.running = True self.starttime = datetime.now().timestamp().strftime("%Y-%m-%d_%H-%M-%S") x = threading.Thread(target=self.start_reading_data) x.start() filename_pulses = self.starttime+"_P.txt" pe = PulseExtractor(self.logger, filename_pulses) pe.write_pulses(True) try: while self.running: while self.dataqueue.qsize() != 0: try: msg = self.dataqueue.get() except: self.logger.debug('Dataqueue empty!') break pe.extract(msg) except (KeyboardInterrupt, SystemExit): self.stop_reading_data() self.logger.info('Measurement is stopping. Please wait!') sleep(5) self.running = False self.logger.info('Measurement stopped!') self.clear_queues()