import xmlrpc.client
import zmq
import logging
import jsonpickle
import numpy as np
from ..common.CountRecord import CountRecord
from ..common.Record import RecordType, Record
from ..common.PressureRecord import PressureType, PressureRecord
from ..common.TemperatureRecord import TemperatureRecord
from ..utils.Time import getCurrentTimeString
from datetime import datetime
from time import time, sleep
import threading
import queue
[docs]class RateAnalyzer():
"""
Class that manages the measurement of muon rate.
"""
def __init__(self, logger=None, headless=True):
if logger is None:
logger = logging.getLogger()
self.logger = logger
# Setup the communication with the data well
self.ctx = zmq.Context()
self.sock = self.ctx.socket(zmq.SUB)
self.sock.connect("tcp://127.0.0.1:1234")
self.sock.subscribe("") # Subscribe to all topics
self.headless = headless
if headless:
self.server = xmlrpc.client.ServerProxy("http://localhost:5556")
self.server.setup_channel(True, True, True, True, 'threefold')
self.server.set_threashold(110, 110, 180, 110)
# self.server.get_gps_info()
self.starttime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
self.filename = self.starttime+"_R.txt"
#self.file = open(self.filename, 'a')
self.prev_rates = None
self.outQueue = queue.Queue()
writerTask = threading.Thread(target=self.fileWriter).start()
self.pressure = 99
self.temperature = 20
[docs] def fileWriter(self):
with open(self.filename, 'a') as f:
while True:
item = self.outQueue.get()
f.write(item)
f.write('\n')
f.flush()
[docs] def write_rates_to_file(self, firstline=False):
"""
Saves data to file during rate measurements.
"""
if firstline:
self.logger.info(
'Starting to write data to file %s' % self.filename)
self.outQueue.put(
"Date Time Rate_0 Rate_1 Rate_2 Rate_3 Rate_trigger Counts_0 Counts_1 Counts_2 Counts_3 Trigger Delta_time Pressure [mBar] Temperature [C] \n")
[docs] def runDaemon(self):
while True:
msg = self.sock.recv_string()
obj = jsonpickle.decode(msg)
if obj.type == RecordType.COUNTER and obj.payload.valid == True:
print(
f"Package No.: {obj.packageNumber} Type: {obj.type} timestamp: {obj.timestamp} payloads: {repr(obj.payload)}")
#print(f"date: {datetime.fromtimestamp(obj.timestamp)}")
cntRec = obj.payload
if self.prev_rates is None:
self.prev_rates = np.array(
[cntRec.counts_ch0, cntRec.counts_ch1, cntRec.counts_ch2, cntRec.counts_ch3, cntRec.counts_trigger])
self.previous_time = datetime.fromtimestamp(obj.timestamp)
else:
curRates = np.array([cntRec.counts_ch0, cntRec.counts_ch1,
cntRec.counts_ch2, cntRec.counts_ch3, cntRec.counts_trigger])
current_time = datetime.fromtimestamp(obj.timestamp)
self.delta_time = (
current_time - self.previous_time).total_seconds()
self.previous_time = current_time
deltaRates = curRates - self.prev_rates
self.prev_rates = curRates
deltaRates = deltaRates / self.delta_time
if self.dateandtime is None:
self.dateandtime = datetime.now()
if not self.headless :
send = deltaRates.tolist()
send.append(self.delta_time)
print(f"deltaRates: {deltaRates}, self.delta_time: {self.delta_time}, send: {send}")
self.progress.emit(send)
self.outQueue.put(
f"{getCurrentTimeString()} {deltaRates[0]} {deltaRates[1]} {deltaRates[2]} {deltaRates[3]} {deltaRates[4]} {curRates[0]} {curRates[1]} {curRates[2]} {curRates[3]} {curRates[4]} {self.delta_time} {self.current_pressure} {self.temperature}")
elif obj.type == RecordType.PRESSURE and obj.payload.valid == True and obj.payload.pressure_type == PressureType.MBAR:
self.current_pressure = obj.payload.pressure
elif obj.type == RecordType.TEMPERATURE and obj.payload.valid == True:
self.temperature = obj.payload.temperature
#print(f"{self.dateandtime} {curRates[0]} {curRates[1]} {curRates[2]} {curRates[3]} {curRates[4]} {deltaRates[0]} {deltaRates[1]} {deltaRates[2]} {deltaRates[3]} {deltaRates[4]}")
[docs] def measure_rates(self, timewindow=5.0, meastime=None):
"""
Measure rates seen by the counters.
:param timewindow: Time between successive rate measurements in seconds. Default is 5 seconds.
:param meastime: Total measurement time in minutes. Default is None.
"""
if isinstance(meastime, float):
self.logger.info('Starting rate measurement. Rate is measured every %f seconds, total measurement time: %f min' % (
timewindow, meastime))
self.server.setRunning(True)
self.write_rates_to_file(firstline=True)
self.server.reset_scalars()
# x = threading.Thread(target=self.start_reading_data)
# x.start()
print("before reading data")
self.server.start_reading_data()
x = threading.Thread(target=self.runDaemon)
# x.start()
print("after reading data")
t = 0
try:
while t < (meastime*60):
# self.server.read_scalars()
time_start = time()
sleep(timewindow)
self.server.read_scalars()
time_end = time()
self.dateandtime = datetime.now().strftime(
"%Y-%m-%d %H:%M:%S.%f")[:-3]
self.server.do('TH')
self.server.do('BA')
self.server.get_temp_and_pressure()
sleep(0.5)
self.delta_time = time_end-time_start
# self.server.calculate_rates()
if not x.is_alive():
x.start()
self.write_rates_to_file()
if not self.headless:
self.progressbar.emit(100*t/(meastime*60) )
self.logger.info('Measurement progress: %f %%' %
(100*t/(meastime*60)))
t += self.delta_time
self.server.stop_reading_data()
self.logger.info('Measurement is stopping. Please wait!')
sleep(5)
self.running = False
self.logger.info('Measurement stopped!')
self.server.clear_queues()
self.finished.emit()
except (KeyboardInterrupt, AttributeError, RuntimeError, NameError, SystemExit):
self.server.stop_reading_data()
self.logger.info('Measurement is stopping. Please wait!')
sleep(5)
self.server.setRunning(False)
self.logger.info('Measurement stopped!')
self.server.clear_queues()
self.finished.emit()
elif meastime == None:
self.logger.info(
'Starting rate measurement. Rate is measured every %f seconds. No measurement time set.' % timewindow)
self.running = True
self.starttime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
self.write_rates_to_file(firstline=True)
self.server.reset_scalars()
# x = threading.Thread(target=self.process_incoming)
# x.start()
x = threading.Thread(target=self.server.process_incoming())
x.start()
try:
while self.running:
self.server.read_scalars()
time_start = time()
sleep(timewindow)
self.server.read_scalars()
time_end = time()
self.dateandtime = datetime.now().strftime(
"%Y-%m-%d %H:%M:%S.%f")[:-3]
self.server.do('TH')
self.server.do('BA')
sleep(0.5)
self.server.get_temp_and_pressure()
self.delta_time = time_end-time_start
self.write_rates_to_file()
except (KeyboardInterrupt, AttributeError, RuntimeError, NameError, SystemExit):
self.server.stop_reading_data()
self.logger.info('Measurement is stopping. Please wait!')
sleep(5)
self.server.setRunning(False)
self.logger.info('Measurement stopped!')
self.server.clear_queues()
self.finished.emit()
else:
self.logger.error(
'Got incorrect meastime. If you do not want to specify meastime, set it to None.')