""" Here are contained supplimentary functions for Tuple to Histogram and Pkl to Histogram transformation. Normally, Tuples are stored as a python dictionaries (see create_coll, create_monitor_ind and create_efficiency_ind) Two type of dictioanries is considered: Efficiency-like and Monitor-like These dictionaries contains histograms, which a later stored in a .root file (In format which is recognized by interactie ST monitor) Basing on time binning, trend histograms are also created. (They are also saved in ST-monitor-friendly .root files) ST map plots are created by funcitons from CreateTTHist and CreateITHist files. """ import os import inspect from pprint import pprint from itertools import product import math import numpy as n import sys from config import binning from config import bin_name from config import bin_vector from config import residual_limit from config import perform_window_eff_study from config import efficiency_windows from config import residual_nBins from config import SNratio_limit from config import SNratio_nBins from config import extra_name from datetime import datetime from drawing.Create_Maps import TT_Map as TT_Map_func from drawing.Create_Maps import IT_Map as IT_Map_func from array import array import ROOT as R from ROOT import gStyle from ROOT import gROOT gStyle.SetOptStat(False) def syntax_explanation(script): if script == "SingleTrend.py": print "Incorrect syntax. Please run with:" print "python "+script+" <pkl_data_file> <sector_name>" print "Plese use sector names like 'TTaXRegionBSector9'" elif script == "PklAlgebra.py": print "Incorrect syntax. Please run with:" print "python "+script+" <pkl_1> <pkl_2> <formula with a for ds1, b for ds2> <optinaly: variable of interest>" print "For example:" print "python "+script+" ds1.pkl ds2.pkl a+b efficiency" print "This command will create a new collection containing sum of efficiencies." print "If you will not specify variable, all variables in collection will be calculated." print "Please use .pkls with collections, which have the same time binning" print "Please use .pkls with collections, which describe the same detectors" print "Please use .pkls which correspond to the same operation mode" print "This function create a new pkl file containng dictionary based on dictionaries from inputs." print "Both input pkls should have the same structure." print "Resulting pkl will also have the same structure." print "The values of the resulting dictionary elements will be found by evaluation of a given formula, with 'a' replaced with value from ds1, and 'b' replaced with value from ds2." print "If it is impossible to evaluate a formula, the value in resulting dictionary will be set to 0 and warning will be printed." else: print "Incorrect syntax. Please run with:" print "python "+script+" <data_file> <mode>" print "<mode> here:" print "1 - TT Hit Efficiency" print "2 - IT Hit Efficiency" return True def cli_progress_test(i, end_val, start, bar_length=20): percent = float(i) / end_val hashes = '#' * int(round(percent * bar_length)) spaces = ' ' * (bar_length - len(hashes)) sys.stdout.write("\rPercent: [{0}] {1}% ({2}/{3}), {4}".format(hashes + spaces, int(round(percent * 100)),i, end_val, datetime.now()-start)) sys.stdout.flush() def run_binning(): #{<run_start>:{"run_start":<run_start>, # "run_stop":<run_stop>} #} global binning run_schema = {} for pb in binning: if ("BinByMonth" in bin_name or "AllRuns" in bin_name or "BinByRunNumber" in bin_name): run_schema[pb["run_start"]]={"run_start":pb["run_start"],"run_stop":pb["run_stop"]} try: run_schema[pb["run_start"]]["comment"]=pb["comment"] except: print "Unable to find alias for bin range. Use numbers instead" pass if "BinByPt" in bin_name: run_schema[pb["pT_start"]]={"pT_start":pb["pT_start"],"pT_stop":pb["pT_stop"]} try: run_schema[pb["pT_start"]]["comment"]=pb["comment"] except: print "Unable to find alias for bin range. Use numbers instead" pass if "BinByY" in bin_name: run_schema[pb["Y_start"]]={"Y_start":pb["Y_start"],"Y_stop":pb["Y_stop"]} try: run_schema[pb["Y_start"]]["comment"]=pb["comment"] except: print "Unable to find alias for bin range. Use numbers instead" pass if "BinByStrip" in bin_name: run_schema[pb["Strip_start"]]={"Strip_start":pb["Strip_start"],"Strip_stop":pb["Strip_stop"]} try: run_schema[pb["Strip_start"]]["comment"]=pb["comment"] except: print "Unable to find alias for bin range. Use numbers instead" pass return run_schema #R.TH1F("hist_s","Name",100, -3,3) def create_efficiency_ind(st_name,run_range): #Information which should be filled for individual sector for Efficiency mode #To produce .root files only! global residual_limit global residual_nBins global perform_window_eff_study global efficiency_windows efficiency_ind = {"nbFound":0, "nbExpected":0, "efficiency":0, "err_efficiency":0, "nbFoundNoise":0, "nbExpectedNoise":0, "noise_fraction":0, "noise_fraction_err":0, "clusterSize":R.TH1F("clusterSize"+run_range+"_"+st_name,"Cluster size;;Number of events", 5, 0.5, 5.5), "SNratio":R.TH1F("SNratio"+run_range+"_"+st_name,"S/N ratio;;Number of events", SNratio_nBins, 0., SNratio_limit), "residual":R.TH1F("residualE"+run_range+"_"+st_name,"Residual;[mm];Number of events", residual_nBins, -residual_limit, residual_limit), "residual_noise":R.TH1F("residualNoise"+run_range+"_"+st_name,"Residual(noise);[mm];Number of events", residual_nBins, -residual_limit, residual_limit), "efficiency_hist":R.TH1F("efficiency"+run_range+"_"+st_name,"Efficiency",1, 0.,1.), "noise_fraction_hist":R.TH1F("noise_fraction"+run_range+"_"+st_name,"Noise fraction",1, 0.,1.) } if perform_window_eff_study: # efficiency_ind["window_dependence"] = R.TH1F("wind_dep_"+run_range+"_"+st_name,"Efficiency as a function of search window;[mm];Efficiency",len(efficiency_windows), 0.,1.) efficiency_ind["window_dependence"] = R.TH1F("wind_dep_"+run_range+"_"+st_name,"Efficiency as a function of search window;[mm];Efficiency",40,0,0.4) efficiency_ind["window_dependence_noise"] = R.TH1F("wind_dep_noise_"+run_range+"_"+st_name,"Noise fraction as a function of search window;[mm];Noise fraction",40,0,0.4) return efficiency_ind def create_efficiency_lite(efficiency_ind): #Information which should be filled for individual sector for Efficiency mode #To produce .pkls and build trends. efficiency_lite = { "occupancy":efficiency_ind["nbFound"], "efficiency":efficiency_ind["efficiency"], "err_efficiency":efficiency_ind["err_efficiency"], "mean":efficiency_ind["residual"].GetMean(), "err_mean":efficiency_ind["residual"].GetMeanError(), "width":efficiency_ind["residual"].GetRMS(), "err_width":efficiency_ind["residual"].GetRMSError(), "clusterSize_mean":efficiency_ind["clusterSize"].GetMean(), "SNratio_max":efficiency_ind["SNratio"].GetBinCenter( efficiency_ind["SNratio"].GetMaximumBin()), "noise_fraction":efficiency_ind["noise_fraction"], "noise_fraction_err":efficiency_ind["noise_fraction_err"], } return efficiency_lite def create_coll(det="IT", mode="Monitor"): #To be used for creation of histograms #{<run_start>:{ # "run_start":<run_start>, # "run_stop" :<run_stop>, # "data" :{ # <st_ID1>:{}, # <st_ID2>:{}, # <st_ID3>:{}... #} } } coll = run_binning() if det == "IT": ST_Map = IT_Map_func() else: ST_Map = TT_Map_func() for run_bin in coll: try: run_range=coll[run_bin]["comment"] except: run_range="::::"+str(coll[run_bin]["run_start"])+"::"+str(coll[run_bin]["run_stop"])+"::::" coll[run_bin]["data"]={} for st_id in ST_Map: if mode == "Monitor": coll[run_bin]["data"][st_id]=create_monitor_ind(ST_Map[st_id],run_range) else: coll[run_bin]["data"][st_id]=create_efficiency_ind(ST_Map[st_id],run_range) return coll def make_coll_lite(coll, det="IT", mode="Monitor"): #To be stored in .pkl, to create trends #{<run_start>:{ # "run_start":<run_start>, # "run_stop" :<run_stop>, # "data" :{ # <st_ID1>:{}, # <st_ID2>:{}, # <st_ID3>:{}... #} } } lite_coll = run_binning() if det == "IT": ST_Map = IT_Map_func() else: ST_Map = TT_Map_func() for run_bin in coll: try: run_range=coll[run_bin]["comment"] except: run_range="::::"+str(coll[run_bin]["run_start"])+"::"+str(coll[run_bin]["run_stop"])+"::::" lite_coll[run_bin]["data"]={} for st_id in ST_Map: if mode == "Monitor": lite_coll[run_bin]["data"][st_id]=create_monitor_lite(coll[run_bin]["data"][st_id]) else: lite_coll[run_bin]["data"][st_id]=create_efficiency_lite(coll[run_bin]["data"][st_id]) return lite_coll ##### Efficiency: nbFound/nbExpected already removed hits from noise #### def find_efficiency(coll): for run_bin in coll: for st_ID in coll[run_bin]["data"]: nbf = coll[run_bin]["data"][st_ID]["nbFound"] nbe = coll[run_bin]["data"][st_ID]["nbExpected"] if nbe == 0: coll[run_bin]["data"][st_ID]["efficiency"]= 0 coll[run_bin]["data"][st_ID]["err_efficiency"] = 0 coll[run_bin]["data"][st_ID]["efficiency_hist"].SetBinContent(1, 0) coll[run_bin]["data"][st_ID]["efficiency_hist"].SetBinError(1, 0) continue coll[run_bin]["data"][st_ID]["efficiency"]=nbf/nbe coll[run_bin]["data"][st_ID]["err_efficiency"] = nbf**0.5*(nbe-nbf)**0.5*nbe**(-1.5) coll[run_bin]["data"][st_ID]["efficiency_hist"].SetBinContent(1, coll[run_bin]["data"][st_ID]["efficiency"]) coll[run_bin]["data"][st_ID]["efficiency_hist"].SetBinError(1, coll[run_bin]["data"][st_ID]["err_efficiency"]) return coll def find_noise_fraction(coll): for run_bin in coll: for st_ID in coll[run_bin]["data"]: nbf = coll[run_bin]["data"][st_ID]["nbFoundNoise"] nbe = coll[run_bin]["data"][st_ID]["nbExpected"] if nbe == 0: coll[run_bin]["data"][st_ID]["noise_fraction"]= 0 coll[run_bin]["data"][st_ID]["noise_fraction_err"] = 0 coll[run_bin]["data"][st_ID]["noise_fraction_hist"].SetBinContent(1, 0) coll[run_bin]["data"][st_ID]["noise_fraction_hist"].SetBinError(1, 0) continue coll[run_bin]["data"][st_ID]["noise_fraction"]=nbf/nbe coll[run_bin]["data"][st_ID]["noise_fraction_err"] = nbf**0.5*(nbe-nbf)**0.5*nbe**(-1.5) coll[run_bin]["data"][st_ID]["noise_fraction_hist"].SetBinContent(1, coll[run_bin]["data"][st_ID]["noise_fraction"]) coll[run_bin]["data"][st_ID]["noise_fraction_hist"].SetBinError(1, coll[run_bin]["data"][st_ID]["noise_fraction_err"]) return coll def bins_from_window(window): global residual_limit global residual_nBins bin_width = 2.*float(residual_limit)/float(residual_nBins) bin_low = residual_nBins/2 - int(window/bin_width) bin_hi = residual_nBins-bin_low+1 return [bin_low, bin_hi] def window_eff_study(coll): global efficiency_windows for run_bin in coll: for st_ID in coll[run_bin]["data"]: nbe = coll[run_bin]["data"][st_ID]["nbExpected"] if nbe == 0: continue # coll[run_bin]["data"][st_ID]["window_dependence"].GetXaxis().SetNdivisions(-414) for i, window in enumerate(sorted(efficiency_windows)): nbf = coll[run_bin]["data"][st_ID]["residual"].Integral(bins_from_window(window)[0], bins_from_window(window)[1]) nbfN = coll[run_bin]["data"][st_ID]["residual_noise"].Integral(bins_from_window(window)[0], bins_from_window(window)[1]) coll[run_bin]["data"][st_ID]["window_dependence"].SetBinContent(i+1, nbf/nbe) coll[run_bin]["data"][st_ID]["window_dependence_noise"].SetBinContent(i+1, nbfN/nbe) if nbf!=0: coll[run_bin]["data"][st_ID]["window_dependence"].SetBinError(i+1, nbf**0.5*(nbe-nbf)**0.5*nbe**(-1.5)) else: coll[run_bin]["data"][st_ID]["window_dependence"].SetBinError(i+1, 0) if nbfN!=0: coll[run_bin]["data"][st_ID]["window_dependence_noise"].SetBinError(i+1, nbfN**0.5*(nbe-nbfN)**0.5*nbe**(-1.5)) else: coll[run_bin]["data"][st_ID]["window_dependence_noise"].SetBinError(i+1, 0) # coll[run_bin]["data"][st_ID]["window_dependence"].GetXaxis().SetBinLabel(i+1,str(window)) return coll def write_histogram(coll, mode, name): f = R.TFile(name+"histos"+extra_name+".root","recreate") for run_bin in coll: try: cdtof = f.mkdir(coll[run_bin]["comment"]) except: cdtof = f.mkdir(str(coll[run_bin]["run_start"])+"-"+str(coll[run_bin]["run_stop"])) cdtof.cd() for st_id in coll[run_bin]["data"]: if mode == "Monitor": coll[run_bin]["data"][st_id]["residual"].Write() coll[run_bin]["data"][st_id]["unbiased_residual"].Write() coll[run_bin]["data"][st_id]["rms_unbiased_residual"].Write() else: coll[run_bin]["data"][st_id]["residual"].Write() coll[run_bin]["data"][st_id]["efficiency_hist"].Write() coll[run_bin]["data"][st_id]["residual_noise"].Write() coll[run_bin]["data"][st_id]["clusterSize"].Write() coll[run_bin]["data"][st_id]["SNratio"].Write() coll[run_bin]["data"][st_id]["noise_fraction_hist"].Write() f.Close() return True def write_window_eff_study(coll, mode, name): f = R.TFile(name+"histos"+extra_name+".root","recreate") for run_bin in coll: try: cdtof = f.mkdir(coll[run_bin]["comment"]) except: cdtof = f.mkdir(str(coll[run_bin]["run_start"])+"-"+str(coll[run_bin]["run_stop"])) cdtof.cd() for st_id in coll[run_bin]["data"]: coll[run_bin]["data"][st_id]["window_dependence"].Write() coll[run_bin]["data"][st_id]["window_dependence_noise"].Write() f.Close() return True def create_efficiency_trends(lite_coll, det, name): f = R.TFile(name+"histos"+extra_name+".root","recreate") if det == "IT": ST_Map = IT_Map_func() else: ST_Map = TT_Map_func() for st_id in ST_Map: efficiency = R.TH1F("eff:trend_"+ST_Map[st_id],"Changes of hit efficiency;;Efficiency",len(bin_vector)-1, array('d',bin_vector)) residual_mean = R.TH1F("bias:trend_"+ST_Map[st_id],"Changes of the bias;;Bias, [mm]",len(bin_vector)-1, array('d',bin_vector)) residual_width = R.TH1F("width:trend_"+ST_Map[st_id],"Changes of the hit resolution (width of residual);;Resolution, [mm]",len(bin_vector)-1, array('d',bin_vector)) clusterSize_mean = R.TH1F("clusterSize_mean:trend_"+ST_Map[st_id],"Changes of cluster size;;Cluster size",len(bin_vector)-1, array('d',bin_vector)) SNratio_max = R.TH1F("SNratio_max:trend_"+ST_Map[st_id],"Changes of S/N ratio;;S/N ratio",len(bin_vector)-1, array('d',bin_vector)) noise_fraction = R.TH1F("noise_fraction:trend_"+ST_Map[st_id],"Changes of noise fraction;;Noise fraction",len(bin_vector)-1, array('d',bin_vector)) for i, run_bin in enumerate(sorted(lite_coll.keys())): # Efficiency efficiency.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["efficiency"]) efficiency.SetBinError(i+1, lite_coll[run_bin]["data"][st_id]["err_efficiency"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT bins try: efficiency.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: efficiency.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) # Mean residual_mean.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["mean"]) residual_mean.SetBinError(i+1, lite_coll[run_bin]["data"][st_id]["err_mean"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT histos try: residual_mean.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: residual_mean.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) # Width residual_width.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["width"]) residual_width.SetBinError(i+1, lite_coll[run_bin]["data"][st_id]["err_width"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT histos try: residual_width.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: residual_width.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) # clusterSize clusterSize_mean.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["clusterSize_mean"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT histos try: clusterSize_mean.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: clusterSize_mean.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) # SNratio SNratio_max.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["SNratio_max"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT histos try: SNratio_max.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: SNratio_max.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) # noise_fraction noise_fraction.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["noise_fraction"]) noise_fraction.SetBinError(i+1, lite_coll[run_bin]["data"][st_id]["noise_fraction_err"]) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # don't want bin label for pT histos try: noise_fraction.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: noise_fraction.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) if ("AllRuns" in bin_name) or ("BinByMonth" in bin_name) or ("BinByRunNumber" in bin_name): # natural division for pT binning histo efficiency.GetXaxis().SetNdivisions(-414) residual_mean.GetXaxis().SetNdivisions(-414) residual_width.GetXaxis().SetNdivisions(-414) clusterSize_mean.GetXaxis().SetNdivisions(-414) SNratio_max.GetXaxis().SetNdivisions(-414) noise_fraction.GetXaxis().SetNdivisions(-414) efficiency.Write() residual_mean.Write() residual_width.Write() clusterSize_mean.Write() SNratio_max.Write() noise_fraction.Write() f.Close() print "Residual & efficiency trends created at "+name+"histos.root" return True ###### *********** I did not modified this for the last version with noise ********** #### def create_single_efficiency_trend(lite_coll, sector, plot_address): #Checl if sector name is correct gROOT.SetStyle("Modern") gROOT.ForceStyle() gROOT.ProcessLine(".x lhcbStyle.C") gStyle.SetPadLeftMargin(0.2) gROOT.ForceStyle() setcor_is_found = False if "IT" in sector: ST_Map = IT_Map_func() else: ST_Map = TT_Map_func() for i in ST_Map: if ST_Map[i]==sector: st_id = i break if not st_id: print "Wrong sector. Plese use sector names like 'TTaXRegionBSector9'" return False #Check if efficiency error is in collection. #Check if sollection has information for given sector erreff_in_collection = False for bin in lite_coll: if st_id in lite_coll[bin]["data"]: if "err_efficiency" in lite_coll[bin]["data"][st_id]: setcor_is_found = True erreff_in_collection = True break if not setcor_is_found: print "Trends are empty, please check that you use correct dataset for chosen sector" return False efficiency = R.TH1F(sector,"Changes of hit efficiency of "+sector+";;Efficiency",len(lite_coll),0,1) for i, run_bin in enumerate(sorted(lite_coll.keys())): if st_id in lite_coll[run_bin]["data"]: efficiency.SetBinContent(i+1, lite_coll[run_bin]["data"][st_id]["efficiency"]) if erreff_in_collection: efficiency.SetBinError(i+1, lite_coll[run_bin]["data"][st_id]["err_efficiency"]) efficiency.GetXaxis().SetNdivisions(-414) try: efficiency.GetXaxis().SetBinLabel(i+1,lite_coll[run_bin]["comment"]) except: efficiency.GetXaxis().SetBinLabel(i+1,str(lite_coll[run_bin]["run_start"])+"-"+str(lite_coll[run_bin]["run_stop"])) efficiency.GetYaxis().SetTitleOffset(1.2) c1 = R.TCanvas("c1","c1",600,600) efficiency.Draw() c1.SaveAs(plot_address+"Trend_Efficiency_Sector_"+sector+extra_name+".pdf") c1.SaveAs(plot_address+"Trend_Efficiency_Sector_"+sector+extra_name+".C") gROOT.SetStyle("Modern") gROOT.ForceStyle() return True if __name__ == "__main__": print "Here are contained supplimentary functions for Tuple to Histogram transformation." print "Normally, Tuples are stored as a python dictionaries (see create_coll, create_monitor_ind and create_efficiency_ind)" print "Two type of dictioanries is considered: Efficiency-like and Monitor-like" print "These dictionaries contains histograms, which a later stored in a .root file (In format which is recognized by interactie ST monitor)" print "Basing on time binning, trend histograms are also created. (They are also saved in ST-monitor-friendly .root files)" print "ST map plots are created by funcitons from CreateTTHist and CreateITHist files."