epiphyte.database

access_info

Database connection and storage configuration for Epiphyte.

This module sets up the DataJoint connection parameters and storage settings. It is imported by db_setup.py to connect to a local or remote database.
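
For orientation, a minimal sketch of the kind of settings such a module defines follows; the host, credentials, and store location are placeholders, not the actual Epiphyte defaults:

import datajoint as dj

# Connection parameters for the MySQL server backing DataJoint.
dj.config["database.host"] = "127.0.0.1"   # placeholder host
dj.config["database.port"] = 3306
dj.config["database.user"] = "root"        # placeholder credentials
dj.config["database.password"] = "simple"

# Optional external storage for large blobs (longblob columns).
dj.config["stores"] = {
    "external": {"protocol": "file", "location": "/tmp/dj-store"}
}

dj.conn()  # establish (and cache) the connection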

config

Hard-coded variables for initializing the database.

This module defines the paths and variables used to initialize the mock database.

Paths:

  • PATH_TO_REPO: Path to the root of the repository.
  • PATH_TO_DATA: Path to the folder containing the mock data.
  • PATH_TO_LABELS: Path to the folder containing the movie annotations.
  • PATH_PATIENT_ALIGNED_LABELS: Path to the folder containing the patient-aligned annotations.
  • PATH_TO_PATIENT_DATA: Path to the folder containing the refactored mock patient data.
  • PATH_TO_SESSION_DATA: Path to the folder containing the refactored mock session data.

Variables:

  • PTS_MOVIE_new: List of time points for the movie, sampled at 25 Hz (0.04s intervals).
  • patients: List of dictionaries, each containing information about a patient (id, age, gender, year).
  • sessions: List of dictionaries, each containing information about a session (patient_id, session_nr, session_type).
  • annotators: List of dictionaries, each containing information about an annotator (id, first_name, last_name).
  • label_names: List of label names used in the annotations.
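
For illustration, these variables might be defined along the following lines; the values are made up, and only the key names are taken from the table definitions below:

import numpy as np

# 25 Hz frame times: one PTS value every 0.04 s (the movie length is assumed here).
PTS_MOVIE_new = np.round(np.arange(0.04, 5400.04, 0.04), 2)

patients = [
    {"patient_id": 1, "age": 32, "gender": "f", "year": 2018},
]
sessions = [
    {"patient_id": 1, "session_nr": 1, "session_type": "full_movie"},
]
annotators = [
    {"annotator_id": "a1", "first_name": "Ada", "last_name": "Lovelace"},
]
label_names = ["character1", "location1"]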

db_setup

DataJoint tables and population helpers for the mock database.

This module defines DataJoint schemas and tables used to represent patients, sessions, events, annotations, spikes, and derived entities used throughout the tutorials.

Methods for populating the tables are included as class methods. Tables are populated using the mock data generated in epiphyte.data.mock_data_utils.

Conventions
  • Tables are defined in order of population: the most top-level tables come first, followed by tables that pull keys from them.
  • How a table is populated varies by table type and content. For Imported tables, the population function is a class method; for Manual tables, it is defined separately (see the sketch below).
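
As a sketch, populating the whole mock database under these conventions could look like the following; the populate()/contents mechanics are standard DataJoint, and only the call order is specific to this module:

# Lookup tables (Patients, Sessions, Annotator, LabelName) fill themselves
# from their `contents` attribute when the schema is declared.

# Imported and Computed tables are filled via DataJoint's populate(),
# which calls each table's make/_make_tuples method:
MovieSession.populate()
ElectrodeUnit.populate()
MovieAnnotation.populate()
SpikeData.populate()
PatientAlignedMovieAnnotation.populate()
MovieSkips.populate()
MoviePauses.populate()

# Manual tables are filled by standalone helpers:
populate_lfp_data_table()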

Patients

Bases: Lookup

Table containing patient demographics.

Source code in epiphyte/database/db_setup.py
@epi_schema
class Patients(dj.Lookup):
    """Table containing patient demographics."""
    definition = """
    # general patient data, imported from config file
    patient_id: int                                    # patient ID
    ---
    age: smallint                                      # age of patient
    gender: enum('f', 'm', 'x')                        # gender of patient
    year: int                                          # year of surgery
    """

    contents = config.patients
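
A quick usage sketch, using standard DataJoint restriction and fetch (the patient ID is a placeholder):

Patients.fetch(as_dict=True)            # all patients as a list of dicts
(Patients & "patient_id = 1").fetch1()  # one patient's demographics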

Sessions

Bases: Lookup

Table containing recording session metadata per patient.

Source code in epiphyte/database/db_setup.py
@epi_schema
class Sessions(dj.Lookup):
    """Table containing recording session metadata per patient."""
    definition = """
    # general session data, imported from config file
    patient_id: int                                    # patient ID
    session_nr: int                                    # session number
    ---
    session_type: enum('full_movie', 'follow_up', 'partial_movie')   # type of session for corresponding recording
    """

    contents = config.sessions

Annotator

Bases: Lookup

Table containing annotators who labeled movie content and events.

Source code in epiphyte/database/db_setup.py
@epi_schema
class Annotator(dj.Lookup):
    """Table containing annotators who labeled movie content and events."""
    definition = """
    # annotators of the video, imported from config file
    annotator_id: varchar(5)                    # unique ID for each annotator
    ---
    first_name: varchar(32)                      # first name of annotator
    last_name: varchar(32)                       # last name of annotator
    """

    contents = config.annotators

LabelName

Bases: Lookup

Table containing the name of labelled content.

Source code in epiphyte/database/db_setup.py
@epi_schema
class LabelName(dj.Lookup):
    """Table containing the name of labelled content."""
    definition = """
    # names of existing labels, imported from config file
    label_name: varchar(32)   # label name
    """

    contents = config.label_names

MovieSession

Bases: Imported

Table containing the session-wise movie timing and channel metadata.

Populates from watchlogs, DAQ logs, and event files under the session directory. Stores PTS, DTS, neural recording time, and channel names.

Source code in epiphyte/database/db_setup.py
@epi_schema
class MovieSession(dj.Imported):
    """Table containing the session-wise movie timing and channel metadata.

    Populates from watchlogs, DAQ logs, and event files under the session
    directory. Stores PTS, DTS, neural recording time, and channel names.
    """
    definition = """
    # data of individual movie watching sessions
    -> Patients                          # patient ID
    -> Sessions                          # session ID
    ---
    date : date                         # date of movie session
    time : time
    pts: longblob                       # order of movie frames for patient (pts) 
    dts: longblob                       # cpu time stamps (dts)
    neural_recording_time: longblob     # neural recording time (rectime)
    channel_names: longblob             # channel name, indicating electrode number and brain region
    """

    def _make_tuples(self, key):
        """Populate the MovieSession table from the session files of each patient."""
        patient_ids = Patients.fetch("patient_id")       

        for _, pat in enumerate(patient_ids):

            pat_sessions = (Sessions & f"patient_id={pat}").fetch("session_nr")       

            try:
                checks = (MovieSession & f"patient_id={pat}").fetch("session_nr")
                if len(checks) == len(pat_sessions):
                    # all sessions for this patient are already ingested
                    continue
                else:
                    print(f"Adding patient {pat} to database...")
            except:
                print(f"Adding patient {pat} to database...")

            for _, sesh in enumerate(pat_sessions):

                try:
                    check = len((MovieSession & f"patient_id={pat}" & f"session_nr={sesh}").fetch("pts")[0])
                    if check > 0:
                        # this session is already ingested; skip it
                        continue
                except IndexError:
                    # no entry yet for this patient/session
                    pass
                print(f"Adding patient {pat} session {sesh} to database...")

                main_patient_dir = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}")

                session_info = np.load(main_patient_dir / "session_info.npy", allow_pickle=True)
                date = session_info.item().get("date")
                time = session_info.item().get("time")
                time = datetime.strptime(time, '%H-%M-%S').strftime('%H:%M.%S')

                path_wl =  main_patient_dir / "watchlogs" 
                ffplay_file = next(path_wl.glob("ffplay*"), None)

                if ffplay_file:
                    print(" Found ffplay file:", ffplay_file)
                else:
                    print(" No ffplay file found in the watchlogs directory.")
                    break

                path_daq = main_patient_dir / "daq_files" 
                daq_file = next(path_daq.glob("timedDAQ*"), None)

                if daq_file:
                    print(" Found DAQ file:", daq_file)
                else:
                    print(" No DAQ file found in the daq_files directory.")
                    break

                path_events = main_patient_dir / "event_file" / "Events.npy"
                time_conversion = data_utils.TimeConversion(path_to_wl=ffplay_file, path_to_dl=daq_file,
                                                                    path_to_events=path_events)
                pts, rectime, dts = time_conversion.convert()

                save_dir = main_patient_dir / "movie_info"
                save_dir.mkdir(exist_ok=True)
                np.save(save_dir / "pts.npy", pts)
                np.save(save_dir / "dts.npy", dts)
                np.save(save_dir / "neural_rec_time.npy", rectime)

                path_channel_names = main_patient_dir / "ChannelNames.txt"
                channel_names = helpers.get_channel_names(path_channel_names)

                self.insert1({'patient_id': pat,
                            'session_nr': sesh,
                            'date': date,
                            'time': time,
                            'pts': pts,
                            'dts': dts,
                            'neural_recording_time': rectime,
                            'channel_names': channel_names
                            }, skip_duplicates=True)
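
Once populated, the per-session timing vectors can be fetched with an ordinary restriction (the key values are placeholders):

pts, rectime = (MovieSession & "patient_id = 1" & "session_nr = 1").fetch1(
    "pts", "neural_recording_time"
)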

LFPData

Bases: Manual

Table containing the local field potential-like signals from each channel.

Populated manually using the populate_lfp_data_table() function.

Source code in epiphyte/database/db_setup.py
@epi_schema
class LFPData(dj.Manual):
    """Table containing the local field potential-like signals from each channel.

    Populated manually using the `populate_lfp_data_table()` function.
    """
    definition = """
    # local field potential data, by channel. 
    -> Patients
    -> Sessions
    csc_nr: int
    ---
    samples: longblob                # samples, in microvolts
    timestamps: longblob             # timestamps corresponding to each sample, in ms
    sample_rate: int                 # sample rate from the recording device
    brain_region: varchar(8)         # brain region where unit was recorded
    """

ElectrodeUnit

Bases: Imported

Table containing information on the units detected per channel with type and within-channel number.

Source code in epiphyte/database/db_setup.py
@epi_schema
class ElectrodeUnit(dj.Imported):
    """Table containing information on the units detected per channel with type and within-channel number."""
    definition = """
    # Contains information about the implanted electrodes of each patient
    -> Patients                      # patient ID
    -> Sessions                      # session number
    unit_id: int                     # unique ID for unit (for respective  patient)
    ---
    csc: int                         # number of CSC file
    unit_type: enum('M', 'S', 'X')   # unit type: 'M' for Multi Unit, 'S' for Single Unit, 'X': undefined
    unit_nr: int                     # number of unit, as there can be several multi units and single units in one CSC file
    brain_region: varchar(8)         # brain region where unit was recorded
    """

    def _make_tuples(self, key):
        """Populate by parsing spike filenames and channel names."""
        patient_ids = Patients.fetch("patient_id")

        # iterate over each patient in db
        for i_pat, pat in enumerate(patient_ids):
            pat_sessions = (Sessions & f"patient_id={pat}").fetch("session_nr")

            # further iterate over each patient's sessions
            for i_sesh, sesh in enumerate(pat_sessions):

                path_channels = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}")
                channel_names = helpers.get_channel_names(path_channels / "ChannelNames.txt")

                try:
                    check = (ElectrodeUnit & f"patient_id={pat}" & f"session_nr={sesh}").fetch("csc")
                    if len(check) == len(channel_names):
                        continue
                    else:
                        print(f"    Adding patient {pat} session {sesh} to database...")
                        pass
                except:
                    print(f"    Adding patient {pat} session {sesh} to database...")
                    pass

                spike_dir = Path(config.PATH_TO_DATA, "patient_data", str(pat), f"session_{sesh}", "spiking_data")
                spike_filepaths = list(spike_dir.iterdir())
                spike_filenames = sorted([s.name for s in spike_filepaths], key=helpers.extract_sort_key)

                for unit_id, filename in enumerate(spike_filenames):
                    csc_nr, unit = filename[:-4].split("_")
                    csc_index = int(csc_nr[3:]) - 1
                    print(f"    ... Unit ID: {unit_id}, CSC #: {csc_nr}, Channel index: {csc_index}")

                    channel = channel_names[csc_index]
                    print(f"    ... Channel name: {channel}")

                    unit_type, unit_nr = helpers.get_unit_type_and_number(unit)
                    print(f"    ... Unit type: {unit_type},  Within-channel unit number: {unit_nr}")

                    self.insert1({'patient_id': pat,
                                'session_nr': sesh,
                                'unit_id': unit_id, 
                                'csc': int(csc_nr[3:]),
                                'unit_type': unit_type, 
                                'unit_nr': unit_nr,
                                'brain_region': channel},
                                    skip_duplicates=True)

                    print(" ")

MovieAnnotation

Bases: Imported

Table containing the raw movie annotations (values and segments) per label and annotator.

Source code in epiphyte/database/db_setup.py
@epi_schema
class MovieAnnotation(dj.Imported):
    """Table containing the raw movie annotations (values and segments) per label and annotator."""
    definition = """
    # information about video annotations (e.g. labels of characters); 
    # this table contains start and end time points and values of the segments of the annotations;
    # all time points are in Neural Recording Time;
    -> Annotator                    # creator of movie annotation
    -> LabelName                    # name of annotation
    annotation_date: date           # date of annotation
    ---
    values: longblob                # list of values that represent label
    start_times: longblob           # list of start times of label segments in movie play time (PTS)
    stop_times: longblob            # list of stop times of label segments in movie play time (PTS)
    category: varchar(32)           # category of label; e.g. 'character', 'emotion', 'location'
    indicator_function: longblob    # full indicator function, one value for each movie frame
    """

    def _make_tuples(self, key):
        """Populate by reading ``.npy`` annotation files from labels directory."""
        path_labels = Path(config.PATH_TO_LABELS)

        for filepath in path_labels.iterdir():
            label_id, label_name, annotator, date, category = filepath.name[:-4].split("_")

            try:
                check = (MovieAnnotation & f"label_name='{label_name}'" & f"category='{category}'").fetch("values")
                if len(check) > 0:
                    continue
                else: 
                    print(f"    Adding {label_name}, category {category} to database...")
                    pass
            except:
                print(f"    Adding {label_name}, category {category} to database...")
                pass

            content = np.load(filepath)

            values = np.array(content[0])
            start_times = np.array(content[1])
            stop_times = np.array(content[2])

            ind_func = processing_labels.make_label_from_start_stop_times(values, start_times, stop_times, config.PTS_MOVIE_new)

            print(f"    ... # of occurrences: {int(sum(values))}\n")

            self.insert1({'label_name': label_name,
                            'annotator_id': annotator,
                            'annotation_date': datetime.strptime(date, '%Y%m%d'),
                            'category': category,
                            'values': values,
                            'start_times': start_times,
                            'stop_times': stop_times,
                            'indicator_function': np.array(ind_func)
                            }, skip_duplicates=True)
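
A usage sketch for retrieving one label's frame-wise indicator function (the annotator and label values are placeholders):

ind_func = (
    MovieAnnotation & "annotator_id = 'a1'" & "label_name = 'character1'"
).fetch("indicator_function")[0]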

SpikeData

Bases: Imported

Table containing the spike times and amplitudes per unit in neural recording time.

Source code in epiphyte/database/db_setup.py
@epi_schema
class SpikeData(dj.Imported):
    """Table containing the spike times and amplitudes per unit in neural recording time."""
    definition = """
    # This table contains all spike times of all units of all patients in Neural Recording Time
    # Each entry contains a vector of all spike times of one unit of one patient
    -> Sessions
    -> ElectrodeUnit                   # unit from which data was recorded
    ---
    spike_times: longblob              # times of spikes, in neural recording time
    spike_amps: longblob               # amplitudes for each spike in spike_times
    """

    def _make_tuples(self, key):
        """Populate by loading unit spike files and matching to units."""
        patient_ids = Patients.fetch("patient_id")

        for i_pat, pat in enumerate(patient_ids):
            pat_sessions = (Sessions & f"patient_id={pat}").fetch("session_nr")

            for i_sesh, sesh in enumerate(pat_sessions):
                spike_dir = Path(config.PATH_TO_DATA, "patient_data", str(pat), f"session_{sesh}", "spiking_data")
                spike_files = list(spike_dir.iterdir())
                unit_ids = (ElectrodeUnit & f"patient_id={pat}" & f"session_nr={sesh}").fetch("unit_id")

                assert len(spike_files) == len(unit_ids), "Number of units in ElectrodeUnits doesn't match number of spiking files."

                try:
                    check = (SpikeData & f"patient_id={pat}" & f"session_nr={sesh}").fetch("unit_id")
                    if len(check) == len(spike_files) == len(unit_ids):
                        continue
                    else:
                        print(f"    Adding patient {pat} session {sesh} to database...")
                        pass
                except:
                    print(f"    Adding patient {pat} session {sesh} to database...")
                    pass

                for filepath in spike_files:
                    filename = filepath.name
                    csc_nr, unit = filename[:-4].split("_")
                    csc_nr = int(csc_nr[3:])
                    unit_type, unit_nr = helpers.get_unit_type_and_number(unit)

                    unit_id = (ElectrodeUnit & f"patient_id={pat}" & f"session_nr={sesh}" 
                            & f"csc={csc_nr}" & f"unit_nr={unit_nr}" & f"unit_type='{unit_type}'").fetch("unit_id")[0]

                    spikes_file = np.load(filepath, allow_pickle=True)
                    spikes_file = spikes_file.item()
                    times = spikes_file["spike_times"]
                    amps = spikes_file["spike_amps"]

                    print(f"    ... Unit ID: {unit_id}, CSC #: {csc_nr}")
                    print(f"    ... Nm. of spikes: {len(times)}")
                    print(f"    ... Max amp.: {np.max(amps)} microV\n")

                    self.insert1({'patient_id': pat, 
                                'session_nr': sesh, 
                                'unit_id': unit_id,
                                'spike_times': times, 
                                'spike_amps': amps}, skip_duplicates=True)
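
A usage sketch for pulling one unit's spikes (the key values are placeholders):

spike_times, spike_amps = (
    SpikeData & "patient_id = 1" & "session_nr = 1" & "unit_id = 0"
).fetch1("spike_times", "spike_amps")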

PatientAlignedMovieAnnotation

Bases: Computed

Table containing annotations aligned to individual patient PTS and neural time.

Source code in epiphyte/database/db_setup.py
@epi_schema
class PatientAlignedMovieAnnotation(dj.Computed):
    """Table containing annotations aligned to individual patient PTS and neural time."""
    definition = """
    # Movie Annotations aligned to patient time / time points are in neural recording time
    -> MovieSession        # movie watching session ID
    -> MovieAnnotation     # label
    ---
    label_in_patient_time: longblob    # label matched to patient time (pts)
    values: longblob       # list of values that represent label
    start_times: longblob  # list of start times of label segments in neural recording time
    stop_times: longblob   # list of stop times of label segments in neural recording time
    """

    def make(self, key):
        """Align indicator to PTS and derive start/stop in neural time."""
        patient_ids, session_nrs = MovieSession.fetch("patient_id", "session_nr")
        entries = MovieAnnotation.fetch('KEY')

        for i_pat, pat in enumerate(patient_ids):
            pat_sessions = session_nrs[i_pat]
            for i_sesh, sesh in enumerate([pat_sessions]):

                print(f"Patient {pat} session {sesh}..")

                for entry in entries:

                    annotator_id = entry["annotator_id"]
                    label_name = entry["label_name"]
                    annotation_date = entry["annotation_date"]

                    try:
                        check = (PatientAlignedMovieAnnotation & f"patient_id={pat}" & f"session_nr={sesh}"
                                    & f"label_name='{label_name}'" & f"annotator_id='{annotator_id}'").fetch("values")
                        if check.any():
                            print(f"    ... {label_name} already in database.")
                            continue
                        else:
                            print(f"    ... Adding patient {pat} session {sesh} label {label_name} to database.")
                            pass
                    except:
                        print(f"    ... Adding patient {pat} session {sesh} label {label_name} to database.")
                        pass

                    patient_pts = (MovieSession & f"patient_id={pat}" & f"session_nr={sesh}").fetch("pts")[0]
                    neural_rectime = (MovieSession & f"patient_id={pat}" & f"session_nr={sesh}").fetch("neural_recording_time")[0]

                    default_label = (MovieAnnotation & f"annotator_id='{annotator_id}'" & f"label_name='{label_name}'").fetch("indicator_function")[0]
                    patient_aligned_label = match_label_to_patient_pts_time(default_label, patient_pts)
                    values, starts, stops = create_vectors_from_time_points.get_start_stop_times_from_label(neural_rectime, 
                                                                                            patient_aligned_label)

                    self.insert1({'patient_id': pat,
                                    'session_nr': sesh,
                                    'annotator_id': annotator_id,
                                    'label_name': label_name,
                                    'annotation_date': annotation_date,
                                    'label_in_patient_time': np.array(patient_aligned_label),
                                    'values': np.array(values),
                                    'start_times': np.array(starts),
                                    'stop_times': np.array(stops),
                                    }, skip_duplicates=True)
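
A usage sketch for the aligned annotations (the key and label values are placeholders):

PatientAlignedMovieAnnotation.populate()
aligned = (
    PatientAlignedMovieAnnotation
    & "patient_id = 1" & "session_nr = 1" & "label_name = 'character1'"
).fetch1("label_in_patient_time")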

make

make(key)

Align indicator to PTS and derive start/stop in neural time.

MovieSkips

Bases: Computed

Table containing information on segments of continuous vs. non-continuous movie watching.

Source code in epiphyte/database/db_setup.py
@epi_schema
class MovieSkips(dj.Computed):
    """Table containing information on segments of continuous vs. non-continuous movie watching."""
    definition = """
    # This table contains start and stop time points where the watching behaviour of the patient changed from
    # continuous (watching the movie in the correct frame order) to non-continuous (e.g. jumping through the movie) or 
    # the other way round;
    # all time points are in Neural Recording Time
    -> MovieSession                    # number of movie session
    ---
    values: longblob                   # values of continuous watch segments
    start_times: longblob              # start time points of segments
    stop_times: longblob               # end time points of segments
    """

    def make(self, key):
        """Detect non-continuous segments (skips) from watchlogs and DAQ logs."""
        patient_ids, session_nrs = MovieSession.fetch("patient_id", "session_nr")

        for i_pat, pat in enumerate(patient_ids):
            pat_sessions = session_nrs[i_pat]

            for i_sesh, sesh in enumerate([pat_sessions]):

                try:
                    check = (MovieSkips & f"patient_id={pat}" & f"session_nr={sesh}").fetch("values")
                    if check.any():
                        continue
                    else:
                        print(f"    ... Adding patient {pat} session {sesh} to database.")
                        pass
                except:
                    print(f"    ... Adding patient {pat} session {sesh} to database.")
                    pass

                main_patient_dir = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}")

                session_info = np.load(main_patient_dir / "session_info.npy", allow_pickle=True)
                date = session_info.item().get("date")
                time = session_info.item().get("time")
                time = datetime.strptime(time, '%H-%M-%S').strftime('%H:%M.%S')

                path_wl =  main_patient_dir / "watchlogs" 
                ffplay_file = next(path_wl.glob("ffplay*"), None)

                if ffplay_file:
                    print(" Found ffplay file:", ffplay_file)
                else:
                    print(" No ffplay file found in the watchlogs directory.")
                    break

                path_daq = main_patient_dir / "daq_files" 
                daq_file = next(path_daq.glob("timedDAQ*"), None)

                if daq_file:
                    print(" Found DAQ file:", daq_file)
                else:
                    print(" No DAQ file found in the daq_files directory.")
                    break

                path_events = main_patient_dir / "event_file" / "Events.npy"
                time_conversion = data_utils.TimeConversion(path_to_wl=ffplay_file, path_to_dl=daq_file,
                                                                    path_to_events=path_events)
                starts, stops, values = time_conversion.convert_skips()

                self.insert1({'patient_id': pat, 
                            'session_nr': sesh,
                            'start_times': np.array(starts), 
                            'stop_times': np.array(stops), 
                            'values': np.array(values)}, skip_duplicates=True)
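
A usage sketch for retrieving the watch segments (the key values are placeholders):

starts, stops, values = (
    MovieSkips & "patient_id = 1" & "session_nr = 1"
).fetch1("start_times", "stop_times", "values")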

make

make(key)

Detect non-continuous segments (skips) from watchlogs and DAQ logs.

MoviePauses

Bases: Computed

Table containing information on pauses in movie playback detected from watchlogs and DAQ logs.

Source code in epiphyte/database/db_setup.py
@epi_schema
class MoviePauses(dj.Computed):
    """Table containing information on pauses in movie playback detected from watchlogs and DAQ logs."""
    definition = """
    # This table contains information about pauses in movie playback;
    # This is directly computed from the watch log;
    # Time points are in Neural Recording Time
    -> MovieSession                    # movie watching session of patient
    ---
    start_times: longblob              # start time points of pauses
    stop_times: longblob               # end time points of pauses
    """

    def make(self, key):
        """Detect pauses in movie playback from watchlogs and DAQ logs."""

        patient_ids, session_nrs = MovieSession.fetch("patient_id", "session_nr")

        for i_pat, pat in enumerate(patient_ids):
            pat_sessions = session_nrs[i_pat]

            for i_sesh, sesh in enumerate([pat_sessions]):

                try:
                    check = (MoviePauses & f"patient_id={pat}" & f"session_nr={sesh}").fetch("start_times")
                    if check.any():
                        continue
                    else:
                        print(f"    ... Adding patient {pat} session {sesh} to database.")
                        pass
                except:
                    print(f"    ... Adding patient {pat} session {sesh} to database.")
                    pass

                main_patient_dir = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}")

                session_info = np.load(main_patient_dir / "session_info.npy", allow_pickle=True)
                date = session_info.item().get("date")
                time = session_info.item().get("time")
                time = datetime.strptime(time, '%H-%M-%S').strftime('%H:%M.%S')

                path_wl =  main_patient_dir / "watchlogs" 
                ffplay_file = next(path_wl.glob("ffplay*"), None)

                if ffplay_file:
                    print(" Found ffplay file:", ffplay_file)
                else:
                    print(" No ffplay file found in the watchlogs directory.")
                    break

                path_daq = main_patient_dir / "daq_files" 
                daq_file = next(path_daq.glob("timedDAQ*"), None)

                if daq_file:
                    print(" Found DAQ file:", daq_file)
                else:
                    print(" No DAQ file found in the daq_files directory.")
                    break

                path_events = main_patient_dir / "event_file" / "Events.npy"
                time_conversion = data_utils.TimeConversion(path_to_wl=ffplay_file, path_to_dl=daq_file,
                                                                    path_to_events=path_events)

                starts, stops = time_conversion.convert_pauses()

                self.insert1({'patient_id': pat,
                            'session_nr': sesh,
                            'start_times': np.array(starts),
                            'stop_times': np.array(stops)}, skip_duplicates=True)

populate_lfp_data_table

populate_lfp_data_table()

Populate LFPData from lfp_data files under each session directory.

Skips already-inserted channel entries.

Source code in epiphyte/database/db_setup.py
def populate_lfp_data_table() -> None:
    """Populate ``LFPData`` from ``lfp_data`` files under each session directory.

    Skips already-inserted channel entries.
    """

    patient_ids, session_nrs = MovieSession.fetch("patient_id", "session_nr")

    for i_pat, pat in enumerate(patient_ids):
        pat_sessions = session_nrs[i_pat]

        for i_sesh, sesh in enumerate([pat_sessions]):

            path_ds_dir = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}", "lfp_data")
            lfp_files = list(path_ds_dir.glob("CSC*"))

            try:
                check = (LFPData & f"patient_id={pat}" & f"session_nr={sesh}").fetch("csc_nr")
                if len(check) == len(lfp_files):
                    print(f"    Patient {pat} session {sesh} already added.")
                    continue
                else:
                    print(f"    Adding patient {pat} session {sesh} to database...")
                    pass
            except:
                print(f"    Adding patient {pat} session {sesh} to database...")
                pass

            path_channels = Path(config.PATH_TO_PATIENT_DATA, str(pat), f"session_{sesh}", "ChannelNames.txt")
            channel_names = helpers.get_channel_names(path_channels)

            for ds_file in lfp_files:

                csc_nr = ds_file.name.split('_')[0][3:]
                region = channel_names[int(csc_nr)-1]
                print(f"  .. adding csc {csc_nr}..")
                ds_dict = np.load(ds_file, allow_pickle=True)
                LFPData.insert1({
                    'patient_id': pat,
                    'session_nr': sesh,
                    'csc_nr': csc_nr,
                    'samples': ds_dict.item().get("samples"),
                    'timestamps': ds_dict.item().get("timestamps"),
                    'sample_rate': ds_dict.item().get("sample_rate")[0],
                    'brain_region': region
                })


                print(f"  .. csc {csc_nr} added.")

helpers

Helper functions used in the database module.

This module provides small utilities for parsing filenames, sorting keys in a human-friendly way, and extracting metadata encoded in strings.

atoi

atoi(text)

Convert a numeric substring to int or return the original string.

Parameters:

  • text (str): Substring that may contain only digits. (required)

Returns:

  • Union[int, str]: Integer value (if all digits) or the original string.

Source code in epiphyte/database/helpers.py
def atoi(text: str) -> Union[int, str]:
    """Convert a numeric substring to ``int`` or return the original string.

    Args:
        text (str): Substring that may contain only digits.

    Returns:
        Union[int, str]: Integer value (if all digits) or the original string.
    """

    return int(text) if text.isdigit() else text

natural_keys

natural_keys(text)

Split a string into chunks for human (natural) sorting.

Use as alist.sort(key=natural_keys) to sort filenames such as CSC2_SU1.npy before CSC10_SU1.npy.

Notes

Based on Ned Batchelder's human sorting recipe.

Parameters:

  • text (str): Input string to split into text and integer chunks. (required)

Returns:

  • List[Union[int, str]]: Alternating text and integer parts suitable as a sort key.

Source code in epiphyte/database/helpers.py
def natural_keys(text: str) -> List[Union[int, str]]:
    """Split a string into chunks for human (natural) sorting.

    Use as ``alist.sort(key=natural_keys)`` to sort filenames such as
    ``CSC2_SU1.npy`` before ``CSC10_SU1.npy``.

    Notes:
        Based on Ned Batchelder's human sorting recipe.

    Args:
        text (str): Input string to split into text and integer chunks.

    Returns:
        List[Union[int, str]]: Alternating text and integer parts suitable as a sort key.
    """

    return [atoi(chunk) for chunk in re.split(r"(\d+)", text)]

extract_sort_key

extract_sort_key(filename)

Extract a sortable key from a spike filename.

Filenames are expected to follow CSC<nr>_<type><nr>.npy. If the pattern matches, returns a tuple (csc_number, unit_type, unit_nr). Otherwise, returns the original filename for fallback sorting.

Parameters:

  • filename (str): Filename to parse. (required)

Returns:

  • Union[Tuple[int, str, int], str]: Tuple for sorting or the original filename.

Source code in epiphyte/database/helpers.py
def extract_sort_key(filename: str) -> Union[Tuple[int, str, int], str]:
    """Extract a sortable key from a spike filename.

    Filenames are expected to follow ``CSC<nr>_<type><nr>.npy``. If the
    pattern matches, returns a tuple ``(csc_number, unit_type, unit_nr)``.
    Otherwise, returns the original filename for fallback sorting.

    Args:
        filename (str): Filename to parse.

    Returns:
        Union[Tuple[int, str, int], str]: Tuple for sorting or the original filename.
    """

    match = re.match(r"CSC(\d+)_([A-Za-z]+)(\d*)\.npy", filename)
    if match:
        csc_number = int(match.group(1))
        mu_su = match.group(2)
        mu_su_number = int(match.group(3)) if match.group(3) else 0
        return csc_number, mu_su, mu_su_number
    return filename
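
A quick usage sketch with made-up filenames:

files = ["CSC10_SU1.npy", "CSC2_MUA1.npy", "CSC2_SU1.npy"]
sorted(files, key=extract_sort_key)
# -> ['CSC2_MUA1.npy', 'CSC2_SU1.npy', 'CSC10_SU1.npy']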

get_channel_names

get_channel_names(path_channel_names)

Read channel names (without extensions) from a text file.

The file is expected to contain lines like <name>.ncs. The suffix is stripped to yield bare channel identifiers.

Parameters:

  • path_channel_names (Union[str, Path]): Path to the channel names file. (required)

Returns:

  • List[str]: List of channel name strings.

Source code in epiphyte/database/helpers.py
def get_channel_names(path_channel_names: Union[str, Path]) -> List[str]:
    """Read channel names (without extensions) from a text file.

    The file is expected to contain lines like ``<name>.ncs``. The suffix is
    stripped to yield bare channel identifiers.

    Args:
        path_channel_names (Union[str, Path]): Path to the channel names file.

    Returns:
        List[str]: List of channel name strings.
    """

    channel_names: List[str] = []
    with open(path_channel_names, "r") as handle:
        for line in handle:
            # Drop the trailing ".ncs" suffix and line ending, keeping the base name.
            channel_names.append(line[:-5 - 1])
    return channel_names

get_unit_type_and_number

get_unit_type_and_number(unit_string)

Parse a unit string into unit type and number.

Example: MUA1 -> ("M", "1").

Parameters:

  • unit_string (str): Original unit string (e.g., "MUA1" or "SU3"). (required)

Returns:

  • Tuple[str, str]: Tuple (unit_type, unit_nr) where type is "M", "S", or "X".

Source code in epiphyte/database/helpers.py
def get_unit_type_and_number(unit_string: str) -> Tuple[str, str]:
    """Parse a unit string into unit type and number.

    Example: ``MUA1`` -> ("M", "1").

    Args:
        unit_string (str): Original unit string (e.g., ``"MUA1"`` or ``"SU3"``).

    Returns:
        Tuple[str, str]: Tuple ``(unit_type, unit_nr)`` where type is ``"M"``, ``"S"``, or ``"X"``.
    """

    if "MU" in unit_string:
        unit_type = "M"
    elif "SU" in unit_string:
        unit_type = "S"
    else:
        unit_type = "X"
    unit_nr = unit_string[-1]
    return unit_type, unit_nr

extract_name_unit_id_from_unit_level_data_cleaning

extract_name_unit_id_from_unit_level_data_cleaning(filename)

Split a unit-level cleaning filename into components.

Filenames are expected as "<name>_unit<id>_<annotator>.npy".

Parameters:

  • filename (str): Filename to parse. (required)

Returns:

  • Tuple[str, str, str]: Tuple (name, unit_id, annotator).

Source code in epiphyte/database/helpers.py
def extract_name_unit_id_from_unit_level_data_cleaning(
    filename: str,
) -> Tuple[str, str, str]:
    """Split a unit-level cleaning filename into components.

    Filenames are expected as ``"<name>_unit<id>_<annotator>.npy"``.

    Args:
        filename (str): Filename to parse.

    Returns:
        Tuple[str, str, str]: Tuple ``(name, unit_id, annotator)``.
    """

    name, unit_id, annotator = filename.split("_")
    unit_id = unit_id[4:]
    annotator = annotator[:-4]
    return name, unit_id, annotator
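
A quick usage sketch with a made-up filename:

extract_name_unit_id_from_unit_level_data_cleaning("artifact_unit12_a1.npy")
# -> ('artifact', '12', 'a1')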

match_label_to_patient_pts_time

match_label_to_patient_pts_time(default_label, patient_pts)

Align a default label indicator function to patient PTS frames.

Parameters:

  • default_label (np.ndarray): Indicator vector (per canonical frame) of shape (N,). (required)
  • patient_pts (np.ndarray): Watched frame times in seconds, rounded to 2 decimals. (required)

Returns:

  • List[int]: Indicator value for each patient frame.

Source code in epiphyte/database/helpers.py
def match_label_to_patient_pts_time(
    default_label: np.ndarray, patient_pts: np.ndarray
) -> List[int]:
    """Align a default label indicator function to patient PTS frames.

    Args:
        default_label (np.ndarray): Indicator vector (per canonical frame) of shape ``(N,)``.
        patient_pts (np.ndarray): Watched frame times in seconds, rounded to 2 decimals.

    Returns:
        List[int]: Indicator value for each patient frame.
    """

    return [
        default_label[int(np.round(frame / 0.04, 0)) - 1]
        for _, frame in enumerate(patient_pts)
    ]
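
The index arithmetic assumes the canonical movie runs at 25 Hz, so a PTS of t seconds maps to the zero-based frame index round(t / 0.04) - 1. A small worked example with made-up values:

import numpy as np

default_label = np.array([0, 0, 1, 1, 0])         # indicator for 5 canonical frames
patient_pts = np.array([0.04, 0.12, 0.12, 0.16])  # the patient saw frame 3 twice

match_label_to_patient_pts_time(default_label, patient_pts)
# -> [0, 1, 1, 1]  (indices 0, 2, 2, 3 of default_label)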

get_list_of_patient_ids

get_list_of_patient_ids(patient_dict)

Collect all patient IDs from an indexable sequence of dicts.

Parameters:

  • patient_dict (Sequence[Dict[str, Any]]): Sequence where each item has a patient_id key. (required)

Returns:

  • List[int]: List of integer patient identifiers.

Source code in epiphyte/database/helpers.py
def get_list_of_patient_ids(patient_dict: Sequence[Dict[str, Any]]) -> List[int]:
    """Collect all patient IDs from an indexable sequence of dicts.

    Args:
        patient_dict (Sequence[Dict[str, Any]]): Sequence where each item has a `patient_id` key.

    Returns:
        List[int]: List of integer patient identifiers.
    """

    return [patient_dict[i]["patient_id"] for i in range(0, len(patient_dict))]

query_functions