-
Notifications
You must be signed in to change notification settings - Fork 247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Proof of concept of "buffer_description_api" and xarray reference API bridge #1513
base: master
Are you sure you want to change the base?
Changes from all commits
3083513
e70c014
24fe98e
22c6698
38a84f6
4d31b75
5506b43
7a0de15
a53d147
907fa25
f6ea345
81c7121
2d21e48
a64b054
bc5a122
356b281
cf612df
cb6ba8d
4714535
e9836d3
334a882
383ada7
9c35752
672e8fe
a540840
282951a
401956a
2fa19fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,7 @@ | |
import numpy as np | ||
|
||
from .baserawio import ( | ||
BaseRawIO, | ||
BaseRawWithBufferApiIO, | ||
_signal_channel_dtype, | ||
_signal_stream_dtype, | ||
_signal_buffer_dtype, | ||
|
@@ -63,7 +63,7 @@ | |
from neo.core import NeoReadWriteError | ||
|
||
|
||
class AxonRawIO(BaseRawIO): | ||
class AxonRawIO(BaseRawWithBufferApiIO): | ||
""" | ||
Class for reading data from pCLAMP and AxoScope files (.abf version 1 and 2) | ||
|
||
|
@@ -92,7 +92,7 @@ class AxonRawIO(BaseRawIO): | |
rawmode = "one-file" | ||
|
||
def __init__(self, filename=""): | ||
BaseRawIO.__init__(self) | ||
BaseRawWithBufferApiIO.__init__(self) | ||
self.filename = filename | ||
|
||
def _parse_header(self): | ||
|
@@ -115,8 +115,6 @@ def _parse_header(self): | |
head_offset = info["sections"]["DataSection"]["uBlockIndex"] * BLOCKSIZE | ||
totalsize = info["sections"]["DataSection"]["llNumEntries"] | ||
|
||
self._raw_data = np.memmap(self.filename, dtype=sig_dtype, mode="r", shape=(totalsize,), offset=head_offset) | ||
|
||
# 3 possible modes | ||
if version < 2.0: | ||
mode = info["nOperationMode"] | ||
|
@@ -142,7 +140,7 @@ def _parse_header(self): | |
) | ||
else: | ||
episode_array = np.empty(1, [("offset", "i4"), ("len", "i4")]) | ||
episode_array[0]["len"] = self._raw_data.size | ||
episode_array[0]["len"] = totalsize | ||
episode_array[0]["offset"] = 0 | ||
|
||
# sampling_rate | ||
|
@@ -154,9 +152,14 @@ def _parse_header(self): | |
# one sweep = one segment | ||
nb_segment = episode_array.size | ||
|
||
stream_id = "0" | ||
buffer_id = "0" | ||
|
||
# Get raw data by segment | ||
self._raw_signals = {} | ||
# self._raw_signals = {} | ||
self._t_starts = {} | ||
self._buffer_descriptions = {0 :{}} | ||
self._stream_buffer_slice = {stream_id : None} | ||
pos = 0 | ||
for seg_index in range(nb_segment): | ||
length = episode_array[seg_index]["len"] | ||
|
@@ -169,7 +172,15 @@ def _parse_header(self): | |
if (fSynchTimeUnit != 0) and (mode == 1): | ||
length /= fSynchTimeUnit | ||
|
||
self._raw_signals[seg_index] = self._raw_data[pos : pos + length].reshape(-1, nbchannel) | ||
self._buffer_descriptions[0][seg_index] = {} | ||
self._buffer_descriptions[0][seg_index][buffer_id] = { | ||
"type" : "raw", | ||
"file_path" : str(self.filename), | ||
"dtype" : str(sig_dtype), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this for jsonification? rather than store a np.dtype()? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a data layout is split with a structure theorically we could handle it in zarr giving the list of chunks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. str(sig_dtype) is for jsonification yes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are some without this safety step. So if we really want to be safe we should do this for all uses of dtype and Path no? |
||
"order": "C", | ||
"file_offset" : head_offset + pos * sig_dtype.itemsize, | ||
"shape" : (int(length // nbchannel), int(nbchannel)), | ||
} | ||
pos += length | ||
|
||
t_start = float(episode_array[seg_index]["offset"]) | ||
|
@@ -227,17 +238,14 @@ def _parse_header(self): | |
offset -= info["listADCInfo"][chan_id]["fSignalOffset"] | ||
else: | ||
gain, offset = 1.0, 0.0 | ||
stream_id = "0" | ||
buffer_id = "0" | ||
signal_channels.append( | ||
(name, str(chan_id), self._sampling_rate, sig_dtype, units, gain, offset, stream_id, buffer_id) | ||
) | ||
|
||
signal_channels.append((name, str(chan_id), self._sampling_rate, sig_dtype, units, gain, offset, stream_id, buffer_id)) | ||
|
||
signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype) | ||
|
||
# one unique signal stream and buffer | ||
signal_buffers = np.array([("Signals", "0")], dtype=_signal_buffer_dtype) | ||
signal_streams = np.array([("Signals", "0", "0")], dtype=_signal_stream_dtype) | ||
signal_buffers = np.array([("Signals", buffer_id)], dtype=_signal_buffer_dtype) | ||
signal_streams = np.array([("Signals", stream_id, buffer_id)], dtype=_signal_stream_dtype) | ||
|
||
# only one events channel : tag | ||
# In ABF timestamps are not attached to any particular segment | ||
|
@@ -295,21 +303,26 @@ def _segment_t_start(self, block_index, seg_index): | |
return self._t_starts[seg_index] | ||
|
||
def _segment_t_stop(self, block_index, seg_index): | ||
t_stop = self._t_starts[seg_index] + self._raw_signals[seg_index].shape[0] / self._sampling_rate | ||
sig_size = self.get_signal_size(block_index, seg_index, 0) | ||
t_stop = self._t_starts[seg_index] + sig_size / self._sampling_rate | ||
return t_stop | ||
|
||
def _get_signal_size(self, block_index, seg_index, stream_index): | ||
shape = self._raw_signals[seg_index].shape | ||
return shape[0] | ||
# def _get_signal_size(self, block_index, seg_index, stream_index): | ||
# shape = self._raw_signals[seg_index].shape | ||
# return shape[0] | ||
|
||
def _get_signal_t_start(self, block_index, seg_index, stream_index): | ||
return self._t_starts[seg_index] | ||
|
||
def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes): | ||
if channel_indexes is None: | ||
channel_indexes = slice(None) | ||
raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes] | ||
return raw_signals | ||
# def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes): | ||
# if channel_indexes is None: | ||
# channel_indexes = slice(None) | ||
# raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes] | ||
# return raw_signals | ||
|
||
def _get_analogsignal_buffer_description(self, block_index, seg_index, buffer_id): | ||
return self._buffer_descriptions[block_index][seg_index][buffer_id] | ||
|
||
|
||
def _event_count(self, block_index, seg_index, event_channel_index): | ||
return self._raw_ev_timestamps.size | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,8 @@ | |
|
||
from neo import logging_handler | ||
|
||
from .utils import get_memmap_chunk_from_opened_file | ||
|
||
|
||
possible_raw_modes = [ | ||
"one-file", | ||
|
@@ -182,6 +184,15 @@ def __init__(self, use_cache: bool = False, cache_path: str = "same_as_resource" | |
self.header = None | ||
self.is_header_parsed = False | ||
|
||
self._has_buffer_description_api = False | ||
|
||
def has_buffer_description_api(self) -> bool: | ||
""" | ||
Return whether the reader handles the buffer API. | ||
If True then the reader internally supports `get_analogsignal_buffer_description()` | ||
""" | ||
return self._has_buffer_description_api | ||
|
||
def parse_header(self): | ||
""" | ||
Parses the header of the file(s) to allow for faster computations | ||
|
@@ -191,6 +202,7 @@ def parse_header(self): | |
# this must create | ||
# self.header['nb_block'] | ||
# self.header['nb_segment'] | ||
# self.header['signal_buffers'] | ||
# self.header['signal_streams'] | ||
# self.header['signal_channels'] | ||
# self.header['spike_channels'] | ||
|
@@ -663,6 +675,7 @@ def get_signal_size(self, block_index: int, seg_index: int, stream_index: int | | |
|
||
""" | ||
stream_index = self._get_stream_index_from_arg(stream_index) | ||
|
||
return self._get_signal_size(block_index, seg_index, stream_index) | ||
|
||
def get_signal_t_start(self, block_index: int, seg_index: int, stream_index: int | None = None): | ||
|
@@ -1311,7 +1324,6 @@ def _get_analogsignal_chunk( | |
------- | ||
array of samples, with each requested channel in a column | ||
""" | ||
|
||
raise (NotImplementedError) | ||
|
||
### | ||
|
@@ -1350,6 +1362,150 @@ def _rescale_event_timestamp(self, event_timestamps: np.ndarray, dtype: np.dtype | |
def _rescale_epoch_duration(self, raw_duration: np.ndarray, dtype: np.dtype): | ||
raise (NotImplementedError) | ||
|
||
### | ||
# buffer api zone | ||
# must be implemented if has_buffer_description_api=True | ||
def get_analogsignal_buffer_description(self, block_index: int = 0, seg_index: int = 0, buffer_id: str = None): | ||
if not self.has_buffer_description_api: | ||
raise ValueError("This reader do not support buffer_description API") | ||
descr = self._get_analogsignal_buffer_description(block_index, seg_index, buffer_id) | ||
return descr | ||
|
||
def _get_analogsignal_buffer_description(self, block_index, seg_index, buffer_id): | ||
raise (NotImplementedError) | ||
|
||
|
||
|
||
class BaseRawWithBufferApiIO(BaseRawIO): | ||
""" | ||
Generic class for readers that support the "buffer api". | ||
|
||
In short, readers that are internally based on: | ||
|
||
* np.memmap | ||
* hdf5 | ||
|
||
In these cases _get_signal_size and _get_analogsignal_chunk are totally generic and do not need to be implemented in the class. | ||
|
||
For this class, subclasses must implement these two dicts: | ||
* self._buffer_descriptions[block_index][seg_index] = buffer_description | ||
* self._stream_buffer_slice[buffer_id] = None or slicer of indices | ||
|
||
""" | ||
|
||
def __init__(self, *arg, **kwargs): | ||
super().__init__(*arg, **kwargs) | ||
self._has_buffer_description_api = True | ||
|
||
def _get_signal_size(self, block_index, seg_index, stream_index): | ||
buffer_id = self.header["signal_streams"][stream_index]["buffer_id"] | ||
buffer_desc = self.get_analogsignal_buffer_description(block_index, seg_index, buffer_id) | ||
# some hdf5 files revert the buffer | ||
time_axis = buffer_desc.get("time_axis", 0) | ||
return buffer_desc['shape'][time_axis] | ||
|
||
def _get_analogsignal_chunk( | ||
self, | ||
block_index: int, | ||
seg_index: int, | ||
i_start: int | None, | ||
i_stop: int | None, | ||
stream_index: int, | ||
channel_indexes: list[int] | None, | ||
): | ||
|
||
stream_id = self.header["signal_streams"][stream_index]["id"] | ||
buffer_id = self.header["signal_streams"][stream_index]["buffer_id"] | ||
|
||
buffer_slice = self._stream_buffer_slice[stream_id] | ||
|
||
|
||
buffer_desc = self.get_analogsignal_buffer_description(block_index, seg_index, buffer_id) | ||
|
||
i_start = i_start or 0 | ||
i_stop = i_stop or buffer_desc['shape'][0] | ||
|
||
if buffer_desc['type'] == "raw": | ||
|
||
# open files on demand and keep reference to opened file | ||
if not hasattr(self, '_memmap_analogsignal_buffers'): | ||
self._memmap_analogsignal_buffers = {} | ||
if block_index not in self._memmap_analogsignal_buffers: | ||
self._memmap_analogsignal_buffers[block_index] = {} | ||
if seg_index not in self._memmap_analogsignal_buffers[block_index]: | ||
self._memmap_analogsignal_buffers[block_index][seg_index] = {} | ||
if buffer_id not in self._memmap_analogsignal_buffers[block_index][seg_index]: | ||
fid = open(buffer_desc['file_path'], mode='rb') | ||
self._memmap_analogsignal_buffers[block_index][seg_index][buffer_id] = fid | ||
else: | ||
fid = self._memmap_analogsignal_buffers[block_index][seg_index][buffer_id] | ||
|
||
num_channels = buffer_desc['shape'][1] | ||
|
||
raw_sigs = get_memmap_chunk_from_opened_file(fid, num_channels, i_start, i_stop, np.dtype(buffer_desc['dtype']), file_offset=buffer_desc['file_offset']) | ||
|
||
|
||
elif buffer_desc['type'] == 'hdf5': | ||
|
||
# open files on demand and keep reference to opened file | ||
if not hasattr(self, '_hdf5_analogsignal_buffers'): | ||
self._hdf5_analogsignal_buffers = {} | ||
if block_index not in self._hdf5_analogsignal_buffers: | ||
self._hdf5_analogsignal_buffers[block_index] = {} | ||
if seg_index not in self._hdf5_analogsignal_buffers[block_index]: | ||
self._hdf5_analogsignal_buffers[block_index][seg_index] = {} | ||
if buffer_id not in self._hdf5_analogsignal_buffers[block_index][seg_index]: | ||
import h5py | ||
h5file = h5py.File(buffer_desc['file_path'], mode="r") | ||
self._hdf5_analogsignal_buffers[block_index][seg_index][buffer_id] = h5file | ||
else: | ||
h5file = self._hdf5_analogsignal_buffers[block_index][seg_index][buffer_id] | ||
|
||
hdf5_path = buffer_desc["hdf5_path"] | ||
full_raw_sigs = h5file[hdf5_path] | ||
|
||
time_axis = buffer_desc.get("time_axis", 0) | ||
if time_axis == 0: | ||
raw_sigs = full_raw_sigs[i_start:i_stop, :] | ||
elif time_axis == 1: | ||
raw_sigs = full_raw_sigs[:, i_start:i_stop].T | ||
else: | ||
raise RuntimeError("Should never happen") | ||
|
||
if buffer_slice is not None: | ||
raw_sigs = raw_sigs[:, buffer_slice] | ||
|
||
|
||
|
||
else: | ||
raise NotImplementedError() | ||
|
||
# this is a pre-slicing when the stream does not contain all channels (for instance spikeglx when load_sync_channel=False) | ||
if buffer_slice is not None: | ||
raw_sigs = raw_sigs[:, buffer_slice] | ||
|
||
# channel slice requested | ||
if channel_indexes is not None: | ||
raw_sigs = raw_sigs[:, channel_indexes] | ||
|
||
|
||
return raw_sigs | ||
|
||
def __del__(self): | ||
if hasattr(self, '_memmap_analogsignal_buffers'): | ||
for block_index in self._memmap_analogsignal_buffers.keys(): | ||
for seg_index in self._memmap_analogsignal_buffers[block_index].keys(): | ||
for buffer_id, fid in self._memmap_analogsignal_buffers[block_index][seg_index].items(): | ||
fid.close() | ||
del self._memmap_analogsignal_buffers | ||
|
||
if hasattr(self, '_hdf5_analogsignal_buffers'): | ||
for block_index in self._hdf5_analogsignal_buffers.keys(): | ||
for seg_index in self._hdf5_analogsignal_buffers[block_index].keys(): | ||
for buffer_id, h5_file in self._hdf5_analogsignal_buffers[block_index][seg_index].items(): | ||
h5_file.close() | ||
del self._hdf5_analogsignal_buffers | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no del here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oups yes. |
||
|
||
def pprint_vector(vector, lim: int = 8): | ||
vector = np.asarray(vector) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have logical channels, this should be same units as well, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment not yet. The API does not enforce or ensure this.
See this https://github.com/NeuralEnsemble/python-neo/blob/master/neo/rawio/baserawio.py#L118
Ideally units should be added, but some IOs may mix units within the same stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add it as an ideal? Or would you rather first do the changes and then change it here?
I can close this for example:
#1133