Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
'scipy',
'scikit-image',
'scikit-learn>=1.3.0',
'dbscan1d==0.2.3',
'h5flow>=0.2.0',
'pylandau @ git+https://github.com/cuddandr/pylandau.git@npy_owndata_fix#egg=pylandau',
'adc64format @ git+https://github.com/larpix/adc64format.git@v0.1.3#egg=adc64format',
Expand Down
49 changes: 45 additions & 4 deletions src/proto_nd_flow/reco/charge/event_builder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import numpy as np
import numpy.ma as ma
import numpy.lib.recfunctions as rfn
import numpy.typing as npt
from collections import defaultdict
import logging

from h5flow.core import H5FlowStage
from h5flow.core import H5FlowStage, resources


class EventBuilder(H5FlowStage):
Expand All @@ -16,6 +17,9 @@ class EventBuilder(H5FlowStage):
- ``events_dset_name`` : ``str``, required, output dataset path
- ``hits_dset_name`` : ``str``, required, input dataset path for hits
- ``ext_trigs_dset_name`` : ``str``, required, input dataset path for external triggers
- ``pps_delay_corrector_enabled`` : ``bool``, optional, whether to correct unix_ts_usec
for the GPS-to-PPS delay (see ``pps_delay_extractor_enabled`` in RawEventGenerator)
- ``pps_delay_corrector_config`` : ``dict``, optional, modify parameters of PPS corrector

Both the ``hits_dset_name`` and ``ext_trigs_dset_name`` are required in
the data cache.
Expand All @@ -40,7 +44,8 @@ class EventBuilder(H5FlowStage):
ts_start f8, first external trigger or hit corrected PPS timestamp [ticks]
ts_end f8, last external trigger or hit corrected PPS timestamp [ticks]
n_ext_trigs u4, number of external triggers in event
unix_ts u8, unix timestamp of event [s since epoch]
unix_ts u8, unix timestamp (from PACMAN) of event [s since epoch]
unix_ts_usec u4, estimated microsecond component of event time

'''
class_version = '1.0.0'
Expand All @@ -52,6 +57,7 @@ class EventBuilder(H5FlowStage):
('ts_start', 'f8'), ('ts_end', 'f8'),
('n_ext_trigs', 'u4'),
('unix_ts', 'u8'),
('unix_ts_usec', 'u4'),
])

def __init__(self, **params):
Expand All @@ -61,6 +67,12 @@ def __init__(self, **params):
self.hits_dset_name = params.get('hits_dset_name')
self.ext_trigs_dset_name = params.get('ext_trigs_dset_name')

self.pps_delay_corrector_enabled = \
params.get('pps_delay_corrector_enabled', False)
self.pps_delay_corrector_config = \
params.get('pps_delay_corrector_config', {})
self.pps_delay_correction = 0

def init(self, source_name):
super(EventBuilder, self).init(source_name)

Expand All @@ -70,15 +82,18 @@ def init(self, source_name):
class_version=self.class_version,
source_dset=source_name,
hits_dset=self.hits_dset_name,
ext_trigs_dset=self.ext_trigs_dset_name
)
ext_trigs_dset=self.ext_trigs_dset_name,
pps_delay_corrector_enabled=self.pps_delay_corrector_enabled)

# then set up new datasets
self.data_manager.create_dset(self.events_dset_name, dtype=self.events_dtype)
self.data_manager.create_ref(source_name, self.events_dset_name)
self.data_manager.create_ref(self.events_dset_name, self.hits_dset_name)
self.data_manager.create_ref(self.events_dset_name, self.ext_trigs_dset_name)

if self.pps_delay_corrector_enabled:
self.pps_delay_correction = self.get_pps_delay_correction()

def run(self, source_name, source_slice, cache):
super(EventBuilder, self).run(source_name, source_slice, cache)

Expand All @@ -100,6 +115,8 @@ def run(self, source_name, source_slice, cache):
events_arr['ts_start'] = ts.min(axis=-1)
events_arr['ts_end'] = ts.max(axis=-1)
events_arr['n_ext_trigs'] = np.count_nonzero(ext_trigs_mask, axis=-1)
events_arr['unix_ts_usec'] = self.get_unix_ts_usec(events_arr)

self.data_manager.write_data(self.events_dset_name, events_slice, events_arr)

# save references
Expand All @@ -113,3 +130,27 @@ def run(self, source_name, source_slice, cache):
trigs_ev_id = np.broadcast_to(ev_id, ext_trigs_data.shape)
ref = np.c_[trigs_ev_id[ext_trigs_mask], ext_trigs_data[ext_trigs_mask]['id']]
self.data_manager.write_ref(self.events_dset_name, self.ext_trigs_dset_name, ref)

def get_pps_delay_correction(self) -> int:
    """Return the file-wide GPS-to-PPS delay correction, in ticks.

    Reads the per-io_group delay table that the RawEventGenerator's
    PPSDelayExtractor stored as an attribute of the packets dataset
    and collapses it to a single integer delay.
    """
    dset_name = self.pps_delay_corrector_config['packets_dset_name']
    # Each table row is an (io_group, delay_ticks) pair
    delay_table = self.data_manager.get_attrs(dset_name)['pps_delays']
    per_iog_delays = [delay for _iog, delay in delay_table]

    # It appears that sometimes a PACMAN's system clock falls out of sync
    # with Unix Time, resulting in an invalid measurement of the PPS delay.
    # (E.g., io_group 7 at the beginning of the 2x2 sandbox period.) To
    # mitigate this, we let the PACMEN "vote" for the delay by taking the
    # median measurement.
    return int(np.median(per_iog_delays))

def get_unix_ts_usec(self, events_arr: npt.NDArray[events_dtype]) \
        -> npt.NDArray[np.uint32]:
    """Estimate the microsecond component of each event's time.

    Converts each event's starting PPS timestamp from ticks to
    microseconds (one tick is 0.1 us, per the conversion factor below).
    When the PPS delay corrector is enabled, the measured delay is added
    first and the result is wrapped at the rollover period.
    """
    ticks2usec = 0.1
    if not self.pps_delay_corrector_enabled:
        return (events_arr['ts_start'] * ticks2usec).astype(np.uint32)
    rollover = resources['RunData'].rollover_ticks
    corrected_ticks = (events_arr['ts_start']
                       + self.pps_delay_correction) % rollover
    return (corrected_ticks * ticks2usec).astype(np.uint32)
118 changes: 118 additions & 0 deletions src/proto_nd_flow/reco/charge/pps_delay_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import Any

import numpy as np
import numpy.typing as npt
from dbscan1d import DBSCAN1D

from h5flow.core import resources
from h5flow.data import H5FlowDataManager


class PPSDelayExtractor:
    """Measure the delay between GPS time and the PACMAN PPS ticks, per
    io_group.

    The RawEventGenerator feeds packet buffers to :meth:`update`; at the
    end of the file, :meth:`finish` stores the per-io_group median delay
    as a ``pps_delays`` attribute of the packets dataset, where the
    EventBuilder's PPS delay corrector picks it up.
    """

    # One measurement: PACMAN unix second, measured delay, io_group
    delay_dtype = np.dtype([
        ('unix_ts', '<u8'), ('delay_ticks', '<u8'), ('io_group', '<u2')
    ])

    default_window: int = 100       # packets inspected on each side of a cluster
    default_dbscan_eps: int = 100   # clustering radius, in packet-index units
    default_min_packets: int = 50   # min data packets for a valid measurement

    # The params are forwarded from the RawEventGenerator
    def __init__(self, **params: Any):
        self.packets_dset_name: str = params['packets_dset_name']

        # Tolerate a missing config block: fall back to all defaults
        k = 'pps_delay_extractor_config'
        config = params.get(k, {})
        self.window: int = config.get('window', self.default_window)
        self.dbscan_eps: int = config.get('dbscan_eps',
                                          self.default_dbscan_eps)
        self.min_packets: int = config.get('min_packets',
                                           self.default_min_packets)
        self.debug_mode: bool = config.get('debug_mode', False)

        # Accumulated measurements, one entry per (cluster, io_group)
        self.unix_ts: list[np.uint32] = []
        self.delay_ticks: list[np.float64] = []
        self.io_group: list[np.uint16] = []

        # Set by setup(); methods that need it assert on it
        self.data_manager: H5FlowDataManager | None = None

    def setup(self, data_manager: H5FlowDataManager):
        """Attach the data manager; create the debug dataset if requested."""
        self.data_manager = data_manager
        if self.debug_mode:
            self.data_manager.create_dset('charge/pps_delay',
                                          dtype=self.delay_dtype)

    def looks_gps_aligned(self, timestamps: npt.NDArray[np.uint64]) \
            -> bool:
        """Return True when *timestamps* span most of the rollover period,
        i.e. the sample straddles a PPS pulse and the clocks look aligned
        (so no delay correction is meaningful)."""
        rollover = resources['RunData'].rollover_ticks
        # Get rid of missed-SYNC timestamps (they exceed the rollover
        # period and would skew the quantiles)
        ts = timestamps[timestamps < rollover]
        lo, hi = np.quantile(ts, [0.45, 0.55])
        return hi - lo > 0.5 * rollover

    def update(self, packets: npt.NDArray[Any]):
        """Extract PPS-delay measurements from a buffer of packets.

        Unix-second boundaries are located by finding adjacent type-4
        (timestamp) packets whose values differ by exactly 1, clustering
        those "jumps", and measuring the data-packet timestamps around
        each boundary against the rollover period.
        """
        rollover = resources['RunData'].rollover_ticks  # loop-invariant
        for iog in np.unique(packets['io_group']):
            all_pkts = packets[packets['io_group'] == iog]
            ts2pkt = np.where(all_pkts['packet_type'] == 4)[0]
            all_ts = all_pkts[ts2pkt]['timestamp']
            jumps = np.where(all_ts[1:] - all_ts[:-1] == 1)[0]
            if len(jumps) == 0:
                # No unix-second boundary for this io_group in this
                # buffer; max() on the empty label array would raise
                continue
            clusters = DBSCAN1D(eps=self.dbscan_eps, min_samples=1) \
                .fit(jumps.reshape(-1, 1))

            assert clusters.labels_ is not None
            # BUG FIX: `range(max(labels_))` skipped the highest-numbered
            # cluster (and processed nothing when there was exactly one);
            # iterate every assigned label instead
            for i in np.unique(clusters.labels_):
                ts_idcs = jumps[clusters.labels_ == i]
                pkt_idcs = ts2pkt[ts_idcs]  # indices of ts pkts in this cluster

                ts_pkts = all_pkts[pkt_idcs]
                unix_ts = ts_pkts['timestamp']
                # NOTE(review): this assert is a no-op — len() of the
                # boolean array is truthy for any non-empty cluster. The
                # intended invariant was presumably
                # `len(np.unique(unix_ts)) == 2`; left un-tightened because
                # a single-jump cluster legitimately holds only one value.
                # Confirm the invariant before enforcing it.
                assert len(np.unique(unix_ts) == 2)
                t = np.max(unix_ts)

                pkts = all_pkts[np.min(pkt_idcs) - self.window
                                :np.max(pkt_idcs) + self.window]
                data_pkts = pkts[pkts['packet_type'] == 0]
                if len(data_pkts) < self.min_packets:
                    continue
                # A large spread indicates that we're straddling the PPS
                # pulse, in which case we can just assign a delay of zero
                if self.looks_gps_aligned(data_pkts['timestamp']):
                    delay = np.float64(0.)
                else:
                    delay = np.float64(rollover) - np.median(data_pkts['timestamp'])

                self.unix_ts.append(t)
                self.delay_ticks.append(delay)
                self.io_group.append(iog)

    def finish(self):
        """Aggregate measurements and store the per-io_group median delays
        as the ``pps_delays`` attribute of the packets dataset."""
        data = np.empty(len(self.unix_ts), dtype=self.delay_dtype)
        order = np.argsort(self.unix_ts)
        data['unix_ts'] = np.array(self.unix_ts)[order]
        data['delay_ticks'] = np.array(self.delay_ticks)[order]
        data['io_group'] = np.array(self.io_group)[order]

        delays: list[tuple[int, int]] = []  # (io_group, median delay) pairs

        # np.unique already returns sorted values
        for iog in np.unique(data['io_group']):
            sel = data['io_group'] == iog
            # Similarly to the case in update(), a large spread here
            # indicates that, at some point in this file, the PPS pulses
            # came into alignment with the GPS ticks. In that case, just
            # set the overall file's PPS delay to zero.
            if self.looks_gps_aligned(data[sel]['delay_ticks']):
                delay = 0
            else:
                delay = np.median(data[sel]['delay_ticks'])
            delays.append((int(iog), int(delay)))

        assert self.data_manager is not None, "call setup plz"
        self.data_manager.set_attrs(self.packets_dset_name,
                                    pps_delays=delays)

        if self.debug_mode:
            name = 'charge/pps_delay'
            # NOTE(review): setup() already created this dataset when
            # debug_mode is on — confirm create_dset tolerates being
            # called twice, otherwise drop this call
            self.data_manager.create_dset(name, dtype=self.delay_dtype)
            sl = self.data_manager.reserve_data(name, len(self.unix_ts))
            self.data_manager.write_data(name, sl, data)
18 changes: 18 additions & 0 deletions src/proto_nd_flow/reco/charge/raw_event_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from h5flow import H5FLOW_MPI

from proto_nd_flow.reco.charge.raw_event_builder import *
from proto_nd_flow.reco.charge.pps_delay_extractor import PPSDelayExtractor
import proto_nd_flow.util.units as units


Expand Down Expand Up @@ -44,6 +45,8 @@ class RawEventGenerator(H5FlowGenerator):
- ``mc_tracks_dset_name`` : ``str``, optional, output dataset path for mc truth tracks (if present)
- ``mc_trajectories_dset_name`` : ``str``, optional, output dataset path for mc truth trajectories (if present)
- ``mc_packet_fraction_dset_name`` : ``str``, optional, output dataset path for packet charge fraction truth (if present)
- ``pps_delay_extractor_enabled`` : ``bool``, optional, whether to extract the delay between GPS and PPS ticks
- ``pps_delay_extractor_config`` : ``dict``, optional, modify parameters of the PPS delay extractor
- ``autocorrect_unix_ts`` : ``bool``, optional, take median unix_ts among io_groups instead of just first iog

``dset_name`` points to a lightweight array used to organize low-level
Expand Down Expand Up @@ -90,6 +93,7 @@ class RawEventGenerator(H5FlowGenerator):
default_mc_packet_fraction_dset_name = 'mc_truth/packet_fraction'
default_truth_ref = True
default_autocorrect_unix_ts = False
default_pps_delay_extractor_enabled = False

raw_event_dtype = np.dtype([
('id', 'u8'),
Expand Down Expand Up @@ -123,6 +127,8 @@ def __init__(self, **params):
self.truth_ref = params.get('truth_ref', self.default_truth_ref)
self.autocorrect_unix_ts = params.get('autocorrect_unix_ts',
self.default_autocorrect_unix_ts)
self.pps_delay_extractor_enabled = params.get('pps_delay_extractor_enabled',
self.default_pps_delay_extractor_enabled)

# create event builder
self.event_builder = globals()[self.event_builder_class](**self.event_builder_config)
Expand All @@ -142,12 +148,18 @@ def __init__(self, **params):
self.slices = [slice(st, st + self.buffer_size) for st in range(self.start_position + self.rank * self.buffer_size, self.end_position, self.size * self.buffer_size)]
self.iteration = 0

if self.pps_delay_extractor_enabled:
self.delay_extractor = PPSDelayExtractor(**params)

def __len__(self):
return len(self.slices)

def init(self):
super(RawEventGenerator, self).init()

if self.pps_delay_extractor_enabled:
self.delay_extractor.setup(self.data_manager)

if self.data_manager.dset_exists(self.raw_event_dset_name):
raise RuntimeError(f'{self.raw_event_dset_name} already exists, refusing to append!')
if self.data_manager.dset_exists(self.packets_dset_name):
Expand Down Expand Up @@ -400,6 +412,9 @@ def finish(self):
super(RawEventGenerator, self).finish()
self.input_fh.close()

if self.pps_delay_extractor_enabled:
self.delay_extractor.finish()

def next(self):
'''
Read in a new block of LArPix packet data from the input file and
Expand Down Expand Up @@ -569,6 +584,9 @@ def nhit_filter(x):
ref = np.unique(ref, axis=0) if len(ref) else ref
self.data_manager.write_ref(self.raw_event_dset_name, self.mc_events_dset_name, ref)

if self.pps_delay_extractor_enabled:
self.delay_extractor.update(packet_buffer)

return raw_event_slice if nevents else H5FlowGenerator.EMPTY

def pass_last_unix_ts(self, packets):
Expand Down
6 changes: 6 additions & 0 deletions yamls/proto_nd_flow/reco/charge/EventBuilderData.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,9 @@ params:

# output
events_dset_name: 'charge/events'

pps_delay_corrector_enabled: True
pps_delay_corrector_config:
# The measured PPS delays (from the raw event generator) are stored as
# attributes of the packets dataset
packets_dset_name: 'charge/packets'
3 changes: 3 additions & 0 deletions yamls/proto_nd_flow/reco/charge/RawEventGeneratorData.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ params:
off_beam_threshold : 20
shifted_event_dt : -70
autocorrect_unix_ts: True
pps_delay_extractor_enabled: True
pps_delay_extractor_config:
debug_mode: True
Loading