Newer
Older
STAging / scripts / checkPulseData.py
@Elena Graverini Elena Graverini on 25 Apr 2017 6 KB [scripts] Add TODO list
from __future__ import division, print_function
import sys
import os
import string
import numpy as np
import pandas as pd
import ROOT as r

location = os.path.expandvars('$DISK/data/ST/Aging/')
macros = os.path.expandvars('$CCEHOME/macros/CCEScan/')
VERBOSE = False

# Voltage steps
voltMapTT = [None] * 11
voltMapTT[0]    =  400
voltMapTT[1]    =  350
voltMapTT[2]    =  300
voltMapTT[3]    =  250
voltMapTT[4]    =  225
voltMapTT[5]    =  200
voltMapTT[6]    =  175
voltMapTT[7]    =  150
voltMapTT[8]    =  125
voltMapTT[9]    =  100
voltMapTT[10]   =   60
voltMapIT = [None] * 11
voltMapIT[0]    =  300
voltMapIT[1]    =  200
voltMapIT[2]    =  170
voltMapIT[3]    =  140
voltMapIT[4]    =  120
voltMapIT[5]    =  105
voltMapIT[6]    =   90
voltMapIT[7]    =   75
voltMapIT[8]    =   60
voltMapIT[9]    =   40
voltMapIT[10]   =   20

# Time steps
def get_tmap():
    timeMap = [None] * 6
    timeMap[0]      =    0.0
    timeMap[1]      =  -60.0
    timeMap[2]      =  -30.0
    timeMap[3]      =   30.0
    timeMap[4]      =   60.0
    timeMap[5]      =   90.0
    return timeMap


def build_pulse(vals):
    times = get_tmap()
    pulse = list(zip(*sorted(zip(vals, times), key=lambda x: x[1]))[0])
    return np.array(prune(pulse))


def is_monotonic(x):
    if not len(x):
        return False
    dx = np.diff(x)
    return (np.all(dx <= 0) or np.all(dx >= 0))


def max_adc(x):
    return np.max(x)


def prune(x):
    return [element for element in x if element is not None]


def get_vmap(d):
    if 'IT' in d:
        return voltMapIT
    if 'TT' in d:
        return voltMapTT


# Helpers for string formatting
# based on http://stackoverflow.com/a/23305496/3324012
class StringTemplate(object):
    class FormatDict(dict):
        def __missing__(self, key):
            return "{" + key + "}"

    def __init__(self, template):
        self.substituted_str = template
        self.formatter = string.Formatter()

    def __repr__(self):
        return self.substituted_str

    def format(self, *args, **kwargs):
        mapping = StringTemplate.FormatDict(*args, **kwargs)
        self.substituted_str = self.formatter.vformat(self.substituted_str, (), mapping)
        return self.substituted_str


class Table(dict):
    '''Dictionary tweaked for conversion into pandas DataFrame
       
       Dictionary conversion into DataFrames is faster than 
       appending rows to a DataFrame, as pointed out here:
       http://stackoverflow.com/a/17496530/3324012
    '''

    def __init__(self, *keylist):
        super(dict, self).__init__()
        self.__keylist = keylist
        for key in keylist:
            self[key] = []

    def append(self, *vals):
        assert(len(vals) == len(self.__keylist))
        for i, v in enumerate(vals):
            self[self.__keylist[i]].append(v)


if __name__ == '__main__':

    if 'VERB' in sys.argv:
        VERBOSE = True

    datafile = r.TFile(location + 'CCEScan.root', 'read')

    # Select detector    
    detector = 'TT'
    nstrips = [3, 5, 7]
    layer = 'TTaU'
    if 'IT' in sys.argv:
        detector = 'IT'
        nstrips = [7]
        layer = 'T3X2'

    
    # Load fill numbers
    with open(macros + 'Fills.dat', 'rb') as f:
        fills = f.read().splitlines()
    fills.remove('2797')
    fills.remove('3108')
    # Load sectors
    with open(macros + '{DET}sectors.dat'.format(DET=detector), 'rb') as f:
        sectors = f.read().splitlines()
    # Load voltage map
    vmap = get_vmap(detector)
    
    # String templates
    access = StringTemplate('{DET}/{LAY}/{SEC}/{FILL}/v_{CS}_val{NS}')
    access = access.format(DET=detector, LAY=layer)
    warning = StringTemplate('{LAY}/{SEC} on fill {FILL}: pulse is monotonic for ns={NS}, Vbias={V}: \t {PULSE}')
    warning = warning.format(LAY=layer)
    not_found = StringTemplate('{LAY}/{SEC} on fill {FILL}: calibration step {CS} not found for ns={NS}')
    not_found = not_found.format(LAY=layer)

    # Init dictionary for data frame
    table = Table('Detector', 'Sector', 'Fill', 'N strips', 'V bias', 'MPVs')

    # Loop on data
    for fill in fills:
        for sector in sectors:
            for ns in nstrips:
                # Loop on voltages
                for vstep in range(11):
                    pulse = []
                    for cstep in range(6*vstep, 6*vstep + 6):
                        try:
                            data = datafile.Get(access.format(NS=ns,
                                                              FILL=fill,
                                                              SEC=sector,
                                                              CS=cstep))
                            pulse.append(list(data)[0])
                        except:
                            if VERBOSE:
                                print(not_found.format(NS=ns,
                                                       FILL=fill,
                                                       SEC=sector,
                                                       CS=cstep))
                            pulse.append(None)
                    pulse = build_pulse(pulse)
                    # Append this occurrence to the data frame dictionary
                    table.append(detector, sector, fill, ns, vmap[vstep], pulse)
                    if is_monotonic(pulse):
                        if VERBOSE:
                            pulse = [round(v,2) for v in pulse]
                            print(warning.format(NS=ns,
                                                 FILL=fill,
                                                 SEC=sector,
                                                 V=vmap[vstep],
                                                 PULSE=pulse))

    datafile.Close()
    # Create the pandas DataFrame from the dictionary
    data_frame = pd.DataFrame.from_dict(table)
    # Select the instances where pulse data are monotonic
    monotonics = data_frame[data_frame['MPVs'].apply(is_monotonic)]

    if not os.path.isfile('monotonics.pkl'):
        import pickle
        pickle.dump(monotonics, open('monotonics.pkl', 'wb'))

    # Combine masks with the '&' operator
    # (intersection in numpy), e.g.:
    # data_frame[mask_highVbias(125) & mask_monotonics]
    def mask_highVbias(V):
        return data_frame['V bias'] > V

    def mask_lowVbias(V):
        return data_frame['V bias'] < V

    def mask_highsig(ADC):
        return data_frame['MPVs'].apply(max_adc) > ADC

    def mask_lowsig(ADC):
        return data_frame['MPVs'].apply(max_adc) < ADC

    mask_monotonics = data_frame['MPVs'].apply(is_monotonic)

    # To do:
    # - mask selecting only central sectors
    # - intersect with Michele's result and try to find cause/consequence relationships
    # - look at the actual fits and data taking conditions