Newer
Older
Tb / TbIO / src / TbEventBuilder.cpp
#include <algorithm>

// Gaudi
#include "GaudiKernel/IEventProcessor.h"
#include "GaudiAlg/ISequencerTimerTool.h"

// Tb/TbKernel
#include "TbKernel/TbFunctors.h"
#include "TbKernel/TbConstants.h"

// Local
#include "TbEventBuilder.h"
#include "TbRawFile.h"

#include <chrono>
#include <ctime>

DECLARE_ALGORITHM_FACTORY(TbEventBuilder)

const int64_t TbEventBuilder::m_maxTimeDifference = std::pow(2, 40);

//=============================================================================
// Standard constructor
//=============================================================================
TbEventBuilder::TbEventBuilder(const std::string& name, 
                               ISvcLocator* pSvcLocator)
  : TbAlgorithm(name, pSvcLocator),
    m_streams(), m_triggers(), m_hits() {

    declareProperty("HitLocation", 
        m_hitLocation = LHCb::TbHitLocation::Default);
    declareProperty("TriggerLocation",
        m_triggerLocation = LHCb::TbTriggerLocation::Default);

    // Length of event in global time units.
    declareProperty("EventLength", m_tick = 10 * Tb::SpidrTime);
    // Length to look into 'future' events to correct
    // for time misordering around event boundaries
    declareProperty("CacheLength", m_cachelength = 20 * Tb::SpidrTime);

    // The time by which events 'overlap' to prevent tracks / clusters
    // being cut between different events. Implementation is rather
    // convoluted, relies on track identification and global event defintion
    // via the TbTimingSvc
    declareProperty("OverlapTime", m_overlapTime = 0 * Tb::ToA);

    // Min. number of planes with at least one hit required to make an event
    declareProperty("MinPlanesWithHits", m_nMinPlanesWithHits = 1);
    // Print frequency
    declareProperty("PrintFreq", m_printFreq = 100);
    // Size of the header, if not set is read in
    declareProperty("HeaderSize", m_headerSize = 0);
    // Flag to print out the header information
    declareProperty("PrintHeader", m_printHeader = false);
    // Flag to switch monitoring print-out and histograms.
    declareProperty("Monitoring", m_monitoring = false);
    // Time to start data processing, in s 
    declareProperty("StartTime", m_startTime = 0);
    // Time to end data processing, in ms
    declareProperty("EndTime", m_endTime = 0);
    // Flag to produce Equalisation maps
    declareProperty("EqualisationHistos", m_equal = false );
    // Maximum number of packets that can be lost before 
    // considered a critical failure 
    declareProperty("MaxLostPackets", m_maxLostPackets = 1000);
    // Maximum number of timing packets that not read
    // before considered a critical failure
    declareProperty("MaxLostTimers", m_maxLostTimers = 10);
    // Force the cache to update every cycle, as opposed to
    // waiting until the first packet is in view
    declareProperty("ForceCaching", m_forceCaching = false);
    declareProperty("IgnoreGlobalClock", m_ignoreGlobalClock = false );
  }

//=============================================================================
// Destructor
//=============================================================================
TbEventBuilder::~TbEventBuilder() {}

//=============================================================================
// Initialisation
//=============================================================================
StatusCode TbEventBuilder::initialize() {
  // Initialise the base class.
  StatusCode sc = TbAlgorithm::initialize();
  if (sc.isFailure()) return sc;

  // Prepare the hit containers.
  m_hits.resize(m_nPlanes, nullptr);
  m_triggers.resize(m_nPlanes, nullptr);
  // Setup the raw stream tools.
  for (unsigned int i = 0; i < m_nDevices; ++i){
    const std::string toolname = "TbRawStream/TbRawStream"+ std::to_string(i);
    m_streams.push_back(tool<TbRawStream>(toolname));
    if (!m_streams[i]) return Error("Cannot initialise " + toolname);
    m_streams[i]->setDevice(i);
  }
  // Setup the header decoder tool.
  m_headerDecoder = tool<TbHeaderDecoder>("TbHeaderDecoder");
  if (!m_headerDecoder) return Error("Cannot initialise header decoder.");
  m_headerDecoder->print(m_printHeader);
  // Propagate the overlap time to the timing service.
  timingSvc()->setOverlap(m_overlapTime);
  // Get the list of input data 
  auto files = dataSvc()->getInputFiles();
  for (const auto& filename : files) {
    // Check the filename.
    const size_t pos = filename.find(".dat");
    const size_t dash = filename.find_last_of("-");
    if (pos == std::string::npos) {
      warning() << "Skipping " << filename << " (not a .dat file)" << endmsg;
      continue; 
    }
    if (!(pos - dash > 1)) {
      warning() << "Unexpected filename (" << filename << ")" << endmsg;
      warning() << "Skipping " << filename << endmsg;
      continue;
    }
    TbRawFile* f = new TbRawFile(filename, m_headerDecoder);
    if (!f->good()) {
      if (!f->is_open()) {
        error() << "Cannot open " << filename << endmsg;
      } else {
        f->close();
      }
      warning() << "Skipping " << filename << endmsg; 
      delete f;
      continue;
    }
    const std::string id = f->id();
    const unsigned int plane = geomSvc()->plane(id);
    if (plane == 999) {
      warning() << id << " is not listed in the alignment file" << endmsg;
      warning() << "Skipping " << filename << endmsg; 
      f->close();
      delete f;
      continue;
    }
    const unsigned int deviceIndex = geomSvc()->deviceIndex(id);
    if (deviceIndex == 999) {
      warning() << id << " is not listed in the alignment file" << endmsg;
      warning() << "Skipping " << filename << endmsg;
      f->close();
      delete f;
      continue;
    } 
    if (m_streams[deviceIndex]->files().empty()) { 
      info() << id << " (device " << deviceIndex 
             << ") is mapped to plane " << plane << endmsg;
    }
    unsigned int col = 0;
    auto chips = geomSvc()->module(plane)->chips();
    for (auto chip : chips) {
      if (chip.id == id) {
        col = chip.col;
      }
    } 
    m_streams[deviceIndex]->setPlane(plane);
    m_streams[deviceIndex]->setDevice(deviceIndex);
    m_streams[deviceIndex]->setOffset(col);
    m_streams[deviceIndex]->addFile(f);
  }
  // Make sure that there are data files for all planes.
  for (unsigned int i = 0; i < m_nDevices; ++i) {
    if (m_streams[i]->files().empty()) {
      return Error("No input files for device " + std::to_string(i));
    }
  }

  // Convert start and end times to FToA.
  m_startTime *= Tb::millisecond * 1000;
  m_endTime *= Tb::millisecond;
  for (auto& f : m_streams) {
    std::sort(f->files().begin(), f->files().end(), lessBySplitIndex());
    f->prepare();
    m_nDataInFiles += f->size();
    // Convert starting time to FToA times
    if (m_startTime != 0) f->fastForward(m_startTime);
  }
  info() << "Total data size: " << m_nDataInFiles << " packets" << endmsg;
  m_clock = m_tick + m_startTime;
  return StatusCode::SUCCESS;
}

//=============================================================================
// Main execution
//=============================================================================
StatusCode TbEventBuilder::execute() {

  std::clock_t c_end = std::clock();
  std::chrono::_V2::system_clock::time_point t_end = std::chrono::high_resolution_clock::now();
  if (m_nPackets != 0) {
    double time = std::chrono::duration<double, std::milli>(t_end-t_start).count() ;
    plot2D(m_nPackets, time, "RateVsTime",0.,1000000.,0.,3000.,100,100);
    plot2D(m_nPackets, time / (double)m_nPackets, "RateVsTimePerPacket",0.,1000000.,0.,0.01,100,100);
    //info() << " packets = " << m_nPackets << " time = " << time << endmsg;
    m_nPackets = 0;
  }
  /*
     std::cout << std::fixed << std::setprecision(2) << "CPU time used: "
     << 1000.0 * (c_end-c_start) / CLOCKS_PER_SEC << " ms\n"
     << "Wall clock time passed: "
     << std::chrono::duration<double, std::milli>(t_end-t_start).count()
     << " ms\n";
     */

  c_start = c_end;
  t_start = t_end; 

  // Create containers for hits and triggers.
  for (unsigned int i = 0; i < m_nPlanes; ++i) {
    const std::string ext = std::to_string(i);
    LHCb::TbHits* hits = new LHCb::TbHits();
    put(hits, m_hitLocation + ext);
    m_hits[i] = hits;
    LHCb::TbTriggers* triggers = new LHCb::TbTriggers();
    put(triggers, m_triggerLocation + ext);
    m_triggers[i] = triggers;
  }
  bool done = false;
  bool eof = true;
  bool eot = false;
  while (!done) {
    eof = true;
    // Update the event boundaries.
    if (UNLIKELY(msgLevel(MSG::DEBUG))) {
      debug() << "Event definition: " << m_clock - m_tick 
        << " to " << m_clock + m_overlapTime
        << endmsg;
    }
    timingSvc()->setEventDefinition(m_clock - m_tick,
        m_clock + m_overlapTime);
    for (unsigned int i = 0 ; i < m_nPlanes; ++i) {
      m_hits[i]->clear();
      m_triggers[i]->clear();
    }
    for (auto f : m_streams) {
      eof &= f->eos();
      fill(f, m_hits[f->plane()], m_triggers[f->plane()], eot);
    } 
    unsigned int nPlanesWithHits = 0;
    bool gotTrigger = false;
    for (unsigned int i = 0; i < m_nPlanes; ++i) {
      if (!m_triggers[i]->empty()) {
        gotTrigger = true;
        break;
      }
      if (!m_hits[i]->empty()) ++nPlanesWithHits;
    }
    if (gotTrigger || nPlanesWithHits >= m_nMinPlanesWithHits) {
      done = true;
    } else {
      ++m_nNoiseEvents;
    }
    m_clock += m_tick;
    if (eof) break;
  }
  // Sort hits and triggers by time. 
  for (unsigned int i = 0; i < m_nPlanes; ++i) {
    std::sort(m_hits[i]->begin(), m_hits[i]->end(),
        TbFunctors::LessByTime<const LHCb::TbHit*>());
    std::sort(m_triggers[i]->begin(), m_triggers[i]->end(),
        TbFunctors::LessByTime<const LHCb::TbTrigger*>());
    m_nPackets += m_hits[i]->size() ; 
  }
  // Increment the event counter.
  ++m_nEvents;
  if (m_nEvents % m_printFreq == 0) {
    info() << format(" %8u events read, %12u hits, %12u triggers", 
        m_nEvents, m_nHitsRead, m_nTriggersRead) << endmsg; 
    if (m_monitoring) {
      info() << "Hits:     ";
      for (unsigned int i = 0; i < m_nPlanes; ++i) {
        info() << format(" %12d", m_hits[i]->size());
      }
      info() << endmsg << "Cache:    ";
      for (unsigned int i = 0; i < m_nPlanes; ++i) {
        info() << format(" %12d", m_streams[i]->hitCache().size());
      }
      info() << endmsg << "Triggers: ";
      for (unsigned int i = 0; i < m_nPlanes; ++i) {
        info() << format(" %12d", m_triggers[i]->size());
      }
      info() << endmsg << "Cache:    ";
      for (unsigned int i = 0; i < m_nPlanes; ++i) {
        info() << format(" %12d", m_streams[i]->trgCache().size());
      }
      info() << endmsg;
    }
  }
  bool emptyCache = true;
  for (const auto& f : m_streams) {
    if (!f->hitCache().empty() || !f->trgCache().empty()) {
      emptyCache = false;
      break;
    }
  }
  // Check if there are any events left to process.
  if (((m_nData == m_nDataInFiles || eof) && emptyCache) || eot) {
    // Terminate the application.
    SmartIF<IEventProcessor> app(serviceLocator()->service("ApplicationMgr"));
    if (app) app->stopRun();
  }
  if (m_nLostTimers > m_maxLostTimers) {
    return Error("More than " + std::to_string(m_maxLostTimers) + 
        " clock packets dropped. Critical problem with global clock");
  }
  if (m_nLostPackets > m_maxLostPackets) {
    return Error("More than " + std::to_string(m_maxLostPackets) +  
        " packets  dropped. Critical problem with timing");
  }
  return StatusCode::SUCCESS;
}

//=============================================================================
// Add hits and triggers to the containers
//=============================================================================
bool TbEventBuilder::fill(TbRawStream* f, LHCb::TbHits* hits,
    LHCb::TbTriggers* triggers,
    bool& eot) {
  if (!dumpCache(f, hits) && !m_forceCaching) {
    if (UNLIKELY(msgLevel(MSG::DEBUG))) {
      debug() << "Event boundary: " << m_clock << endmsg;
      if (f->hitCache().size() > 0) {
        debug() << "First hit in cache:" << f->hitCache().front()->time()
          << endmsg;
      }
      if (f->trgCache().size() > 0) {
        debug() << "First trigger in cache: " << f->trgCache().front()->time()
          << endmsg;
      }
    }
    dumpCache(f, triggers);
    return false;
  }
  dumpCache(f, triggers);
  const unsigned int device = f->device();
  uint64_t currentTime = 0;
  while (likely(currentTime < m_clock + m_cachelength && !f->eos())) {
    const uint64_t packet = f->getNext();
    ++m_nData;
    const unsigned int header = 0xF & (packet >> 60);
    if (likely(header == 0xA || header == 0xB)) {
      // Pixel packets.
      ++m_nHitsRead;     
      // Get the pixel adress.
      const unsigned int pixelAddress = 0xFFFF & (packet >> 44);
      // Skip masked pixels.
      if (unlikely(pixelSvc()->isMasked(pixelAddress, device))) continue;
      LHCb::TbHit* hit = decodeTPX3Hit( packet, pixelAddress, device , f->col() );
      hit->setCharge(pixelSvc()->charge(hit->ToT(), pixelAddress, device));
      extendTimeStamp(hit, m_ignoreGlobalClock ? f->m_tpx3Timer : f->timer());
      pixelSvc()->applyPhaseCorrection(hit);
      writePacket(hit, f, hits);
      currentTime = hit->time();
      
      //info() << std::dec << (double)prevTime / (double)Tb::second << "   " << (double)currentTime / (double)Tb::second << "    " << f->hClock() << endmsg;
      syncTPX3( currentTime, f ) ; 
      if (UNLIKELY(m_monitoring)) {
        plot(hit->time() / Tb::millisecond, "HitDataRate", 
            "Rate of hit packets", 0.0, 620000.0, 6200000);
      }
    } else if ((header == 0x6 || header == 0x4) &&
        (( packet >> 56) & 0xF) == 0xF) {
      // New trigger packets
      LHCb::TbTrigger* trigger = new LHCb::TbTrigger(packet);
      extendTimeStamp(trigger, f->timer());
      trigger->setPlane(f->plane());
      writePacket(trigger, f, triggers);
      ++m_nTriggersRead;
      if (m_monitoring) {
        plot(trigger->time() / Tb::millisecond, "TriggerDataRate",
            "Rate of trigger packets", 0.0, 620000.0, 620000);
      }
    } else if (header == 0x4 && ! m_ignoreGlobalClock ) {
      // Timing packets
      int state = f->addTimingPacket(packet);
      if (state == 2) 
      {
        warning() << "Jump in timing of greater than 6.7s detected."
          << format("Dropping timing packet: 0x%x", packet) 
          << endmsg;
        warning() << "Will attempt to resynchronise using a different SPIDR's"
          << " global clock" << endmsg;
        attemptResync( f , packet );
        if( m_nLostTimers > m_maxLostTimers ) return true;      
        // If fail to add as a timing packet, data packet unknown
      }
      else if (state == 0) {
        ++m_unknownPackets;
        if (UNLIKELY(msgLevel(MSG::DEBUG))) {
          debug() << format("Timing packet with subheader 0x%x, packet = 0x%x",
              ((packet >> 56) & 0xF), packet) 
            << endmsg; 
        }
      }
    }   
    else {
      // Packet is neither hit, nor a trigger, nor a timing packet.
      ++m_unknownPackets;
      if (UNLIKELY(msgLevel(MSG::DEBUG))) {
        warning() << format("Packet with header 0x%x, packet = 0x%x", 
            header, packet) << endmsg;
      }
    }
  }
  std::sort(f->hitCache().begin(), f->hitCache().end(),
      TbFunctors::LessByTimeBP<const LHCb::TbHit*>());
  std::sort(f->trgCache().begin(), f->trgCache().end(),
      TbFunctors::LessByTimeBP<const LHCb::TbTrigger*>());

  if (m_endTime != 0 && currentTime > m_endTime) eot = true;
  if (!hits->empty() || !triggers->empty() || f->eos()) return true;
  return false;
}

//=============================================================================
// Finalisation
//=============================================================================
void TbEventBuilder::syncTPX3(const uint64_t& thisPacketTime, TbRawStream* f) { 

  int timerMsbs = 0x3 & ( f->m_tpx3Timer >> 40 );
  int thisMsb = 0x3 & ( thisPacketTime >> 40 ); 
  constexpr uint64_t one = (uint64_t)(1) << 40;

  if( thisMsb == timerMsbs+1 || thisMsb == timerMsbs-3){
    info() << "Updating clock " << (double)(f->m_tpx3Timer + one)/(double)Tb::second << endmsg; 
    f->m_tpx3Timer += one;
  }
  if( thisMsb > timerMsbs ){
    info() << thisMsb << "   " << timerMsbs << endmsg; 
  }
}

bool TbEventBuilder::attemptResync(TbRawStream* f, const uint64_t& packet) {

  for (unsigned int chip = 0 ; chip != m_nPlanes; ++chip) {
    int64_t dt = (int64_t)f->timer() - (int64_t)m_streams[chip]->timer();
    if (std::abs(dt) > m_maxTimeDifference) {
      f->setLSB(m_streams[chip]->lsb());
      f->setGlobalClock(m_streams[chip]->timer());
      const int state = f->addTimingPacket(packet);
      if (state != 2) {
        info() << "Resync of device " << chip << " successful!" << endmsg;
        return true; 
      }
    }
  }
  warning() << "Resynchronisation of device fails" << endmsg;
  // Empty the cache.
  for (auto it = f->hitCache().begin(); it != f->hitCache().end(); ++it) {
    if (*it) delete *it;
  }
  f->hitCache().clear();
  for (auto it = f->trgCache().begin(); it != f->trgCache().end(); ++it) {
    if (*it) delete *it;
  }
  f->trgCache().clear();
  m_nLostTimers++;
  return false; 
}

StatusCode TbEventBuilder::finalize() {

  for (auto& f : m_streams) {
    info() << format("Plane %2u: %12u packets in file, %12u hits in cache",
        f->plane(), f->size(), f->hitCache().size()) << endmsg;
    for (auto it = f->hitCache().begin(); it != f->hitCache().end(); ++it) {
      if (*it) delete *it;
    }
    for (auto it = f->trgCache().begin(); it != f->trgCache().end(); ++it) {
      if (*it) delete *it;
    }
    f->close();
  }
  info() << "Fraction of data read: " << (double)m_nData /
    (double)m_nDataInFiles << endmsg;
  info() << "Number of packets lost: " << m_nLostPackets << endmsg;
  info() << "Unknown packets: " << m_unknownPackets << endmsg;
  if (m_nNoiseEvents > 0) {
    info() << "Skipped " << m_nNoiseEvents << " noise events." << endmsg;
  }
  // Finalise the base class.
  return TbAlgorithm::finalize();
}

//=============================================================================
// Dump cached packets into the current event
//=============================================================================
template <typename T>
bool TbEventBuilder::dumpCache(
    TbRawStream* stream, KeyedContainer<T, Containers::HashMap>* container) {

  // Load the hits/triggers stored in the cache.
  std::vector<T*>* cache = stream->cache<T*>();
  auto it = cache->begin();
  const auto end = cache->end();
  for (; it != end; ++it) {
    const auto time = (*it)->time();
    if (unlikely(time >= m_clock + m_overlapTime)) break;
    if (unlikely(time < m_clock - m_tick ) ) {
      // Packet is earlier than the current event.
      // If this happens, the cache may be too short.
      warning() << format("Packet 0x%016llX", (*it)->data()) << " is "
        << " ns before event low edge! Current event definition: "
        << (m_clock - m_tick) / Tb::SpidrTime << " to "
        << (m_clock + m_overlapTime) / Tb::SpidrTime << endmsg;
      ++m_nLostPackets;
      delete *it;
      if (m_nLostPackets > m_maxLostPackets) {
        error() << "Large number of packets lost -> critical failure" << endmsg;
        return false;
      }
      continue;
    }
    (*it)->setHtime(timingSvc()->globalToLocal(time));
    // Move packet to the TES.
    container->insert(*it);
  }
  cache->erase(cache->begin(), it);
  if (container->empty() && !cache->empty()) return false;
  return true;
}

//=============================================================================
// Extend the timestamp of a packet
//=============================================================================
template <typename T>
void TbEventBuilder::extendTimeStamp(T* packet, uint64_t global_time) {

  const uint64_t packet_time = packet->time();
  const int diff = (0x3 & (global_time >> 40)) - (0x3 & (packet_time >> 40));
  constexpr uint64_t one = (uint64_t)(1) << 40;
  // Calculate the difference between the bits that should match between
  // the spidr time and the global time, if they do not match, increment or
  // decrement the global time so that they match and add the 18 m.s.f.
  // of the global time to the packet time
  if (diff == 1 || diff == -3) {
    global_time = global_time - one;
  } else if (diff == -1 || diff == 3) {
    global_time = global_time + one;
  }
  packet->setTime((0x3FFFFFFFFFF & packet_time) +
      (global_time & 0xFFFFC0000000000));
}

LHCb::TbHit* TbEventBuilder::decodeTPX3Hit( const uint64_t& packet,const unsigned int& pixelAddress, const unsigned int& device , const unsigned int& fCol ){

  LHCb::TbHit* hit = new LHCb::TbHit();
  hit->setDevice(device);
  hit->setData(packet);
  hit->setPixelAddress(pixelAddress);
  // Decode the pixel address, first get the double column. 
  const unsigned int dcol = (0xFE00 & pixelAddress) >> 8;
  // Get the super pixel address.
  const unsigned int spix = (0x01F8 & pixelAddress) >> 1;
  // Get the address of the pixel within the super pixel. 
  const unsigned int pix = (0x0007 & pixelAddress);
  // Calculate and store the row and column numbers.
  const unsigned int col = dcol + pix / 4;
  const unsigned int row = spix + (pix & 0x3);
  hit->setCol(col);
  hit->setRow(row);
  hit->setScol(col + fCol);
  const unsigned int data = (packet & 0x00000FFFFFFF0000) >> 16;
  // Extract and store the ToT and the corresponding charge.
  const unsigned int tot = (data & 0x00003FF0) >> 4;
  hit->setToT(tot);
  //  hit->setCharge(pixelSvc()->charge(tot, pixelAddress, device));
  // Get the time stamps.
  const uint64_t spidrTime = packet & 0x000000000000FFFF;
  const uint64_t ftoa = data & 0x0000000F;
  const uint64_t toa = (data & 0x0FFFC000) >> 14;
  // Calculate the global timestamp.
  const uint64_t fulltime = ((spidrTime << 18) + (toa << 4) + (15 - ftoa)) << 8;
  hit->setTime(fulltime);

  return hit;
}