Skip to content

epiphyte.preprocessing

annotation

data_driven_annotation

cpt

Detects the changepoint in a spike train using parametric statistic testing.

Plots the results and returns the index of the breakpoint and the stat test results.

Used for demonstrating the addition of a new table to an existing database.

find_changepoint_tt
find_changepoint_tt(data, verbose=False)

Detects the changepoint in a spike train using parametric statistic testing. Plots the results and returns the index of the breakpoint and the stat test results.

Accepts and runs one unit. Set up to be iterated over for the whole of a dataset of units.

Parameters:

Name Type Description Default
data ndarray

spike train data (1D array-like)

required
verbose bool

if True, prints detailed changepoint and t-test results

False

Returns:

Name Type Description
taustar int

index of the determined breakpoint.

ttest ttest_ind_from_stats

results from the scipy stats ttest (type is a specific class construction from scipy)

Source code in epiphyte/preprocessing/annotation/data_driven_annotation/cpt.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def find_changepoint_tt(data, verbose=False):
    """
    Detects the changepoint in a spike train using parametric statistic testing.
    Plots the results and returns the index of the breakpoint and the stat test results.

    Accepts and runs one unit. Set up to be iterated over for the whole of a dataset of units.

    Args:
        data (np.ndarray):  spike train data (1D array-like)
        verbose (bool):    if True, prints detailed changepoint and t-test results

    Returns:
        taustar (int): index of the determined breakpoint.
        ttest (stats.ttest_ind_from_stats): results from the scipy stats ttest (type is a specific class construction from scipy)
    """
    n = len(data)

    mu0 = np.mean(data)  # global mean
    s0 = np.sum((data - mu0) ** 2)  # squared difference from global mean
    s1 = np.asarray(
        [np.sum((data[0:i] - np.mean(data[0:i])) ** 2) for i in range(1, n)])  # squared sum before changepoint
    s2 = np.asarray(
        [np.sum((data[i:] - np.mean(data[i:])) ** 2) for i in range(1, n)])  # squared sum after changepoint

    R = s0 - s1 - s2
    G = np.max(R)

    taustar = int(np.where(R == G)[0]) + 1

    m1 = np.mean(data[0:taustar])
    std1 = np.std(data[0:taustar])
    nobs1 = len(data[0:taustar])
    m2 = np.mean(data[taustar:])
    std2 = np.std(data[taustar:])
    nobs2 = len(data[taustar:])

    ttest = stats.ttest_ind_from_stats(m1, std1, nobs1, m2, std2, nobs2)

    if verbose:
        print("Changepoint results:")
        print("tau*: ", taustar)
        print("mean pre-tau: ", m1)
        print("std  pre-tau: ", std1)
        print("mean post-tau: ", m2)
        print("std  post-tau:", std2)
        print("T-test results:")
        print(ttest)
        print("")

    return taustar, ttest
plot_changepoint
plot_changepoint(data, taustar, ttest, save=None, filename=None, comparison_tau=None)

Plot a change point in neural activity data and annotate with statistical results.

This function visualizes time-binned neural activity and overlays vertical lines indicating the detected change point (taustar) and, optionally, a second change point (comparison_tau). It also displays the results of a t-test as legend entries.

Parameters:

Name Type Description Default
data array - like

Sequence of neural activity values.

required
taustar int

Detected change point index (in 1-second bins).

required
ttest tuple

A tuple containing (t-statistic, p-value) from a t-test.

required
save str

Directory path where plots should be saved. If None, plots are not saved.

None
filename str

Base filename (without extension) for saving plots. Required if save is provided.

None
comparison_tau int

Another change point index to compare with taustar.

None
Saves
  • {filename}_cpt.png in the save directory if save is provided.
  • {filename}_cpt.svg in the save directory if save is provided.
Example
from scipy.stats import ttest_ind

data = [0, 1, 2, 5, 6, 7, 10, 11]
ttest = ttest_ind(data[:4], data[4:])
plot_changepoint(data, taustar=4, ttest=ttest, save="plots/", filename="session1")
Source code in epiphyte/preprocessing/annotation/data_driven_annotation/cpt.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def plot_changepoint(data, taustar, ttest, save=None, filename=None, comparison_tau=None):
    """
    Plot a change point in neural activity data and annotate with statistical results.

    This function visualizes time-binned neural activity and overlays vertical
    lines indicating the detected change point (`taustar`) and, optionally, a
    second change point (`comparison_tau`). It also displays the results of a
    t-test as legend entries.

    Args:
        data (array-like): 
            Sequence of neural activity values.
        taustar (int): 
            Detected change point index (in 1-second bins).
        ttest (tuple): 
            A tuple containing (t-statistic, p-value) from a t-test.
        save (str, optional): 
            Directory path where plots should be saved. If None, plots are not saved.
        filename (str, optional): 
            Base filename (without extension) for saving plots. Required if `save` is provided.
        comparison_tau (int, optional): 
            Another change point index to compare with `taustar`.

    Saves:
        - `{filename}_cpt.png` in the `save` directory if `save` is provided.
        - `{filename}_cpt.svg` in the `save` directory if `save` is provided.

    Example:
        ```python
        from scipy.stats import ttest_ind

        data = [0, 1, 2, 5, 6, 7, 10, 11]
        ttest = ttest_ind(data[:4], data[4:])
        plot_changepoint(data, taustar=4, ttest=ttest, save="plots/", filename="session1")
        ```
    """
    n = len(data)
    fig = plt.figure(figsize=(25, 5))
    plt.plot(np.arange(1, n + 1), data)
    plt.axvline(x=taustar, color='r', label="tau: {}".format(taustar))

    if comparison_tau:
        # note: comparison_tau is expected to be an int type value from a previously computed set of cpt results
        plt.axvline(x=comparison_tau, color='y', label="other tau: {}".format(comparison_tau))

    pval_str = "P-value: {}".format(ttest[1])
    tstat_str = "T-statistic: {}".format(ttest[0])
    plt.plot([], [], "", label=pval_str)
    plt.plot([], [], "", label=tstat_str)
    plt.xlabel('Time [1 sec bins]')
    plt.ylabel('Neural Activity [spikes / bin]')
    plt.legend(prop={'size': 15})

    if save:
        plt.title(filename + ' change point statistics')

        plt.savefig(save + "{}_cpt.png".format(filename), bbox_inches='tight')
        plt.savefig(save + "{}_cpt.svg".format(filename), format='svg', bbox_inches='tight')

    plt.show()

stimulus_driven_annotation

movies

annotation_utils

Helpers for handling stimulus-driven annotations in an analysis workflows.

Provides utilities to split neural activity by label values for downstream analysis and visualization.

split_activity_by_value
split_activity_by_value(binned_activity, binned_label, specific_values=None)

Split binned activity by the values in a binned label vector.

Example

For a binary label (stimulus on/off), the binned_label vector contains 0/1. This function returns two arrays mapping to segments where the label is off/on, respectively. For multi-valued labels, activity is split per unique value.

Parameters:

Name Type Description Default
binned_activity ndarray

Binned neural activity (shape (N, ...)).

required
binned_label ndarray

Binned label aligned to the activity (length N).

required
specific_values Optional[Iterable[int]]

If provided, only these label values are used.

None

Returns:

Type Description
Dict[str, ndarray]

Dict[str, np.ndarray]: Mapping {value_name: activity_subset}.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/annotation_utils.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def split_activity_by_value(
    binned_activity: np.ndarray,
    binned_label: np.ndarray,
    specific_values: Optional[Iterable[int]] = None,
) -> Dict[str, np.ndarray]:
    """Split binned activity by the values in a binned label vector.

    Example:
        For a binary label (stimulus on/off), the ``binned_label`` vector
        contains 0/1. This function returns two arrays mapping to segments
        where the label is off/on, respectively. For multi-valued labels,
        activity is split per unique value.

    Args:
        binned_activity (np.ndarray): Binned neural activity (shape ``(N, ...)``).
        binned_label (np.ndarray): Binned label aligned to the activity (length ``N``).
        specific_values (Optional[Iterable[int]]): If provided, only these label values are used.

    Returns:
        Dict[str, np.ndarray]: Mapping ``{value_name: activity_subset}``.
    """
    # Set up for number --> word converstion 
    alph = inflect.engine()

    if not specific_values:
        values = np.unique(binned_label)

        ret_vectors = {}

        for value in values:

            indices = np.isin(binned_label, value)
            activity_from_val = binned_activity[indices]

            # Convert value number to the word, for easy referencing during analysis
            name = alph.number_to_words(int(value))

            ret_vectors[name] = activity_from_val

    if specific_values:

        ret_vectors = {}

        for value in specific_values:
            indices = np.isin(binned_label, value)
            activity_from_val = binned_activity[indices]

            name = alph.number_to_words(int(value))
            ret_vectors[name] = activity_from_val

    return ret_vectors
pause_handling

Helpers to handle pause intervals while binning labels and spikes.

pause_start_bin
pause_start_bin(bins, start)

Find start bin index inclusive of the pause start.

Parameters:

Name Type Description Default
bins ndarray or list - like

bin edges (ms).

required
start float

Pause start time (ms).

required

Returns: int: index of the start bin.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/pause_handling.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def pause_start_bin(bins: np.ndarray, start: float) -> int:
    """Find start bin index inclusive of the pause start.

    Args:
        bins (np.ndarray or list-like): bin edges (ms).
        start (float): Pause start time (ms).
    Returns:
        int: index of the start bin.
    """
    ind_start = (np.abs(bins - start)).argmin()  
    if bins[ind_start] > start: 
        start_bin = ind_start - 1
    else: 
        start_bin = ind_start
    return start_bin 
pause_stop_bin
pause_stop_bin(bins, stop)

Find stop bin index inclusive of the pause stop.

Parameters:

Name Type Description Default
bins ndarray

Bin edges (ms).

required
stop float

Pause stop time (ms).

required

Returns: int: Index of the stop bin.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/pause_handling.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def pause_stop_bin(bins: np.ndarray, stop: float) -> int: 
    """Find stop bin index inclusive of the pause stop.

    Args:
        bins: Bin edges (ms).
        stop: Pause stop time (ms).
    Returns:
        int: Index of the stop bin.
    """
    ind_stop = (np.abs(bins - stop)).argmin()
    if bins[ind_stop] < stop:
        stop_bin = ind_stop + 1
    else:
        stop_bin = ind_stop
    return stop_bin
make_pause_interval
make_pause_interval(bin_start, bin_stop)

Make a list of indices spanning the pause interval (inclusive).

Parameters:

Name Type Description Default
bin_start int

Start bin index.

required
bin_stop int

Stop bin index.

required

Returns: List[int]: List of indices spanning the pause interval (inclusive).

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/pause_handling.py
42
43
44
45
46
47
48
49
50
51
52
def make_pause_interval(bin_start: int, bin_stop: int) -> List[int]:
    """Make a list of indices spanning the pause interval (inclusive).

    Args:
        bin_start (int): Start bin index.
        bin_stop (int): Stop bin index.
    Returns:
        List[int]: List of indices spanning the pause interval (inclusive).
    """
    pause = list(range(bin_start, (bin_stop + 1), 1))
    return pause 
rm_pauses_bins
rm_pauses_bins(bins, start, stop, return_intervals=False)

Remove bin edges that occur during paused playback.

Parameters:

Name Type Description Default
bins ndarray or list - like

Bin edges (ms).

required
start ndarray or list - like

Pause starts (ms).

required
stop ndarray or list - like

Pause stops (ms).

required
return_intervals bool

If True, also return indices removed.

False

Returns: Tuple[np.ndarray, List[int]]: Cleaned bins or (bins_no_pauses, removed_indices).

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/pause_handling.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def rm_pauses_bins(
    bins: np.ndarray,
    start: np.ndarray,
    stop: np.ndarray,
    return_intervals: bool = False,
) -> np.ndarray | Tuple[np.ndarray, List[int]]:
    """Remove bin edges that occur during paused playback.

    Args:
        bins (np.ndarray or list-like): Bin edges (ms).
        start (np.ndarray or list-like): Pause starts (ms).
        stop (np.ndarray or list-like): Pause stops (ms).
        return_intervals (bool): If ``True``, also return indices removed.
    Returns: 
        Tuple[np.ndarray, List[int]]: Cleaned bins or ``(bins_no_pauses, removed_indices)``.
    """
    pauses = []

    for i in range(len(start)):
        start_bin = pause_start_bin(bins, start[i])
        stop_bin  = pause_stop_bin(bins, stop[i])
        interval = make_pause_interval(start_bin, stop_bin)
        pauses.append(interval)

    flatten = lambda l: [item for sublist in l for item in sublist]
    all_pauses = flatten(pauses)

    no_pauses = np.delete(bins, all_pauses)

    if return_intervals: 
        output = [no_pauses, all_pauses]
    else:
        output = no_pauses

    return output
rm_pauses_spikes
rm_pauses_spikes(unit, start, stop, return_intervals=False)

Remove spikes that occur during paused playback.

Parameters:

Name Type Description Default
unit ndarray

Spike times (ms).

required
start ndarray

Pause starts (ms).

required
stop ndarray

Pause stops (ms).

required
return_intervals bool

If True, also return removed indices.

False

Returns: Tuple[np.ndarray, List[int]]: Cleaned spikes or (unit_no_pauses, removed_indices).

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/pause_handling.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def rm_pauses_spikes(
    unit: np.ndarray,
    start: np.ndarray,
    stop: np.ndarray,
    return_intervals: bool = False,
) -> np.ndarray | Tuple[np.ndarray, List[int]]:
    """Remove spikes that occur during paused playback.

    Args:
        unit (np.ndarray): Spike times (ms).
        start (np.ndarray): Pause starts (ms).
        stop (np.ndarray): Pause stops (ms).
        return_intervals (bool): If ``True``, also return removed indices.
    Returns: 
        Tuple[np.ndarray, List[int]]: Cleaned spikes or ``(unit_no_pauses, removed_indices)``.
    """
    paused_spikes = []

    for i, spk in enumerate(unit): 
        for j in range(len(start)):
            if spk >= start[j] and spk <= stop[j]:
                paused_spikes.append(i)

    unit_no_pauses = np.delete(unit, paused_spikes)

    if return_intervals: 
        output = [unit_no_pauses, paused_spikes]
    else:
        output = unit_no_pauses

    return output
processing_labels
make_label_from_start_stop_times
make_label_from_start_stop_times(values, start_times, stop_times, ref_vec, default_value=0)

This function takes a vector with tuples with start and stop times and converts it to the default label

Parameters:

Name Type Description Default
ref_vec ndarray

reference vector, e.g. either PTS of movie or neural recording time of patient

required
default_value int

default value of label, which shall be added to all gaps in start stop times

0
values list

vector with all values

required
start_times list

vector with all start_times of segments

required
stop_times list

vector with all stop times of segments

required

Returns: list[int] | int: Label vector, or -1 on error.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/processing_labels.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def make_label_from_start_stop_times(values: Sequence[int],
    start_times: Sequence[float],
    stop_times: Sequence[float],
    ref_vec: Sequence[float] | np.ndarray,
    default_value: int = 0,
) -> list[int]:
    """
    This function takes a vector with tuples with start and stop times and converts it to the default label

    Args:
        ref_vec (np.ndarray): reference vector, e.g. either PTS of movie or neural recording time of patient
        default_value (int): default value of label, which shall be added to all gaps in start stop times
        values (list): vector with all values
        start_times (list): vector with all start_times of segments
        stop_times (list): vector with all stop times of segments
    Returns:
        list[int] | int: Label vector, or ``-1`` on error.
    """
    if not (len(values) == len(start_times) == len(stop_times)):
        print("vectors values, starts and stops have to be the same length")
        return -1

    default_label = [default_value] * len(ref_vec)

    for i in range(len(values)):
        start_index_in_default_vec = create_vectors_from_time_points.get_index_nearest_timestamp_in_vector(np.array(ref_vec), start_times[i])
        end_index_in_default_vec = create_vectors_from_time_points.get_index_nearest_timestamp_in_vector(np.array(ref_vec), stop_times[i])

        default_label[start_index_in_default_vec:(end_index_in_default_vec+1)] = \
            [int(values[i])]*(end_index_in_default_vec - start_index_in_default_vec + 1)

    return default_label
create_xml_for_advene
create_xml_for_advene(id_name, start_end_times_vector, label_name)

This function creates an XML string, which can be imported to the movie annotation tool Advene

Parameters:

Name Type Description Default
id_name str

name of ID in XML file

required
start_end_times_vector list

input vector that contains the start and end times of the label in milliseconds

required
label_name str

the name of the label how it shall be displayed in the GUI of Advene

required

Returns:

Name Type Description
str str

an XML string that can be copied to the content.xml file and loaded to Advene

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/processing_labels.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def create_xml_for_advene(id_name: str, start_end_times_vector: list[tuple[float, float]], label_name: str) -> str:
    """
    This function creates an XML string, which can be imported to the movie annotation tool Advene

    Args:
        id_name (str): name of ID in XML file
        start_end_times_vector (list): input vector that contains the start and end times of the label in milliseconds
        label_name (str): the name of the label how it shall be displayed in the GUI of Advene

    Returns:
        str: an XML string that can be copied to the content.xml file and loaded to Advene
    """
    new_annotations = ""
    id_ = 0

    for start, end in start_end_times_vector:
        string_new_annotation = '<annotation id="{}{}" type="#{}"><millisecond-fragment begin="{}" end="{}"/><content>num=1</content></annotation>'.format(
            id_name, id_, label_name, int(start * 1000), int(end * 1000))

        new_annotations += string_new_annotation

        id_ += 1
    return new_annotations
start_stop_values_from_json
start_stop_values_from_json(path_to_file, label_name)

This function extracts start times, stop times and values of all segments of a label from a json file

Parameters:

Name Type Description Default
path_to_file str

path to json file

required
label_name str

name of label (how it was specified in json file)

required

Returns: np.ndarray, np.ndarray, np.ndarray: first array: values of label, second array: start times of label segments in seconds, third array: stop times of label segments in seconds

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/processing_labels.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def start_stop_values_from_json(path_to_file: str, label_name: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    This function extracts start times, stop times and values of all segments of a label from a json file

    Args:
        path_to_file (str): path to json file
        label_name (str): name of label (how it was specified in json file)
    Returns:
        np.ndarray, np.ndarray, np.ndarray: first array: values of label, second array: start times of label segments in seconds, third array: stop times of label segments in seconds
    """
    # open and read json file
    with open(path_to_file,'r') as jsonfile:
        labels_json_file = json.load(jsonfile) 

    start_times = []
    stop_times = []
    values = []
    # iterate through elements in json file to extract time points of segments
    for annotation in labels_json_file.get("annotations"):
        if annotation.get("type") == label_name:
            start_times.append(annotation.get("begin"))
            stop_times.append(annotation.get("end"))
            values.append(annotation.get("title"))

    return np.array(values), np.array(start_times)/1000, np.array(stop_times)/1000
export_labels_from_json_file
export_labels_from_json_file(path_to_file, label_name, bool_save_start_end_times)

Process a json file from Advene and create a new label.

Parameters:

Name Type Description Default
path_to_file str

path to json file containing all information about the labels from Advene

required
label_name str

name that was specified in Advene

required
bool_save_start_end_times bool

determine whether the start and end times should be saved as a npy file

required

Returns: list: new label, aligned with movie (default label)

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/processing_labels.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def export_labels_from_json_file(path_to_file: str, label_name: str, bool_save_start_end_times: bool) -> list[int] | int:
    """
    Process a json file from Advene and create a new label.

    Args:
        path_to_file (str): path to json file containing all information about the labels from Advene
        label_name (str): name that was specified in Advene
        bool_save_start_end_times (bool): determine whether the start and end times should be saved as a npy file
    Returns:
        list: new label, aligned with movie (default label)
    """
    with open(path_to_file, 'r') as jsonfile:
        labels_json_file = json.load(jsonfile)

    label_start_end_times = []
    values = []
    for annotation in labels_json_file.get("annotations"):
        if annotation.get("type") == label_name:
            label_start_end_times.append([annotation.get("begin"), annotation.get("end")])
            values.append(annotation.get("title"))

    # if requested, the start and end times will be saved
    if bool_save_start_end_times:
        start_end_times_seconds = [[x / 1000, y / 1000] for [x, y] in label_start_end_times]
        np.save("../useful_data/start_end_times/start_end_times_{}.npy".format(label_name), start_end_times_seconds)

    return label_start_end_times, values
get_start_stop_times_from_label
get_start_stop_times_from_label(neural_rec_time, patient_aligned_label)

This function takes the patient aligned label and extracts the start and stop times from that.

Parameters:

Name Type Description Default
neural_rec_time array

neural recording time of patient

required
patient_aligned_label array

patient aligned label

required

Returns: values (list), start times (list) and stop times (list) of label segments

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/processing_labels.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_start_stop_times_from_label(neural_rec_time: np.ndarray, patient_aligned_label: np.ndarray) -> tuple[list, list, list]:
    """
    This function takes the patient aligned label and extracts the start and stop times from that.

    Args:
        neural_rec_time (array): neural recording time of patient
        patient_aligned_label (array): patient aligned label
    Returns:
        values (list), start times (list) and stop times (list) of label segments
    """

    tmp = patient_aligned_label[0]
    values = [tmp]
    start_times = [neural_rec_time[0]]
    stop_times = []
    for i in range(1, len(patient_aligned_label)):
        if not patient_aligned_label[i] == tmp:
            values.append(patient_aligned_label[i])
            start_times.append(neural_rec_time[i])
            stop_times.append(neural_rec_time[i-1])
            tmp = patient_aligned_label[i]
    stop_times.append(neural_rec_time[-1])

    return values, start_times, stop_times
watch_log

Watch log parsing utilities for extracting PTS and CPU timestamps.

This module provides the :class:WatchLog class to read a "watch log" file, derive start/end times, and build aligned arrays/dataframes of presentation timestamps (PTS) and real (CPU) times. It also includes helpers to trim the recorded time series to a maximum movie duration.

Constants

MAX_MOVIE_TIME (int): Maximum allowable movie time (in the same units as PTS) used to filter out timestamps beyond the movie length.

Example

wl = WatchLog("/path/to/watch.log") wl.get_start_time(), wl.get_end_time() (12, 5012) wl.df_pts_cpu.head()

pts cpu_time
0 0.00 1234
1 0.04 1270
WatchLog

Parse a watch log file and process the concurrent presentation time stamps (PTS) and the local PC (CPU) time series.

On initialization, this class: 1) Reads the provided watch log file. 2) Extracts the start and end CPU timestamps. 3) Loads all PTS/CPU rows, trims them to the maximum movie length, and converts CPU timestamps to seconds (integer, via floor division by 1000). 4) Builds a pandas DataFrame (:attr:df_pts_cpu) with pts and cpu_time.

Attributes:

Name Type Description
watch_log_file str

Absolute or relative path to the watch log file.

start_time int

Start time in seconds (CPU time derived from the file).

end_time int

End time in seconds (CPU time derived from the file).

duration int

Duration in seconds, computed as end_time - start_time.

pts_time_stamps ndarray

Array of PTS values (floats), possibly trimmed.

dts_time_stamps ndarray

Array of CPU times in seconds (ints), possibly trimmed.

excluded_indices list[int]

Indices removed when trimming to MAX_MOVIE_TIME.

df_pts_cpu DataFrame

Two-column DataFrame with pts and cpu_time.

Notes
  • The method :meth:getlines reads the log file in binary mode and returns a list of byte strings. Downstream parsing assumes whitespace- separated fields and converts to numeric types as needed.
  • CPU timestamps are divided by 1000 and cast to int, so any sub-second resolution is truncated rather than rounded.
Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class WatchLog:
    """
    Parse a watch log file and process the concurrent presentation time stamps (PTS) and the local PC (CPU) time series.

    On initialization, this class:
      1) Reads the provided watch log file.
      2) Extracts the start and end CPU timestamps.
      3) Loads all PTS/CPU rows, trims them to the maximum movie length, and
         converts CPU timestamps to seconds (integer, via floor division by 1000).
      4) Builds a pandas DataFrame (:attr:`df_pts_cpu`) with `pts` and `cpu_time`.

    Attributes:
        watch_log_file (str): Absolute or relative path to the watch log file.
        start_time (int): Start time in seconds (CPU time derived from the file).
        end_time (int): End time in seconds (CPU time derived from the file).
        duration (int): Duration in seconds, computed as `end_time - start_time`.
        pts_time_stamps (np.ndarray): Array of PTS values (floats), possibly trimmed.
        dts_time_stamps (np.ndarray): Array of CPU times in seconds (ints), possibly trimmed.
        excluded_indices (list[int]): Indices removed when trimming to `MAX_MOVIE_TIME`.
        df_pts_cpu (pd.DataFrame): Two-column DataFrame with `pts` and `cpu_time`.

    Notes:
        - The method :meth:`getlines` reads the log file in **binary** mode and
          returns a list of byte strings. Downstream parsing assumes whitespace-
          separated fields and converts to numeric types as needed.
        - CPU timestamps are divided by 1000 and cast to `int`, so any sub-second
          resolution is truncated rather than rounded.
    """
    def __init__(self, path_watch_log: str):
        """
        Initialize the WatchLog and populate derived fields.

        Args:
            path_watch_log (str): Path to the watch log file to load.

        Side Effects:
            - Reads the file at `path_watch_log`.
            - Populates attributes documented in the class docstring.
        """
        self.watch_log_file = path_watch_log
        self.start_time, self.end_time = self.extract_start_and_end_time()
        self.duration = self.end_time - self.start_time
        self.pts_time_stamps, self.dts_time_stamps, self.excluded_indices = self.get_times_from_watch_log(path_watch_log)
        # optionally divide time stamp by 1000 to get time in seconds
        self.dts_time_stamps = np.array([int(x / 1000) for x in self.dts_time_stamps])
        self.df_pts_cpu = pd.DataFrame({"pts": self.pts_time_stamps, "cpu_time": self.dts_time_stamps})

        self.df_pts_cpu.sort_values(['cpu_time'])

    def get_start_time(self) -> int:
        """
        Return the start CPU time (in seconds).

        Returns:
            int: Start time in seconds.
        """
        return self.start_time

    def get_end_time(self) -> int:
        """
        Return the end CPU time (in seconds).

        Returns:
            int: End time in seconds.
        """
        return self.end_time

    def _set_start_time(self, new_start_time: int):
        """
        Set a new start time (in seconds).

        Args:
            new_start_time (int): The new start time in seconds.
        """
        self.start_time = new_start_time

    def _set_end_time(self, new_end_time: int):
        """
        Set a new end time (in seconds).

        Args:
            new_end_time (int): The new end time in seconds.
        """
        self.end_time = new_end_time

    def extract_start_and_end_time(self) -> tuple[int, int]:
        """
        Extract the start and end CPU timestamps from the watch log.

        The function reads the file in text mode, takes the second line as the
        "first" data line, and scans backward from the end to find the last line
        beginning with ``"pts"``. It returns the CPU timestamps from those two
        lines, converted to seconds by dividing by 1000 and casting to `int`.

        Returns:
            tuple[int, int]: A `(start_time_s, end_time_s)` tuple in seconds.

        Raises:
            FileNotFoundError: If the watch log file cannot be opened.
            ValueError: If the file does not contain expected fields/format.
        """
        with open(self.watch_log_file, 'r') as f:
            lines = f.read().splitlines()
            first_line = lines[1]
            i = len(lines)-1
            while not lines[i].startswith("pts"):
                i -= 1
            last_line = lines[i]  # TODO change this to make it generally applicable

        # return cpu time stamp of first and last line in watch log
        # divide by 1000 to get seconds
        return int(int(first_line.split()[-1]) / 1000), int(int(last_line.split()[-1]) / 1000)

    def get_times_from_watch_log(self, path_watch_log: str) -> tuple[np.ndarray, np.ndarray, list[int]]:
        """
        Extract PTS and CPU (real) times from the watch log.

        The function reads raw lines via :meth:`getlines`, parses the whitespace-
        separated fields, and collects two arrays:
        - PTS values as floats rounded to 2 decimals.
        - CPU timestamps as integers (original units, **not** yet divided by 1000).

        It then trims both arrays to the maximum movie duration via
        :meth:`cut_time_to_movie_pts`.

        Args:
            path_watch_log (str): Path to the watch log file to parse.

        Returns:
            tuple[np.ndarray, np.ndarray, list[int]]: A tuple
            ``(pts_time_stamps, cpu_time_stamps, excluded_indices)`` where
            - `pts_time_stamps` is a float array,
            - `cpu_time_stamps` is an int array (original unit),
            - `excluded_indices` lists indices removed due to `MAX_MOVIE_TIME`.

        Raises:
            FileNotFoundError: If the watch log file cannot be opened.
            ValueError: If the log lines do not match the expected 4-field format.
        """
        lines = self.getlines(path_watch_log)
        pts = []
        time = []

        for line in lines[1:]:
            fields = line.split()
            if len(fields) == 4:
                pts.append(round(float(fields[1]), 2))
                time.append(int(fields[3]))

        pts = np.array(pts)
        time = np.array(time)

        return self.cut_time_to_movie_pts(pts, time)

    @staticmethod
    def cut_time_to_movie_pts(pts_time_stamps: np.ndarray, cpu_time_stamps: np.ndarray) -> tuple[np.ndarray, np.ndarray, list[int]]:
        """
        Trim PTS and CPU arrays to the maximum movie length.

        Any PTS value strictly greater than :data:`MAX_MOVIE_TIME` is excluded.
        The function returns aligned arrays of the retained elements and the
        list of excluded indices.

        Args:
            pts_time_stamps (np.ndarray): Array of PTS values (floats).
            cpu_time_stamps (np.ndarray): Array of CPU times (ints), aligned with PTS.

        Returns:
            tuple[np.ndarray, np.ndarray, list[int]]: A tuple
            ``(cut_down_pts, cut_down_dts, excluded_indices)``:
            - `cut_down_pts` (np.ndarray): PTS values ≤ `MAX_MOVIE_TIME`.
            - `cut_down_dts` (np.ndarray): Corresponding CPU times.
            - `excluded_indices` (list[int]): Indices removed from the original arrays.

        Notes:
            - This function assumes `pts_time_stamps` and `cpu_time_stamps` are the
              same length and aligned 1:1.
        """
        excluded_time_points_based_on_max_movie_time = [0 if x > MAX_MOVIE_TIME else 1 for x in pts_time_stamps]

        cut_down_pts = []
        cut_down_dts = []
        excluded_indices = []
        for i in range(0, len(pts_time_stamps)):
            if excluded_time_points_based_on_max_movie_time[i] == 1:
                cut_down_pts.append(pts_time_stamps[i])
                cut_down_dts.append(cpu_time_stamps[i])
            else:
                excluded_indices.append(i)

        return np.array(cut_down_pts), np.array(cut_down_dts), excluded_indices

    @staticmethod
    def getlines(filename: str) -> list[bytes]:
        """
        Read a file in binary mode and return its lines.

        Args:
            filename (str): Path to the file to read.

        Returns:
            list[bytes]: Lines of the file as byte strings (no newline characters).

        Raises:
            FileNotFoundError: If the file cannot be opened.
        """
        with open(filename, 'rb') as logfile:
            data = logfile.read()
        lines = data.splitlines()
        return lines
get_start_time
get_start_time()

Return the start CPU time (in seconds).

Returns:

Name Type Description
int int

Start time in seconds.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
76
77
78
79
80
81
82
83
def get_start_time(self) -> int:
    """
    Return the start CPU time (in seconds).

    Returns:
        int: Start time in seconds.
    """
    return self.start_time
get_end_time
get_end_time()

Return the end CPU time (in seconds).

Returns:

Name Type Description
int int

End time in seconds.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
85
86
87
88
89
90
91
92
def get_end_time(self) -> int:
    """
    Return the end CPU time (in seconds).

    Returns:
        int: End time in seconds.
    """
    return self.end_time
extract_start_and_end_time
extract_start_and_end_time()

Extract the start and end CPU timestamps from the watch log.

The function reads the file in text mode, takes the second line as the "first" data line, and scans backward from the end to find the last line beginning with "pts". It returns the CPU timestamps from those two lines, converted to seconds by dividing by 1000 and casting to int.

Returns:

Type Description
tuple[int, int]

tuple[int, int]: A (start_time_s, end_time_s) tuple in seconds.

Raises:

Type Description
FileNotFoundError

If the watch log file cannot be opened.

ValueError

If the file does not contain expected fields/format.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def extract_start_and_end_time(self) -> tuple[int, int]:
    """
    Extract the start and end CPU timestamps from the watch log.

    The function reads the file in text mode, takes the second line as the
    "first" data line, and scans backward from the end to find the last line
    beginning with ``"pts"``. It returns the CPU timestamps from those two
    lines, converted to seconds by dividing by 1000 and casting to `int`.

    Returns:
        tuple[int, int]: A `(start_time_s, end_time_s)` tuple in seconds.

    Raises:
        FileNotFoundError: If the watch log file cannot be opened.
        ValueError: If the file does not contain expected fields/format.
    """
    with open(self.watch_log_file, 'r') as f:
        lines = f.read().splitlines()
        first_line = lines[1]
        i = len(lines)-1
        while not lines[i].startswith("pts"):
            i -= 1
        last_line = lines[i]  # TODO change this to make it generally applicable

    # return cpu time stamp of first and last line in watch log
    # divide by 1000 to get seconds
    return int(int(first_line.split()[-1]) / 1000), int(int(last_line.split()[-1]) / 1000)
get_times_from_watch_log
get_times_from_watch_log(path_watch_log)

Extract PTS and CPU (real) times from the watch log.

The function reads raw lines via :meth:getlines, parses the whitespace- separated fields, and collects two arrays: - PTS values as floats rounded to 2 decimals. - CPU timestamps as integers (original units, not yet divided by 1000).

It then trims both arrays to the maximum movie duration via :meth:cut_time_to_movie_pts.

Parameters:

Name Type Description Default
path_watch_log str

Path to the watch log file to parse.

required

Returns:

Type Description
ndarray

tuple[np.ndarray, np.ndarray, list[int]]: A tuple

ndarray

(pts_time_stamps, cpu_time_stamps, excluded_indices) where

list[int]
  • pts_time_stamps is a float array,
tuple[ndarray, ndarray, list[int]]
  • cpu_time_stamps is an int array (original unit),
tuple[ndarray, ndarray, list[int]]
  • excluded_indices lists indices removed due to MAX_MOVIE_TIME.

Raises:

Type Description
FileNotFoundError

If the watch log file cannot be opened.

ValueError

If the log lines do not match the expected 4-field format.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def get_times_from_watch_log(self, path_watch_log: str) -> tuple[np.ndarray, np.ndarray, list[int]]:
    """
    Extract PTS and CPU (real) times from the watch log.

    The function reads raw lines via :meth:`getlines`, parses the whitespace-
    separated fields, and collects two arrays:
    - PTS values as floats rounded to 2 decimals.
    - CPU timestamps as integers (original units, **not** yet divided by 1000).

    It then trims both arrays to the maximum movie duration via
    :meth:`cut_time_to_movie_pts`.

    Args:
        path_watch_log (str): Path to the watch log file to parse.

    Returns:
        tuple[np.ndarray, np.ndarray, list[int]]: A tuple
        ``(pts_time_stamps, cpu_time_stamps, excluded_indices)`` where
        - `pts_time_stamps` is a float array,
        - `cpu_time_stamps` is an int array (original unit),
        - `excluded_indices` lists indices removed due to `MAX_MOVIE_TIME`.

    Raises:
        FileNotFoundError: If the watch log file cannot be opened.
        ValueError: If the log lines do not match the expected 4-field format.
    """
    lines = self.getlines(path_watch_log)
    pts = []
    time = []

    for line in lines[1:]:
        fields = line.split()
        if len(fields) == 4:
            pts.append(round(float(fields[1]), 2))
            time.append(int(fields[3]))

    pts = np.array(pts)
    time = np.array(time)

    return self.cut_time_to_movie_pts(pts, time)
cut_time_to_movie_pts staticmethod
cut_time_to_movie_pts(pts_time_stamps, cpu_time_stamps)

Trim PTS and CPU arrays to the maximum movie length.

Any PTS value strictly greater than :data:MAX_MOVIE_TIME is excluded. The function returns aligned arrays of the retained elements and the list of excluded indices.

Parameters:

Name Type Description Default
pts_time_stamps ndarray

Array of PTS values (floats).

required
cpu_time_stamps ndarray

Array of CPU times (ints), aligned with PTS.

required

Returns:

Type Description
ndarray

tuple[np.ndarray, np.ndarray, list[int]]: A tuple

ndarray

(cut_down_pts, cut_down_dts, excluded_indices):

list[int]
  • cut_down_pts (np.ndarray): PTS values ≤ MAX_MOVIE_TIME.
tuple[ndarray, ndarray, list[int]]
  • cut_down_dts (np.ndarray): Corresponding CPU times.
tuple[ndarray, ndarray, list[int]]
  • excluded_indices (list[int]): Indices removed from the original arrays.
Notes
  • This function assumes pts_time_stamps and cpu_time_stamps are the same length and aligned 1:1.
Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@staticmethod
def cut_time_to_movie_pts(pts_time_stamps: np.ndarray, cpu_time_stamps: np.ndarray) -> tuple[np.ndarray, np.ndarray, list[int]]:
    """
    Trim PTS and CPU arrays to the maximum movie length.

    Any PTS value strictly greater than :data:`MAX_MOVIE_TIME` is excluded.
    The function returns aligned arrays of the retained elements and the
    list of excluded indices.

    Args:
        pts_time_stamps (np.ndarray): Array of PTS values (floats).
        cpu_time_stamps (np.ndarray): Array of CPU times (ints), aligned with PTS.

    Returns:
        tuple[np.ndarray, np.ndarray, list[int]]: A tuple
        ``(cut_down_pts, cut_down_dts, excluded_indices)``:
        - `cut_down_pts` (np.ndarray): PTS values ≤ `MAX_MOVIE_TIME`.
        - `cut_down_dts` (np.ndarray): Corresponding CPU times.
        - `excluded_indices` (list[int]): Indices removed from the original arrays.

    Notes:
        - This function assumes `pts_time_stamps` and `cpu_time_stamps` are the
          same length and aligned 1:1.
    """
    excluded_time_points_based_on_max_movie_time = [0 if x > MAX_MOVIE_TIME else 1 for x in pts_time_stamps]

    cut_down_pts = []
    cut_down_dts = []
    excluded_indices = []
    for i in range(0, len(pts_time_stamps)):
        if excluded_time_points_based_on_max_movie_time[i] == 1:
            cut_down_pts.append(pts_time_stamps[i])
            cut_down_dts.append(cpu_time_stamps[i])
        else:
            excluded_indices.append(i)

    return np.array(cut_down_pts), np.array(cut_down_dts), excluded_indices
getlines staticmethod
getlines(filename)

Read a file in binary mode and return its lines.

Parameters:

Name Type Description Default
filename str

Path to the file to read.

required

Returns:

Type Description
list[bytes]

list[bytes]: Lines of the file as byte strings (no newline characters).

Raises:

Type Description
FileNotFoundError

If the file cannot be opened.

Source code in epiphyte/preprocessing/annotation/stimulus_driven_annotation/movies/watch_log.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@staticmethod
def getlines(filename: str) -> list[bytes]:
    """
    Read a file in binary mode and return its lines.

    Args:
        filename (str): Path to the file to read.

    Returns:
        list[bytes]: Lines of the file as byte strings (no newline characters).

    Raises:
        FileNotFoundError: If the file cannot be opened.
    """
    with open(filename, 'rb') as logfile:
        data = logfile.read()
    lines = data.splitlines()
    return lines

data_preprocessing

binning

Binning functions for spike times and labels. Pulls data from the database using pre-defined query functions.

bin_label

bin_label(patient_id, session_nr, values, start_times, stop_times, bin_size, exclude_pauses)

Bin a label timeline against fixed-size bins.

Parameters:

Name Type Description Default
patient_id int

ID of patient.

required
session_nr int

Session number for the movie watching.

required
values ndarray

Values of the label per segment.

required
start_times ndarray

Start times (ms) per segment.

required
stop_times ndarray

Stop times (ms) per segment.

required
bin_size int

Size of one bin in milliseconds.

required
exclude_pauses bool

If True, exclude paused playback intervals.

required

Returns:

Type Description
ndarray

np.ndarray: Indicator vector (one value per bin).

Source code in epiphyte/preprocessing/data_preprocessing/binning.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def bin_label(
    patient_id: int,
    session_nr: int,
    values: np.ndarray,
    start_times: np.ndarray,
    stop_times: np.ndarray,
    bin_size: int,
    exclude_pauses: bool,
) -> np.ndarray:
    """Bin a label timeline against fixed-size bins.

    Args:
        patient_id (int): ID of patient.
        session_nr (int): Session number for the movie watching.
        values (np.ndarray): Values of the label per segment.
        start_times (np.ndarray): Start times (ms) per segment.
        stop_times (np.ndarray): Stop times (ms) per segment.
        bin_size (int): Size of one bin in milliseconds.
        exclude_pauses (bool): If ``True``, exclude paused playback intervals.

    Returns:
        np.ndarray: Indicator vector (one value per bin).
    """
    neural_rec_time = get_neural_rectime_of_patient(patient_id, session_nr) / 1000

    rec_on = neural_rec_time[0]
    rec_off = neural_rec_time[-1]
    total_msec = rec_off - rec_on
    total_bins = int(total_msec / bin_size)
    bins = np.linspace(rec_on, rec_off, total_bins)

    if exclude_pauses:  
        start_times_pauses, stop_times_pauses = get_start_stop_times_pauses(patient_id, session_nr)
        bins_no_pauses = pause_handling.rm_pauses_bins(bins, start_times_pauses, stop_times_pauses)
        reference_vector = bins_no_pauses
    else:
        reference_vector = bins

    if os.path.exists("neural_rec_time.npy"):
        os.remove("neural_rec_time.npy")

    return create_vectors_from_time_points.create_vector_from_start_stop_times_reference(reference_vector,
                                                                                         np.array(values),
                                                                                         np.array(start_times),
                                                                                         np.array(stop_times))

bin_spikes

bin_spikes(patient_id, session_nr, spike_times, bin_size, exclude_pauses, output_edges=False)

Bin spike times into fixed-size bins.

Parameters:

Name Type Description Default
patient_id int

ID of the patient.

required
session_nr int

Session number of the experiment.

required
spike_times ndarray

Spike timestamps (ms) as a vector.

required
bin_size int

Bin size in milliseconds.

required
exclude_pauses bool

If True, exclude paused playback intervals.

required
output_edges bool

If True, also return the bin edges used.

False

Returns:

Type Description
Union[ndarray, List[ndarray]]

Union[np.ndarray, List[np.ndarray]]: Binned spikes or [binned_spikes, bin_edges] if requested.

Source code in epiphyte/preprocessing/data_preprocessing/binning.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def bin_spikes(
    patient_id: int,
    session_nr: int,
    spike_times: np.ndarray,
    bin_size: int,
    exclude_pauses: bool,
    output_edges: bool = False,
) -> Union[np.ndarray, List[np.ndarray]]:
    """Bin spike times into fixed-size bins.

    Args:
        patient_id (int): ID of the patient.
        session_nr (int): Session number of the experiment.
        spike_times (np.ndarray): Spike timestamps (ms) as a vector.
        bin_size (int): Bin size in milliseconds.
        exclude_pauses (bool): If ``True``, exclude paused playback intervals.
        output_edges (bool): If ``True``, also return the bin edges used.

    Returns:
        Union[np.ndarray, List[np.ndarray]]: Binned spikes or ``[binned_spikes, bin_edges]`` if requested.
    """
    rectime = get_neural_rectime_of_patient(patient_id, session_nr) / 1000
    rec_on = rectime[0]
    rec_off = rectime[-1]

    total_msec = rec_off - rec_on
    total_bins = int(total_msec / bin_size)
    bins = np.linspace(rec_on, rec_off, total_bins)

    if exclude_pauses:
        start_times_pauses, stop_times_pauses = get_start_stop_times_pauses(patient_id, session_nr)

        # rescale pauses from microseconds to milliseconds
        start_times_pauses = start_times_pauses / 1000
        stop_times_pauses = stop_times_pauses / 1000

        # remove the pauses from the binning edges
        bins_no_pauses = pause_handling.rm_pauses_bins(bins, start_times_pauses, stop_times_pauses)
        unit_no_pauses, pause_spks = pause_handling.rm_pauses_spikes(spike_times, start_times_pauses, stop_times_pauses,
                                                                     return_intervals=True)
        # bin spikes
        binned_spikes, _ = np.histogram(unit_no_pauses, bins=bins_no_pauses)

        # output updated to bins without pause
        bins = bins_no_pauses

    else:
        # bin spikes
        binned_spikes, _ = np.histogram(spike_times, bins=bins)

    if output_edges:
        ret = [binned_spikes, bins]
    else: 
        ret = binned_spikes

    return ret

create_vectors_from_time_points

Functions related to processing the db stored time points (start/stop/values) into vectors for use in analysis.

get_index_nearest_timestamp_in_vector

get_index_nearest_timestamp_in_vector(vector, timestamp)

Finds the index of the value in a vector that is nearest to a given timestamp.

Parameters:

Name Type Description Default
vector ndarray

The array of timestamps to search.

required
timestamp float

The target timestamp to find the nearest value to.

required

Returns: int: The index of the value in vector that is closest to timestamp.

Example
vector = np.array([1.0, 2.5, 3.8, 5.0])
idx = get_index_nearest_timestamp_in_vector(vector, 3.0)
# idx == 1
Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def get_index_nearest_timestamp_in_vector(vector: np.ndarray, timestamp: float) -> int:
    """Finds the index of the value in a vector that is nearest to a given timestamp.

    Args:
        vector (np.ndarray): The array of timestamps to search.
        timestamp (float): The target timestamp to find the nearest value to.
    Returns:
        int: The index of the value in `vector` that is closest to `timestamp`.

    Example:
        ```python
        vector = np.array([1.0, 2.5, 3.8, 5.0])
        idx = get_index_nearest_timestamp_in_vector(vector, 3.0)
        # idx == 1
        ```
    """
    return (np.abs(np.array(vector) - timestamp)).argmin()

get_nearest_value_from_vector

get_nearest_value_from_vector(vector, timestamp)

Finds the value in a vector closest to a given timestamp.

Parameters:

Name Type Description Default
vector ndarray

Array of values to search.

required
timestamp float

Target timestamp to find the nearest value to.

required

Returns:

Name Type Description
float float

The value from the vector that is closest to the given timestamp.

Example
import numpy as np
vector = np.array([1.0, 2.5, 3.8, 5.0])
get_nearest_value_from_vector(vector, 4.0)
3.8
Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def get_nearest_value_from_vector(vector: np.ndarray, timestamp: float) -> float:
    """Finds the value in a vector closest to a given timestamp.

    Args:
        vector (np.ndarray): Array of values to search.
        timestamp (float): Target timestamp to find the nearest value to.

    Returns:
        float: The value from the vector that is closest to the given timestamp.

    Example:
        ```python
        import numpy as np
        vector = np.array([1.0, 2.5, 3.8, 5.0])
        get_nearest_value_from_vector(vector, 4.0)
        3.8
        ```
    """
    return vector[(np.abs(np.array(vector) - (timestamp))).argmin()]  # row number with matching pts

create_vector_from_start_stop_times_reference_cont_watch

create_vector_from_start_stop_times_reference_cont_watch(reference_vector, values, starts, stops)

Creates a vector aligned to a reference vector using provided start and stop times and corresponding values.

This function generates an indicator vector where each segment, defined by its start and stop times, is filled with the associated value. The output vector is aligned to the bins defined by the reference vector, which represents the edges of the bins.

Parameters:

Name Type Description Default
reference_vector ndarray

Array of timestamps or bin edges to which the output vector will be aligned.

required
values ndarray

Array of values to assign to each segment.

required
starts ndarray

Array of start times for each segment.

required
stops ndarray

Array of stop times for each segment.

required

Returns:

Type Description
ndarray

np.ndarray: Indicator vector aligned to the reference vector, with values assigned according to the specified intervals.

Notes

Prints an error and returns -1 if the lengths of values, starts, and stops do not match.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def create_vector_from_start_stop_times_reference_cont_watch(
    reference_vector: np.ndarray,
    values: np.ndarray,
    starts: np.ndarray,
    stops: np.ndarray,
) -> np.ndarray:
    """
    Creates a vector aligned to a reference vector using provided start and stop times and corresponding values.

    This function generates an indicator vector where each segment, defined by its start and stop times, is filled with the associated value. The output vector is aligned to the bins defined by the reference vector, which represents the edges of the bins.

    Args:
        reference_vector (np.ndarray): 
            Array of timestamps or bin edges to which the output vector will be aligned.
        values (np.ndarray): 
            Array of values to assign to each segment.
        starts (np.ndarray): 
            Array of start times for each segment.
        stops (np.ndarray): 
            Array of stop times for each segment.

    Returns:
        np.ndarray: 
            Indicator vector aligned to the reference vector, with values assigned according to the specified intervals.

    Notes:
        Prints an error and returns -1 if the lengths of `values`, `starts`, and `stops` do not match.
    """
    # check if input has the correct format
    if not (len(values) == len(starts) == len(stops)):
        print("vectors values, starts and stops have to be the same length")
        return -1

    nr_intervals = len(values)
    ret = []
    for i in range(0, nr_intervals):
        index_dts_start = get_index_nearest_timestamp_in_vector(reference_vector, starts[i])
        index_dts_stop = get_index_nearest_timestamp_in_vector(reference_vector, stops[i])
        length_interval = index_dts_stop - index_dts_start
        if length_interval == 0:
            ret = np.append(ret, [values[i]])
        else:
            ret = np.append(ret, [values[i]] * (length_interval))

    return ret

create_vector_from_start_stop_times

create_vector_from_start_stop_times(patient_id, session_nr, values, starts, stops)

Less powerful version of the function create_vector_from_start_stop_times_reference_cont_watch

Parameters:

Name Type Description Default
patient_id int

ID of patient

required
session_nr int

unique number of session of patient

required
values ndarray

array indicating all values in the right order

required
starts ndarray

array indicating all start times of all segments in the right order

required
stops ndarray

array indicating all stop times of all segments as a vector in the right order

required

Returns:

Type Description
ndarray

np.ndarray: Indicator function aligned to reference vector.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def create_vector_from_start_stop_times(
    patient_id: int,
    session_nr: int,
    values: np.ndarray,
    starts: np.ndarray,
    stops: np.ndarray,
) -> np.ndarray:
    """
    Less powerful version of the function create_vector_from_start_stop_times_reference_cont_watch

    Args:
        patient_id (int): ID of patient
        session_nr (int): unique number of session of patient
        values (np.ndarray): array indicating all values in the right order
        starts (np.ndarray): array indicating all start times of all segments in the right order
        stops (np.ndarray): array indicating all stop times of all segments as a vector in the right order

    Returns:
        np.ndarray: Indicator function aligned to reference vector.

    """
    neural_rec_time = get_neural_rectime_of_patient(patient_id, session_nr)

    # check if input has the correct format
    if not (len(values) == len(starts) == len(stops)):
        print("vectors values, starts and stops have to be the same length")
        return -1

    nr_intervals = len(values)
    ret = []
    for i in range(0, nr_intervals):
        index_dts_start = get_index_nearest_timestamp_in_vector(neural_rec_time, starts[i])
        index_dts_stop = get_index_nearest_timestamp_in_vector(neural_rec_time, stops[i])
        length_interval = len(neural_rec_time[index_dts_start:index_dts_stop + 1])
        ret = np.append(ret, [values[i]] * length_interval)

    return ret

get_start_stop_times_from_label

get_start_stop_times_from_label(neural_rec_time, patient_aligned_label)

This function extracts the start and stop times from a label. patient_aligned_label has to have the same length as neural_rec_time The time points in the resulting vectors are in neural recording time

Parameters:

Name Type Description Default
neural_rec_time ndarray

array indicating neural recording time

required
patient_aligned_label ndarray

array indicating label aligned to patient time

required

Returns:

Name Type Description
tuple tuple[list, list, list]

(values, start_times, stop_times) arrays.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def get_start_stop_times_from_label(
    neural_rec_time: np.ndarray, patient_aligned_label: np.ndarray
) -> tuple[list, list, list]:
    """
    This function extracts the start and stop times from a label.
    `patient_aligned_label` has to have the same length as `neural_rec_time`
    The time points in the resulting vectors are in neural recording time

    Args:
        neural_rec_time (np.ndarray): array indicating neural recording time
        patient_aligned_label (np.ndarray): array indicating label aligned to patient time

    Returns:
        tuple: ``(values, start_times, stop_times)`` arrays.
    """
    tmp = patient_aligned_label[0]
    values = [tmp]
    start_times = [neural_rec_time[0]]
    stop_times = []
    for i in range(1, len(patient_aligned_label)):
        if not patient_aligned_label[i] == tmp:
            values.append(patient_aligned_label[i])
            start_times.append(neural_rec_time[i])
            stop_times.append(neural_rec_time[i - 1])
            tmp = patient_aligned_label[i]
    stop_times.append(neural_rec_time[-1])

    return values, start_times, stop_times

get_bins_excl_pauses

get_bins_excl_pauses(patient_id, session_nr, neural_rec_time, bin_size)

Returns edges of bins for a given patient with the right bin size, while excluding bins where the movie was paused.

Parameters:

Name Type Description Default
patient_id int

ID of patient

required
session_nr int

session number

required
neural_rec_time ndarray

vector of neural recording time of patient

required
bin_size int

size of bin in milliseconds

required

Returns:

Type Description
ndarray

np.ndarray: Edges of bins, excluding paused intervals.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def get_bins_excl_pauses(
    patient_id: int, session_nr: int, neural_rec_time: np.ndarray, bin_size: int
) -> np.ndarray:
    """
    Returns edges of bins for a given patient with the right bin size, while excluding bins where the movie was paused.

    Args:
        patient_id (int): ID of patient
        session_nr (int): session number
        neural_rec_time (np.ndarray): vector of neural recording time of patient
        bin_size (int): size of bin in milliseconds

    Returns:
        np.ndarray: Edges of bins, excluding paused intervals.
    """
    start_times_pauses, stop_times_pauses = get_start_stop_times_pauses(patient_id, session_nr)
    rec_on = neural_rec_time[0]
    rec_off = neural_rec_time[-1]
    total_msec = rec_off - rec_on
    total_bins = int(total_msec / bin_size)
    bins = np.linspace(rec_on, rec_off, total_bins)
    bins_no_pauses = pause_handling.rm_pauses_bins(bins, start_times_pauses, stop_times_pauses)

    return bins_no_pauses

create_vector_from_start_stop_times_reference

create_vector_from_start_stop_times_reference(reference_vector, values, starts, stops)

Create an indicator function from values, start and stop times of a label aligned to a reference vector of time points. Used to create an indicator function (vector indicating if a labelled feature was present during the interval between two time points) from a set of bin edges.

Parameters:

Name Type Description Default
reference_vector ndarray

vector of linearly spaced time points (e.g. bin edges)

required
values ndarray

values indicating presence or absence of a labeled feature

required
starts ndarray

start times of the corresponding values

required
stops ndarray

stop times of the corresponding values

required

Returns:

Type Description
ndarray

np.ndarray: indicator function vector.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def create_vector_from_start_stop_times_reference(
    reference_vector: np.ndarray,
    values: np.ndarray,
    starts: np.ndarray,
    stops: np.ndarray,
) -> np.ndarray:
    """
    Create an indicator function from values, start and stop times of a label aligned to a reference vector of time points. 
    Used to create an indicator function (vector indicating if a labelled feature was present during the interval between two time points) from a set of bin edges. 

    Args:
        reference_vector (np.ndarray): vector of linearly spaced time points (e.g. bin edges)
        values (np.ndarray): values indicating presence or absence of a labeled feature
        starts (np.ndarray): start times of the corresponding values 
        stops (np.ndarray): stop times of the corresponding values

    Returns:
        np.ndarray: indicator function vector.
    """
    # check if input has the correct format
    if not (len(values) == len(starts) == len(stops)):
        print("vectors values, starts and stops have to be the same length")
        return -1

    ret = []

    for i in range(0, len(reference_vector) - 1):
        value = get_value_in_time_frame(time_point1=reference_vector[i], time_point2=reference_vector[i + 1],
                                        values=values, start_times=starts, end_times=stops)
        ret.append(value)

    return ret

get_value_matching_start_point

get_value_matching_start_point(time_point, values, start_times, end_times)

Finds the value in a vector that corresponds to the closest start time less than or equal to a given time point.

Parameters:

Name Type Description Default
time_point float

the time point for which the value shall be searched

required
values ndarray

vector with all values

required
start_times ndarray

vector with all start times

required
end_times ndarray

vector with all stop times

required

Returns:

Name Type Description
float float

Value corresponding to the time point.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def get_value_matching_start_point(
    time_point: float, values: np.ndarray, start_times: np.ndarray, end_times: np.ndarray
) -> float:
    """
    Finds the value in a vector that corresponds to the closest start time less than or equal to a given time point.

    Args:
        time_point (float): the time point for which the value shall be searched
        values (np.ndarray): vector with all values
        start_times (np.ndarray): vector with all start times
        end_times (np.ndarray): vector with all stop times

    Returns:
        float: Value corresponding to the time point.

    """
    index = get_index_nearest_timestamp_in_vector(start_times, time_point)
    if time_point < start_times[index]:
        if time_point <= start_times[0]:
            return index
        index -= 1
    return values[index]

get_value_matching_stop_point

get_value_matching_stop_point(time_point, values, start_times, end_times)

Finds the value in a vector that corresponds to the closest stop time less than or equal to a given time point.

Parameters:

Name Type Description Default
time_point float

the time point for which the value shall be searched

required
values ndarray

vector with all values

required
start_times ndarray

vector with all start times

required
end_times ndarray

vector with all stop times

required

Returns:

Name Type Description
float float

Value corresponding to the time point.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def get_value_matching_stop_point(
    time_point: float, values: np.ndarray, start_times: np.ndarray, end_times: np.ndarray
) -> float:
    """
    Finds the value in a vector that corresponds to the closest stop time less than or equal to a given time point.

    Args:
        time_point (float): the time point for which the value shall be searched
        values (np.ndarray): vector with all values
        start_times (np.ndarray): vector with all start times
        end_times (np.ndarray): vector with all stop times

    Returns:
        float: Value corresponding to the time point.
    """

    index = get_index_nearest_timestamp_in_vector(end_times, time_point)
    if time_point >= end_times[index]:
        if time_point >= end_times[-1]:
            return index
        index += 1
    return values[index]

get_index_matching_start_point

get_index_matching_start_point(time_point, values, start_times, end_times)

Finds the index of the start point that is the closest start point smaller than 'time_point'.

Parameters:

Name Type Description Default
time_point float

the time point for which the value shall be searched

required
values ndarray

vector with all values

required
start_times ndarray

vector with all start times

required
end_times ndarray

vector with all stop times

required

Returns:

Name Type Description
float int

Value corresponding to the time point.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def get_index_matching_start_point(
    time_point: float, values: np.ndarray, start_times: np.ndarray, end_times: np.ndarray
) -> int:
    """
    Finds the index of the start point that is the closest start point smaller than 'time_point'.

    Args:
        time_point (float): the time point for which the value shall be searched
        values (np.ndarray): vector with all values
        start_times (np.ndarray): vector with all start times
        end_times (np.ndarray): vector with all stop times

    Returns:
        float: Value corresponding to the time point.
    """
    index = get_index_nearest_timestamp_in_vector(start_times, time_point)
    if time_point < start_times[index]:
        if time_point < start_times[0]:
            return index
        index -= 1
    return index

get_index_matching_stop_point

get_index_matching_stop_point(time_point, values, start_times, end_times)

Finds the index of the stop point that is the closest stop point greater than 'time_point'.

Parameters:

Name Type Description Default
time_point float

the time point for which the value shall be searched

required
values ndarray

vector with all values

required
start_times ndarray

vector with all start times

required
end_times ndarray

vector with all stop times

required

Returns:

Name Type Description
float int

Value corresponding to the time point.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def get_index_matching_stop_point(
    time_point: float, values: np.ndarray, start_times: np.ndarray, end_times: np.ndarray
) -> int:
    """
    Finds the index of the stop point that is the closest stop point greater than 'time_point'.

    Args:
        time_point (float): the time point for which the value shall be searched
        values (np.ndarray): vector with all values
        start_times (np.ndarray): vector with all start times
        end_times (np.ndarray): vector with all stop times

    Returns:
        float: Value corresponding to the time point.
    """
    index = get_index_nearest_timestamp_in_vector(end_times, time_point)
    if time_point >= end_times[index]:
        if time_point >= end_times[-1]:
            return index
        index += 1
    return index

get_value_in_time_frame

get_value_in_time_frame(time_point1, time_point2, values, start_times, end_times)

Finds the value that is most represented between two time points. Needed for creating an indicator function from a set of bin edges with a bin size longer than the frame length, as a bin could contain multiple segments with different values.

Parameters:

Name Type Description Default
time_point1 float

lower bound of time frame

required
time_point2 float

upper bound of time frame that is regarded

required
values ndarray

vector with all values

required
start_times ndarray

vector with all start time points

required
end_times ndarray

vector with all stop time points

required

Returns:

Name Type Description
float float

value most represented within the time frame.

Source code in epiphyte/preprocessing/data_preprocessing/create_vectors_from_time_points.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def get_value_in_time_frame(
    time_point1: float,
    time_point2: float,
    values: np.ndarray,
    start_times: np.ndarray,
    end_times: np.ndarray,
) -> float:
    """
    Finds the value that is most represented between two time points.
    Needed for creating an indicator function from a set of bin edges with a bin size longer than the frame length, as a bin could contain multiple segments with different values.

    Args:
        time_point1 (float): lower bound of time frame         
        time_point2 (float): upper bound of time frame that is regarded
        values (np.ndarray): vector with all values
        start_times (np.ndarray): vector with all start time points
        end_times (np.ndarray): vector with all stop time points

    Returns:
        float: value most represented within the time frame.
    """
    index_1 = get_index_matching_start_point(time_point1, values, start_times, end_times)
    index_2 = get_index_matching_stop_point(time_point2, values, start_times, end_times)
    if index_1 == index_2:
        return values[index_1]
    else:
        df = pd.DataFrame(columns=["value", "weighing"])
        # first interval: add weighing of end_point of this segment - timepoint1
        df = df.append({"value": values[index_1], "weighing": end_times[index_1] - time_point1}, ignore_index=True)
        # all in between intervals: add weighing of length of segment
        for i in range(1, index_2 - index_1):
            if values[index_1 + i] in df.values:
                df.loc[df["value"] == values[index_1 + i], "weighing"] += end_times[index_1 + i] - start_times[
                    index_1 + i]
            df = df.append(
                {"value": values[index_1 + i], "weighing": end_times[index_1 + i] - start_times[index_1 + i]},
                ignore_index=True)
        # last interval: add weighing of timepoint2 - start_point of this segment
        df = df.append({"value": values[index_2], "weighing": time_point2 - start_times[index_2]}, ignore_index=True)

    return list(df[df['weighing'] == df['weighing'].max()]["value"])[0]

data_utils

I/O and time-alignment utilities for Neuralynx-like event logs.

Provides helpers to read .nev (and mock .npy) event files, parse watchlogs/DAQ logs, and linearly align between local computer time and neural recording system time.

TimeConversion

Bases: object

Linear mapping between CPU time and neural recording time.

Enables conversion of stimulus timestamps (e.g., movie frames) to the spike time scale.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class TimeConversion(object):
    """Linear mapping between CPU time and neural recording time.

    Enables conversion of stimulus timestamps (e.g., movie frames) to the spike
    time scale.
    """

    def __init__(self, path_to_wl: str | Path, path_to_dl: str | Path,
                 path_to_events: str | Path) -> None:
        self.path_watchlog = path_to_wl
        self.path_daqlog = path_to_dl
        self.path_evts = path_to_events

    def convert(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute mapping and convert watchlog times to DAQ times.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(pts_seconds, dts_ms, cpu_time_us)`` arrays.
        """

        event_mat = process_events(nev_read(self.path_evts))
        m, b = get_coeff(event_mat, self.path_daqlog)
        pts, cpu_time = read_watchlog(self.path_watchlog)

        # first convert cpu time to recording system time
        daq_time = cpu_time * m + b

        return pts, daq_time, cpu_time

    def convert_pauses(self) -> Tuple[List[float], List[float]]:
        """Convert pause CPU timestamps to neural recording time.

        Returns:
            Tuple[List[float], List[float]]: ``(starts_ms, stops_ms)`` lists in neural recording time.
        """
        start, stop = read_watchlog_pauses(self.path_watchlog)
        event_mat = process_events(nev_read(self.path_evts))
        m, b = get_coeff(event_mat, self.path_daqlog)

        convert_start = [time * m + b for i, time in enumerate(start)]
        convert_stop = [time * m + b for i, time in enumerate(stop)]

        #### NOTE: depending on the output set-up, comment/uncomment. 
        ##### Generally, can use the highlights options on the interactive plot
        ##### to indicate if time-scales are not coherent between the data and the 
        ##### pauses. 
        #convert_start = make_msec(convert_start)
        #convert_stop = make_msec(convert_stop)

        return convert_start, convert_stop


    def convert_skips(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Detect skips and return start/stop/value segments in DAQ time.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(start_values_ms, stop_values_ms, values_idx)`` arrays.
        """
        pts, daq_time, cpu_time = self.convert()

        threshold = 1
        max_jump = np.max(np.abs(np.diff(pts)))


        if max_jump >= threshold:
            print("There is a skip in the movie frame playback that is bigger than {} frames.\nThe biggest skip is {} frames.".format((threshold / 0.04), (max_jump / 0.04)))

            # list of indices where the pts jumped by 25+ frames
            beyond_threshold = np.where(np.abs(np.diff(pts)) > threshold)[0]
            print("Timepoints of skips, in neural_rec_time: {}".format(daq_time[beyond_threshold]))

            ## setting up start/stop values
            timepoints_of_skips = []
            timepoints_of_skips.append(daq_time[0]) # set first start point to the start of the rec_log

            for index in beyond_threshold:
                timepoints_of_skips.append(daq_time[index])
                timepoints_of_skips.append(daq_time[index + 1])

            timepoints_of_skips.append(daq_time[-1])

            ## specifying starts and stops from timepoint collection 
            start_values = timepoints_of_skips[0:-1:2]
            stop_values = timepoints_of_skips[1::2]
            values = np.array(range(0, len(start_values)))
            print("Start timepoints: {}".format(start_values))
            print("Stop timepoints: {}".format(stop_values))
            print("")

        else:
            print("There's not any skips in the movie frame playback that are bigger than {} frames.\nThe biggest skip is {} frames.".format((threshold / 0.04), (max_jump / 0.04)))
            print(" ")
            start_values = daq_time[0]
            stop_values = daq_time[-1]
            values = np.array([0])


        return start_values, stop_values, values
convert
convert()

Compute mapping and convert watchlog times to DAQ times.

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray]

Tuple[np.ndarray, np.ndarray, np.ndarray]: (pts_seconds, dts_ms, cpu_time_us) arrays.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def convert(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute mapping and convert watchlog times to DAQ times.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(pts_seconds, dts_ms, cpu_time_us)`` arrays.
    """

    event_mat = process_events(nev_read(self.path_evts))
    m, b = get_coeff(event_mat, self.path_daqlog)
    pts, cpu_time = read_watchlog(self.path_watchlog)

    # first convert cpu time to recording system time
    daq_time = cpu_time * m + b

    return pts, daq_time, cpu_time
convert_pauses
convert_pauses()

Convert pause CPU timestamps to neural recording time.

Returns:

Type Description
Tuple[List[float], List[float]]

Tuple[List[float], List[float]]: (starts_ms, stops_ms) lists in neural recording time.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def convert_pauses(self) -> Tuple[List[float], List[float]]:
    """Convert pause CPU timestamps to neural recording time.

    Returns:
        Tuple[List[float], List[float]]: ``(starts_ms, stops_ms)`` lists in neural recording time.
    """
    start, stop = read_watchlog_pauses(self.path_watchlog)
    event_mat = process_events(nev_read(self.path_evts))
    m, b = get_coeff(event_mat, self.path_daqlog)

    convert_start = [time * m + b for i, time in enumerate(start)]
    convert_stop = [time * m + b for i, time in enumerate(stop)]

    #### NOTE: depending on the output set-up, comment/uncomment. 
    ##### Generally, can use the highlights options on the interactive plot
    ##### to indicate if time-scales are not coherent between the data and the 
    ##### pauses. 
    #convert_start = make_msec(convert_start)
    #convert_stop = make_msec(convert_stop)

    return convert_start, convert_stop
convert_skips
convert_skips()

Detect skips and return start/stop/value segments in DAQ time.

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray]

Tuple[np.ndarray, np.ndarray, np.ndarray]: (start_values_ms, stop_values_ms, values_idx) arrays.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def convert_skips(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Detect skips and return start/stop/value segments in DAQ time.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(start_values_ms, stop_values_ms, values_idx)`` arrays.
    """
    pts, daq_time, cpu_time = self.convert()

    threshold = 1
    max_jump = np.max(np.abs(np.diff(pts)))


    if max_jump >= threshold:
        print("There is a skip in the movie frame playback that is bigger than {} frames.\nThe biggest skip is {} frames.".format((threshold / 0.04), (max_jump / 0.04)))

        # list of indices where the pts jumped by 25+ frames
        beyond_threshold = np.where(np.abs(np.diff(pts)) > threshold)[0]
        print("Timepoints of skips, in neural_rec_time: {}".format(daq_time[beyond_threshold]))

        ## setting up start/stop values
        timepoints_of_skips = []
        timepoints_of_skips.append(daq_time[0]) # set first start point to the start of the rec_log

        for index in beyond_threshold:
            timepoints_of_skips.append(daq_time[index])
            timepoints_of_skips.append(daq_time[index + 1])

        timepoints_of_skips.append(daq_time[-1])

        ## specifying starts and stops from timepoint collection 
        start_values = timepoints_of_skips[0:-1:2]
        stop_values = timepoints_of_skips[1::2]
        values = np.array(range(0, len(start_values)))
        print("Start timepoints: {}".format(start_values))
        print("Stop timepoints: {}".format(stop_values))
        print("")

    else:
        print("There's not any skips in the movie frame playback that are bigger than {} frames.\nThe biggest skip is {} frames.".format((threshold / 0.04), (max_jump / 0.04)))
        print(" ")
        start_values = daq_time[0]
        stop_values = daq_time[-1]
        values = np.array([0])


    return start_values, stop_values, values

nev_read

nev_read(filename)

Read event timestamps and codes from .nev or mock .npy file.

Parameters:

Name Type Description Default
filename str | Path

Path to .nev or mock .npy array.

required

Returns:

Type Description
ndarray

np.ndarray: (timestamp, nttl) array of shape (N, 2).

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def nev_read(filename: str | Path) -> np.ndarray:
    """Read event timestamps and codes from ``.nev`` or mock ``.npy`` file.

    Args:
        filename (str | Path): Path to ``.nev`` or mock ``.npy`` array.

    Returns:
        np.ndarray: ``(timestamp, nttl)`` array of shape ``(N, 2)``.
    """
    filename = Path(filename)

    if filename.suffix.lower() == ".nev":
        eventmap = np.memmap(filename, dtype=nev_type, mode='r', offset=NLX_OFFSET)
        ret = np.array([eventmap['timestamp'], eventmap['nttl']]).T
    elif filename.suffix.lower() == ".npy":
        ret = np.load(filename)
    return ret

nev_string_read

nev_string_read(filename)

Read event timestamps and strings from a .nev file.

Parameters:

Name Type Description Default
filename str | Path

Path to .nev file.

required

Returns:

Type Description
ndarray

np.ndarray: (timestamp, ev_string) array of shape (N, 2).

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
41
42
43
44
45
46
47
48
49
50
51
def nev_string_read(filename: str | Path) -> np.ndarray:
    """Read event timestamps and strings from a ``.nev`` file.

    Args:
        filename (str | Path): Path to ``.nev`` file.

    Returns:
        np.ndarray: ``(timestamp, ev_string)`` array of shape ``(N, 2)``.
    """
    eventmap = np.memmap(filename, dtype=nev_type, mode='r', offset=NLX_OFFSET)
    return np.array([eventmap['timestamp'], eventmap['ev_string']]).T

process_movie_events

process_movie_events(ev_array)

Filter raw event rows to the movie-event sequence.

Parameters:

Name Type Description Default
ev_array ndarray

(timestamp, code) array.

required

Returns: np.ndarray: Filtered movie event rows.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def process_movie_events(ev_array: np.ndarray) -> np.ndarray:
    """Filter raw event rows to the movie-event sequence.

    Args:
        ev_array (np.ndarray): ``(timestamp, code)`` array.
    Returns:
        np.ndarray: Filtered movie event rows.
    """

    wait_for = [1]
    last = 0
    keep = []

    for row in ev_array:
        code = row[1].astype(int)

        if code not in wait_for:
            continue

        elif code in [1, 2, 4, 8, 16, 32, 64, 128]:
            wait_for = [0]
            keep.append(row)

        elif code == 0:

            if last == 128:
                wait_for = [1]
            elif last in [1, 2, 4, 8, 16, 32, 64]:
                wait_for = [last * 2]

        last = code

    return np.array(keep)

process_events

process_events(ev_array)

Extract movie-event rows from a full event array.

Parameters:

Name Type Description Default
ev_array ndarray

Full event array.

required

Returns: np.ndarray: Movie-event subset.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def process_events(ev_array: np.ndarray) -> np.ndarray:
    """Extract movie-event rows from a full event array.

    Args:
        ev_array (np.ndarray): Full event array.
    Returns:
        np.ndarray: Movie-event subset.
    """

    if float(101) in ev_array[:, 1]:
        onsets = (ev_array[:, 1] == 101).nonzero()[0]
        n_101 = onsets.shape[0]

        # use the first 8 as marker that screening is over
        first_8 = (ev_array[:, 1] == 8).nonzero()[0][0]

        # go back to last 4 before first 8
        last_4 = (ev_array[:first_8, 1] == 4).nonzero()[0][-2]
        # print(first_8, last_4)

        assert n_101 in (4, 8)

        movie_events = ev_array[last_4 + 1:onsets[4], :]
        ret = process_movie_events(movie_events)

    elif not float(0) in ev_array[:,1]:
        ret = ev_array

    return ret

getlines

getlines(filename)

Read a text file and return the raw lines as bytes.

Parameters:

Name Type Description Default
filename str | Path

Path to file.

required

Returns:

Type Description
list[bytes]

list[bytes]: List of lines (bytes).

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def getlines(filename: str | Path) -> list[bytes]:
    """Read a text file and return the raw lines as bytes.

    Args:
        filename (str | Path): Path to file.

    Returns:
        list[bytes]: List of lines (bytes).
    """

    with open(filename, 'rb') as logfile:
        data = logfile.read()
    lines = data.splitlines()
    return lines

read_watchlog

read_watchlog(watchlogfile)

Extract PTS (s) and CPU times (µs) from a watchlog.

Parameters:

Name Type Description Default
watchlogfile str | Path

Path to watchlog created by ffmpeg wrapper.

required

Returns:

Type Description
Tuple[ndarray, ndarray]

Tuple[np.ndarray, np.ndarray]: Tuple (pts_seconds, cpu_time_us).

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def read_watchlog(watchlogfile: str | Path) -> Tuple[np.ndarray, np.ndarray]:
    """Extract PTS (s) and CPU times (µs) from a watchlog.

    Args:
        watchlogfile (str | Path): Path to watchlog created by ffmpeg wrapper.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Tuple ``(pts_seconds, cpu_time_us)``.
    """
    lines = getlines(watchlogfile)
    pts = []
    time = []

    for line in lines[1:]:
        fields  = line.split()
        if len(fields) == 4:
            pts.append(float(fields[1]))
            time.append(int(fields[3]))

    pts = np.array(pts)
    time = np.array(time)
    return pts, time

read_watchlog_pauses

read_watchlog_pauses(watchlogfile)

Find pause segments in the watchlog.

Parameters:

Name Type Description Default
watchlogfile str | Path

Path to pts/CPU watchlog.

required

Returns: Tuple[List[int], List[int]]: (start_times_us, stop_times_us) lists.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def read_watchlog_pauses(watchlogfile: str | Path) -> Tuple[List[int], List[int]]:
    """Find pause segments in the watchlog.

    Args:
        watchlogfile (str | Path): Path to pts/CPU watchlog.
    Returns:
        Tuple[List[int], List[int]]: ``(start_times_us, stop_times_us)`` lists.
    """
    lines = getlines(watchlogfile)
    start_time = []
    stop_time = []

    for i, line in enumerate(lines):
        fields = line.split()
        first = str(fields[0])
        # print(first)
        if "Pausing" in first:
            # print(line)
            start = i - 1
            start_line = lines[start]
            # print(start_line)
            start_fields = start_line.split()
            start_time.append(int(start_fields[3]))

        if "Continuing" in first:
            # print(line)
            stop = i + 1
            stop_line = lines[stop]
            stop_fields = stop_line.split()
            stop_time.append(int(stop_fields[3]))

        if "Properly" in first:
            start = i - 3
            start_line = lines[start]
            # print(start_line)
            start_fields = start_line.split()
            stop_time.append(int(start_fields[3]))

    return start_time, stop_time

read_daqlog

read_daqlog(daqlogfile)

Extract DAQ values and pre/post times.

Parameters:

Name Type Description Default
daqlogfile str | Path

Path to DAQ log file.

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray]

Tuple[np.ndarray, np.ndarray, np.ndarray]: (values, pretime_us, posttime_us) arrays.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def read_daqlog(daqlogfile: str | Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Extract DAQ values and pre/post times.

    Args:
        daqlogfile (str | Path): Path to DAQ log file.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(values, pretime_us, posttime_us)`` arrays.
    """
    lines = getlines(daqlogfile)
    values = []
    pretime = []
    posttime = []

    for line in lines[3:]:
        fields = line.split()
        if len(fields) == 4:
            values.append(int(fields[0]))
            pretime.append(int(fields[2]))
            posttime.append(int(fields[3]))

    values = np.array(values)
    pretime = np.array(pretime)
    posttime = np.array(posttime)

    return values, pretime, posttime

get_coeff

get_coeff(event_mat, daqlogfile)

Fit a linear mapping from DAQ post times to event timestamps.

Loads logs, validates events, and returns slope/intercept.

Parameters:

Name Type Description Default
event_mat ndarray

(timestamp, code) event array.

required
daqlogfile str | Path

Path to DAQ log file.

required

Returns: np.ndarray: [m, b] array such that timestamp = m*post + b.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def get_coeff(event_mat: np.ndarray, daqlogfile: str | Path) -> np.ndarray:
    """Fit a linear mapping from DAQ post times to event timestamps.

    Loads logs, validates events, and returns slope/intercept.

    Args:
        event_mat (np.ndarray): ``(timestamp, code)`` event array.
        daqlogfile (str | Path): Path to DAQ log file.
    Returns:
        np.ndarray: ``[m, b]`` array such that ``timestamp = m*post + b``.
    """
    eventTimes, eventValues = event_mat[:,0],event_mat[:,1]
    daqValues, daqPretimes, daqPosttimes = read_daqlog(daqlogfile)

    # check events:
    EventErrors = (eventValues != daqValues).sum()
    if EventErrors:
        raise(Warning('Events from 2 logs do not match, {} errors.'.
                      format(EventErrors)))

    # check that daq is quick enough
    diffs = daqPosttimes - daqPretimes
    print("Min Daq Diff: {:.1f} ms, Max Daq Diff: {:.1f} ms".
          format(diffs.min()/1e3, diffs.max()/1e3))

    # convert daqPosttimes to eventTimes by polyfit, check error
    m, b = np.polyfit(daqPosttimes, eventTimes, 1)
    fitdaq = m*daqPosttimes + b
    maxFitError = np.abs(fitdaq-eventTimes).max()/1e3

    print("Maximum Error after Event fit: {:.1f} ms".format(maxFitError))

    return np.array([m, b])

make_msec

make_msec(list_usec)

Convert a list from microseconds to milliseconds.

Parameters:

Name Type Description Default
list_usec list[int]

Times in microseconds.

required

Returns: list[float]: Times in milliseconds.

Source code in epiphyte/preprocessing/data_preprocessing/data_utils.py
264
265
266
267
268
269
270
271
272
273
274
def make_msec(list_usec: list[int]) -> list[float]:
    """Convert a list from microseconds to milliseconds.

    Args:
        list_usec (list[int]): Times in microseconds.
    Returns:
        list[float]: Times in milliseconds.
    """

    list_msec = [time / 1000 for i, time in enumerate(list_usec)]
    return list_msec