from mongoengine import connect
import zmq
from datetime import datetime, timezone
from .db.RecordAdapter import RecordAdapter
import jsonpickle
import threading
import logging
[docs]class ReaderFromMongoDB(object):
"""
Class that read data from MongoDB and sends it as if it were coming from a DAQ card.
This is a basic version, which can be extended.
Inits the MongoDB connection and the zeromq socket. This needs to start before any analysis.
"""
def __init__(self, logger=None):
if logger is None:
logger = logging.getLogger()
self.logger = logger
# connect to MongoDB
connect('muonic', host='localhost', port=27017,
username="root", password="muonic", authentication_source='admin')
# setup zmq socket for the server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind("tcp://*:1234")
[docs] def run(self):
"""
Get data from a certain timeframe from the db and then sends it through the socket.
"""
# Set up the MongoDB aggregation pipeline for time based filtering
pipeline = [
{
'$match': {
'timestamp': {
'$gt': datetime(2020, 10, 28, 0, 0, 0, tzinfo=timezone.utc),
'$lt': datetime(2020, 10, 30, 0, 0, 0, tzinfo=timezone.utc)
}
}
}
]
objs = RecordAdapter.objects().aggregate(pipeline)
for o in list(objs):
# convert objects retrieved from the db into Record objects and send them via zmq
rec = RecordAdapter(**o)
self.socket.send_string(jsonpickle.encode(rec.createRecord()))
[docs] def setup_channel(self, ch0, ch1, ch2, ch3, coincidence):
"""
Fake function. Just for API compatibility
"""
self.logger.debug(
f"Setting up fake channel: {ch0, ch1, ch2, ch3, coincidence}")
[docs] def set_threashold(self, ch0, ch1, ch2, ch3):
"""
Fake function. Just for API compatibility
"""
self.logger.debug(f"setting fake threshold: {ch0, ch1, ch2, ch3}")
[docs] def setRunning(self, state):
"""
Fake function. Just for API compatibility
"""
self.logger.debug(f"setting fake running: {state}")
[docs] def reset_scalars(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("resettig fake scalars")
[docs] def start_reading_data(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("starting to send data")
x = threading.Thread(target=self.run)
x.setDaemon(True)
x.start()
[docs] def read_scalars(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("reading fake scalars")
[docs] def do(self, arg):
"""
Fake function. Just for API compatibility
"""
self.logger.debug(f"Doing fake {arg}")
[docs] def get_temp_and_pressure(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("getting fake temp and pressure")
[docs] def clear_queues(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("clearing non existing queues")
[docs] def stop_reading_data(self):
"""
Fake function. Just for API compatibility
"""
self.logger.debug("stopping reading fake data")