Newer
Older
STAging / scripts / checkPulseData.py
@Elena Graverini Elena Graverini on 3 May 2017 6 KB [scripts] Add is_central mask
from __future__ import division, print_function
import sys
import os
import string
import numpy as np
import pandas as pd
import ROOT as r

location = os.path.expandvars('$DISK/data/ST/Aging/')
macros = os.path.expandvars('$CCEHOME/macros/CCEScan/')
VERBOSE = False

# Voltage steps
voltMapTT = [None] * 11
voltMapTT[0] = 400
voltMapTT[1] = 350
voltMapTT[2] = 300
voltMapTT[3] = 250
voltMapTT[4] = 225
voltMapTT[5] = 200
voltMapTT[6] = 175
voltMapTT[7] = 150
voltMapTT[8] = 125
voltMapTT[9] = 100
voltMapTT[10] = 60
voltMapIT = [None] * 11
voltMapIT[0] = 300
voltMapIT[1] = 200
voltMapIT[2] = 170
voltMapIT[3] = 140
voltMapIT[4] = 120
voltMapIT[5] = 105
voltMapIT[6] = 90
voltMapIT[7] = 75
voltMapIT[8] = 60
voltMapIT[9] = 40
voltMapIT[10] = 20

# Time steps


def get_tmap():
    timeMap = [None] * 6
    timeMap[0] = 0.0
    timeMap[1] = -60.0
    timeMap[2] = -30.0
    timeMap[3] = 30.0
    timeMap[4] = 60.0
    timeMap[5] = 90.0
    return timeMap


def build_pulse(vals):
    times = get_tmap()
    pulse = list(zip(*sorted(zip(vals, times), key=lambda x: x[1]))[0])
    return np.array(prune(pulse))


def is_monotonic(x):
    if not len(x):
        return False
    dx = np.diff(x)
    return (np.all(dx <= 0) or np.all(dx >= 0))


def is_central(x):
    if not len(x):
        return False
    dx = np.diff(x)
    return (np.all(dx <= 0) or np.all(dx >= 0))


def max_adc(x):
    return np.max(x)


def prune(x):
    return [element for element in x if element is not None]


def get_vmap(d):
    if 'IT' in d:
        return voltMapIT
    if 'TT' in d:
        return voltMapTT


# Helpers for string formatting
# based on http://stackoverflow.com/a/23305496/3324012
class StringTemplate(object):

    class FormatDict(dict):

        def __missing__(self, key):
            return "{" + key + "}"

    def __init__(self, template):
        self.substituted_str = template
        self.formatter = string.Formatter()

    def __repr__(self):
        return self.substituted_str

    def format(self, *args, **kwargs):
        mapping = StringTemplate.FormatDict(*args, **kwargs)
        self.substituted_str = self.formatter.vformat(
            self.substituted_str, (), mapping)
        return self.substituted_str


class Table(dict):
    '''Dictionary tweaked for conversion into pandas DataFrame

       Dictionary conversion into DataFrames is faster than
       appending rows to a DataFrame, as pointed out here:
       http://stackoverflow.com/a/17496530/3324012
    '''

    def __init__(self, *keylist):
        super(dict, self).__init__()
        self.__keylist = keylist
        for key in keylist:
            self[key] = []

    def append(self, *vals):
        assert(len(vals) == len(self.__keylist))
        for i, v in enumerate(vals):
            self[self.__keylist[i]].append(v)


if __name__ == '__main__':

    if 'VERB' in sys.argv:
        VERBOSE = True

    datafile = r.TFile(location + 'CCEScan.root', 'read')

    # Select detector
    detector = 'TT'
    nstrips = [3, 5, 7]
    layer = 'TTaU'
    if 'IT' in sys.argv:
        detector = 'IT'
        nstrips = [7]
        layer = 'T3X2'

    # Load fill numbers
    with open(macros + 'Fills.dat', 'rb') as f:
        fills = f.read().splitlines()
    fills.remove('2797')
    fills.remove('3108')
    # Load sectors
    with open(macros + '{DET}sectors.dat'.format(DET=detector), 'rb') as f:
        sectors = f.read().splitlines()
    # Load inner sectors
    with open(macros + '{DET}inner.dat'.format(DET=detector), 'rb') as f:
        inners = f.read().splitlines()
    # Load voltage map
    vmap = get_vmap(detector)

    # String templates
    access = StringTemplate('{DET}/{LAY}/{SEC}/{FILL}/v_{CS}_val{NS}')
    access = access.format(DET=detector, LAY=layer)
    warning = StringTemplate(
        '{LAY}/{SEC} on fill {FILL}: pulse is monotonic for ns={NS}, Vbias={V}: \t {PULSE}')
    warning = warning.format(LAY=layer)
    not_found = StringTemplate(
        '{LAY}/{SEC} on fill {FILL}: calibration step {CS} not found for ns={NS}')
    not_found = not_found.format(LAY=layer)

    # Init dictionary for data frame
    table = Table('Detector', 'Sector', 'Fill', 'N strips', 'V bias', 'MPVs')

    # Loop on data
    for fill in fills:
        for sector in sectors:
            for ns in nstrips:
                # Loop on voltages
                for vstep in range(11):
                    pulse = []
                    for cstep in range(6 * vstep, 6 * vstep + 6):
                        try:
                            data = datafile.Get(access.format(NS=ns,
                                                              FILL=fill,
                                                              SEC=sector,
                                                              CS=cstep))
                            pulse.append(list(data)[0])
                        except:
                            if VERBOSE:
                                print(not_found.format(NS=ns,
                                                       FILL=fill,
                                                       SEC=sector,
                                                       CS=cstep))
                            pulse.append(None)
                    pulse = build_pulse(pulse)
                    # Append this occurrence to the data frame dictionary
                    table.append(detector, sector, fill,
                                 ns, vmap[vstep], pulse)
                    if is_monotonic(pulse):
                        if VERBOSE:
                            pulse = [round(v, 2) for v in pulse]
                            print(warning.format(NS=ns,
                                                 FILL=fill,
                                                 SEC=sector,
                                                 V=vmap[vstep],
                                                 PULSE=pulse))

    datafile.Close()
    # Create the pandas DataFrame from the dictionary
    data_frame = pd.DataFrame.from_dict(table)
    # Select the instances where pulse data are monotonic
    monotonics = data_frame[data_frame['MPVs'].apply(is_monotonic)]

    if not os.path.isfile('monotonics.pkl'):
        import pickle
        pickle.dump(monotonics, open('monotonics.pkl', 'wb'))

    # Combine masks with the '&' operator
    # (intersection in numpy), e.g.:
    # data_frame[mask_highVbias(125) & mask_monotonics]
    def mask_highVbias(V):
        return data_frame['V bias'] > V

    def mask_lowVbias(V):
        return data_frame['V bias'] < V

    def mask_highsig(ADC):
        return data_frame['MPVs'].apply(max_adc) > ADC

    def mask_lowsig(ADC):
        return data_frame['MPVs'].apply(max_adc) < ADC

    mask_monotonics = data_frame['MPVs'].apply(is_monotonic)

    # To do:
    # - mask selecting only central sectors
    # - intersect with Michele's result and try to find cause/consequence relationships
    # - look at the actual fits and data taking conditions