diff --git a/.coverage b/.coverage deleted file mode 100644 index 1c92ca5..0000000 --- a/.coverage +++ /dev/null @@ -1 +0,0 @@ -!coverage.py: This is a private format, don't read it directly!{"lines":{"C:\\Users\\nd269\\Desktop\\pyUnisens\\test\\unisens_test.py":[6,7,8,9,10,12,13,14,15,20,39,42,45,48,76,92,152,181,298,314,365,385,434,484,485,43,93,94,96,97,99,100,101,102,103,104,106,107,108,109,110,111,113,114,115,117,118,121,122,123,124,125,126,127,128,129,130,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,46,299,300,301,303,304,305,306,307,308,309,21,24,27,30,33,36,310,311,34,35,315,316,317,318,319,320,321,322,323,324,325,326,327,329,330,331,332,333,334,335,336,337,338,340,341,342,343,344,346,347,348,349,351,352,353,354,356,357,358,359,360,361,184,186,187,188,189,190,191,192,193,194,196,199,201,202,204,205,206,207,209,210,211,212,213,214,215,216,217,218,219,220,221,224,225,226,227,228,229,230,231,232,233,234,235,236,238,239,240,241,242,243,244,245,247,248,249,250,251,252,253,254,255,256,258,259,260,261,262,263,264,265,266,267,269,270,271,272,273,274,275,276,277,278,279,280,281,283,284,285,286,288,289,290,291,292,293,294,386,388,390,391,392,393,394,395,396,397,398,399,400,401,405,406,407,408,409,410,411,412,413,414,415,416,417,420,421,422,423,424,425,426,427,428,429,430,431,432,366,368,369,370,371,372,373,374,375,376,377,378,379,381,382,435,437,439,440,441,442,443,444,445,446,447,448,449,450,454,455,456,457,458,459,460,461,462,463,464,465,466,469,470,471,472,473,474,475,476,477,478,479,480,481,153,154,156,157,158,159,160,161,163,164,165,167,168,170,171,173,174,175,176,177,178,49,51,52,53,54,56,57,58,60,61,62,63,65,66,67,69,70,71,73,77,78,79,80,81,83,84,85,87,88,89],"C:\\Users\\nd269\\Desktop\\pyUnisens\\unisens\\__init__.py":[1,2],"C:\\Users\\nd269\\Desktop\\pyUnisens\\unisens\\entry.py":[6,7,8,9,10,11,12,15,17,20,23,26,35,48,76,97,120,134,143,149,154,156,160,176,178,182,207,279,281,298,325,363,364,368,369,373,375,378,397,398,403,415,422,423,429,430,434,435,376,161,27,28,40,41,43,44,29,30,31,32,162,166,172,167,168,169,170,113,114,115,116,117,118,173,45,126,127,128,131,365,282,284,290,291,293,424,425,426,294,295,296,52,53,54,59,60,65,72,73,179,370,18,399,400,401,416,419,420,163,164,66,69,70,67,285,286,287,288,55,56,77,78,79,80,81,83,84,89,135,136,137,138,139,404,405,406,407,408,409,410,411,140,412,413,165,194,195,196,197,198,199,200,201,202,333,334,335,340,343,347,349,350,351,352,353,354,355,356,336,337,338,360,341,342,344,345,346,386,387,388,394,389,390,391,299,301,303,304,307,310,311,312,313,314,315,316,318,319,321,322,323,308,225,228,235,236,238,242,251,254,255,256,427,257,263,266,267,268,270,271,272,273,274,275],"C:\\Users\\nd269\\Desktop\\pyUnisens\\unisens\\utils.py":[8,10,16,18,22,24,31,55,66,61,62,64,71,73,25,26,72,46,44,45,47,50,51,52,53],"C:\\Users\\nd269\\Desktop\\pyUnisens\\unisens\\main.py":[34,37,38,39,40,41,42,43,44,45,46,50,68,80,82,124,131,134,142,150,165,168,185,221,241,273,95,96,97,129,98,99,100,101,103,104,105,106,250,251,252,255,258,260,261,193,194,195,196,216,217,197,200,202,205,207,210,211,219,218,262,175,176,181,182,263,264,201,177,132,180,198,199,203,204,206,265,266,267,268,269,230,232,233,234,235,236,237,238,213,214,215,135,136,137,138,108,109,110,111,112,113,114,115,116,117,118,178,179,231]}} \ No newline at end of file diff --git a/.github/workflows/Tests.yml b/.github/workflows/Tests.yml index dd5fd4a..3eb0868 100644 --- a/.github/workflows/Tests.yml +++ b/.github/workflows/Tests.yml @@ -15,13 +15,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: 
matrix: - python-version: [3.6, 3.7, 3.8] + python-version: [3.7, 3.8, 3.9, 3.11] os: [ubuntu-latest, macos-latest, windows-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v3 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml new file mode 100644 index 0000000..a310d15 --- /dev/null +++ b/.github/workflows/testing.yml @@ -0,0 +1,37 @@ +# This workflow will install Python dependencies, run tests with Python versions 3.8 and 3.11 + +name: Testing + +on: + [push] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.8", "3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install pytest coverage # flake8 + pip install -r test-requirements.txt + pip install -r requirements.txt + - name: Test with pytest + run: | + coverage run -m pytest + coverage report -m --skip-covered --include=./unisens/* --precision=2 + - name: Test with unittest + run: | + coverage run -m unittest discover -p '*_test.py' + coverage report -m --skip-covered --include=./unisens/* --precision=2 + diff --git a/.gitignore b/.gitignore index dd267b8..f3b401b 100644 --- a/.gitignore +++ b/.gitignore @@ -82,6 +82,10 @@ target/ profile_default/ ipython_config.py +# IntelliJ IDEA +.idea/ +*.pyc + # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: @@ -133,6 +137,8 @@ dmypy.json # pytype static type analyzer .pytype/ -/test/Example_001/test.xml -/test/Example_002/test.xml -/test/Example_003/test.xml +/test/Example_001/ +/test/Example_002/ +/test/Example_003/ + +*tmp/ diff --git a/API-OVERVIEW.md b/API-OVERVIEW.md index bf92a54..86101c9 100644 --- a/API-OVERVIEW.md +++ b/API-OVERVIEW.md @@ -118,7 +118,7 @@ u = unisens.Unisens('c:/unisens', makenew=True, autosave=True, readonly=False) ## SignalEntry -SignalEntries can be used to store continuous numeric data with high frequency, e.g. ECG signals. They are saved in binary format. It is possible to save multiple channels. Things like sample frequency and other meta information can be saved in them as well. Data must be of size `[1, N]`. +SignalEntries can be used to store continuous numeric data with high frequency, e.g. ECG signals. They are saved in binary or csv format. It is possible to save multiple channels. Things like sample frequency and other meta information can be saved in them as well. Data must be of size `[1, N]`. ```Python from unisens import Unisens, SignalEntry @@ -143,7 +143,7 @@ u.save() ``` ## ValuesEntry -`ValuesEntry` is used for low-frequency continuously sampled data, e.g. Temperature or RR intervals. It is basically equivalent to `SignalEntry` except that it saves data in CSV (text) format, and not binary. Data must be of size `[1, N]`. +`ValuesEntry` is used for low-frequency continuously sampled data, e.g. Temperature or RR intervals. It is basically equivalent to `SignalEntry` except that it saves data in CSV (text) format, and not binary. Data must be of size `[N, 1]`, i.e. column-wise, with indices in the first column. 
The integer indices are matched with the sample rate and the unisens `timestampStart` to display correctly in the UnisensViewer. ```Python from unisens import Unisens, ValuesEntry diff --git a/setup.py b/setup.py index 48c92bb..74934e2 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup(name='pyunisens', python_requires='>=3.6', - version='1.03', + version='1.5.0', description='A python implementation of the Unisens standard', url='http://github.com/Unisens/pyUnisens', author='skjerns', diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unisens_test.py b/test/unisens_test.py index 9eff145..b97ffa2 100644 --- a/test/unisens_test.py +++ b/test/unisens_test.py @@ -5,18 +5,20 @@ @author: skjerns """ import os +import warnings + +import pytest + from unisens import CustomEntry, ValuesEntry, EventEntry, SignalEntry from unisens import MiscEntry, CustomAttributes, Unisens, FileEntry -from unisens import CsvFileEntry +from unisens import make_key import unittest import shutil -import tempfile import numpy as np import pickle - def elements_equal(e1, e2): assert e1.tag == e2.tag, f'tag not same {e1.tag}!={e2.tag}' assert e1.text == e2.text, f'text not same {e1.text}!={e2.text}' @@ -27,65 +29,65 @@ def elements_equal(e1, e2): class Testing(unittest.TestCase): - - - def setUp(self): - self.tmpdir = tempfile.mkdtemp(prefix='unisens') - - def tearDown(self): - shutil.rmtree(self.tmpdir) - + tmpdir = os.path.join(os.path.dirname(__file__), 'tmp') + + @classmethod + def setUp(cls): + os.makedirs(cls.tmpdir, exist_ok=True) + + @classmethod + def tearDown(cls): + shutil.rmtree(cls.tmpdir) + def test_unisens_creation(self): folder = os.path.join(self.tmpdir, 'data', 'record1') - + u = Unisens(folder) u.save() self.assertTrue(os.path.isdir(folder)) self.assertTrue(os.path.isfile(os.path.join(folder, 'unisens.xml'))) - + u.key1 = 'value1' u.set_attrib('key2', 'value2') u.set_attrib('key3', 3) - - for i in range(2): # run it once, then save and reload and run again + + for i in range(2): # run it once, then save and reload and run again self.assertTrue(hasattr(u, 'key1')) self.assertTrue(hasattr(u, 'key2')) self.assertTrue(hasattr(u, 'key3')) - + self.assertEqual(u.attrib['key1'], 'value1') self.assertEqual(u.attrib['key2'], 'value2') self.assertEqual(u.attrib['key3'], 3) - + self.assertEqual(u.key1, 'value1') self.assertEqual(u.key2, 'value2') self.assertEqual(u.key3, 3) - + u.save() - - + def test_unisens_overwrite_load(self): folder = os.path.join(self.tmpdir, 'data', 'record2') u = Unisens(folder) u.key1 = 'value1' u.save() u = Unisens(folder) - + self.assertTrue(hasattr(u, 'key1')) self.assertEqual(u.key1, 'value1') self.assertEqual(u.attrib['key1'], 'value1') - + u = Unisens(folder, makenew=True) self.assertFalse(hasattr(u, 'key1')) - self.assertNotIn('key1', u.attrib) - - + self.assertNotIn('key1', u.attrib) + def test_entry_creation(self): folder = os.path.join(self.tmpdir, 'data', 'record2') for entrytype in [CustomEntry, ValuesEntry, SignalEntry, EventEntry]: - - with self.assertRaises(ValueError): - entry = entrytype() - + with self.assertRaises(ValueError) as e: + entrytype() + assert 'id must be supplied' in str(e.exception) + entry = entrytype(parent=folder, id='test.csv') entry.set_attrib('key1', 'value1') entry.key2 = 'value2' @@ -99,15 +101,14 @@ def test_entry_creation(self): self.assertEqual(entry.attrib['id'], 'test.csv') self.assertEqual(entry.attrib['key1'], 'value1') self.assertEqual(entry.attrib['key2'], 'value2') - +
entry.remove_attr('key1') self.assertFalse(hasattr(entry, 'key1')) self.assertNotIn('key1', entry.attrib) - + with self.assertRaises(TypeError): misc = MiscEntry() - misc = MiscEntry(name='test') self.assertEqual(misc._name, 'test') self.assertEqual(len(misc.attrib), 0) @@ -121,15 +122,15 @@ def test_entry_creation(self): # with self.assertRaises(TypeError): # custom = CustomAttributes() - - custom = CustomAttributes(key = 'key1', value = 'value1') + + custom = CustomAttributes(key='key1', value='value1') self.assertEqual(len(custom), 0) self.assertEqual(len(custom.attrib), 1) self.assertTrue(hasattr(custom, 'key1')) self.assertEqual(custom.key1, 'value1') custom.remove_attr('key1') self.assertEqual(len(custom.attrib), 0) - custom = CustomAttributes(key = 'key1', value = 'value1') + custom = CustomAttributes(key='key1', value='value1') custom.set_attrib('key2', 'value2') custom.set_attrib('key1', 'value2') self.assertTrue(hasattr(custom, 'key1')) @@ -138,33 +139,31 @@ def test_entry_creation(self): self.assertEqual(custom.key1, 'value2') self.assertEqual(custom.key2, 'value2') - def test_unisens_add_entr(self): folder = os.path.join(self.tmpdir, 'data', 'record') u = Unisens(folder, makenew=True) - - for i,entrytype in enumerate([CustomEntry, ValuesEntry, - SignalEntry, EventEntry]): - entry = entrytype(parent=folder, id='test'+str(i)+'.csv') + + for i, entrytype in enumerate([CustomEntry, ValuesEntry, + SignalEntry, EventEntry]): + entry = entrytype(parent=folder, id='test' + str(i) + '.csv') entry.set_attrib('key1', 'value1') entry.key2 = 'value2' u.add_entry(entry) if entrytype == CustomEntry: entry.set_data('test') else: - entry.set_data([[1,2,3],[1,2,3]]) + entry.set_data([[1, 2, 3], [1, 2, 3]], sampleRate=256) self.assertEqual(len(u), 4) self.assertEqual(len(u.entries), 4) u.save() - - + u = Unisens(folder) self.assertEqual(len(u), 4) - + u = Unisens(folder, makenew=True) self.assertEqual(len(u), 0) - + misc = MiscEntry(name='miscentry') misc.set_attrib('key2', 'value2') customattr = CustomAttributes('key1', 'value1') @@ -173,29 +172,27 @@ def test_unisens_add_entr(self): self.assertEqual(len(u), 2) def test_unisens_autosave(self): - + folder = os.path.join(self.tmpdir, 'data', 'record') u = Unisens(folder, makenew=True, autosave=True) - - for i,entrytype in enumerate([CustomEntry, ValuesEntry, - SignalEntry, EventEntry]): + + for i, entrytype in enumerate([CustomEntry, ValuesEntry, + SignalEntry, EventEntry]): entry = entrytype(parent=folder, id=f'test{i}.csv') - with open(os.path.join(entry._folder, f'test{i}.csv'), 'w'):pass + with open(os.path.join(entry._folder, f'test{i}.csv'), 'w'): pass entry.set_attrib('key1', 'value1') entry.key2 = 'value2' u.add_entry(entry) self.assertEqual(len(u), 4) self.assertEqual(len(u.entries), 4) - + u = Unisens(folder) self.assertEqual(len(u), 4) - - - + def test_load_examples(self): example1 = os.path.join(os.path.dirname(__file__), 'Example_001') - + u = Unisens(example1, readonly=True) self.assertEqual(len(u), 9) self.assertTrue(hasattr(u, 'entries')) @@ -206,20 +203,19 @@ def test_load_examples(self): self.assertEqual(u.attrib['timestampStart'], '2008-07-08T13:32:51') self.assertEqual(u.attrib['measurementId'], 'Example_001') - - for name in ['customAttributes', 'imp200.bin', 'ecg200.bin', - 'valid_signal.csv', 'trig_ref.csv', 'bp.csv', - 'trig_test_osea_dry.csv', 'picture.jpg', 'default_ecg']: + for name in ['customAttributes', 'imp200.bin', 'ecg200.bin', + 'valid_signal.csv', 'trig_ref.csv', 'bp.csv', + 'trig_test_osea_dry.csv', 
'picture.jpg', 'default_ecg']: self.assertIn(name, u.entries) - + entry = u[0] self.assertEqual(entry.height, '1.74m') - + entry = u.entries['customAttributes'] self.assertEqual(entry.weight, '73kg') self.assertEqual(entry.height, '1.74m') self.assertEqual(entry.gender, 'male') - + entry = u.entries['imp200.bin'] self.assertEqual(entry.adcResolution, '16') self.assertEqual(entry.comment, 'Electrode Impedance') @@ -230,11 +226,10 @@ def test_load_examples(self): self.assertEqual(entry.sampleRate, '200') self.assertEqual(entry.unit, 'Ohm') self.assertEqual(entry.binFileFormat.endianess, 'LITTLE') - self.assertEqual(entry.channel[0].name, 'imp_left') - self.assertEqual(entry.channel[1].name, 'imp_right') + self.assertEqual(entry.channel[0].name, 'imp_left') + self.assertEqual(entry.channel[1].name, 'imp_right') self.assertEqual(len(entry.channel), 2) - entry = u.entries['ecg200.bin'] self.assertEqual(entry.adcResolution, '16') self.assertEqual(entry.comment, 'Filtered ECG Data 200Hz, Bandpass 3rd order Butterworth 0.5-35') @@ -245,17 +240,17 @@ def test_load_examples(self): self.assertEqual(entry.sampleRate, '200') self.assertEqual(entry.unit, 'V') self.assertEqual(entry.binFileFormat.endianess, 'LITTLE') - self.assertEqual(entry.channel[0].name, 'dry electrodes') - self.assertEqual(entry.channel[1].name, 'ref1') + self.assertEqual(entry.channel[0].name, 'dry electrodes') + self.assertEqual(entry.channel[1].name, 'ref1') self.assertEqual(len(entry.channel), 3) - + entry = u.entries['valid_signal.csv'] self.assertEqual(entry.id, 'valid_signal.csv') self.assertEqual(entry.comment, 'Valid signal regions for evaluation') self.assertEqual(entry.sampleRate, '1') self.assertEqual(entry.typeLength, '1') - self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') - self.assertEqual(entry.csvFileFormat.separator, ';') + self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') + self.assertEqual(entry.csvFileFormat.separator, ';') self.assertEqual(len(entry), 1) entry = u.entries['trig_ref.csv'] @@ -265,19 +260,20 @@ def test_load_examples(self): self.assertEqual(entry.typeLength, '1') self.assertEqual(entry.source, 'Padsy') self.assertEqual(entry.contentClass, 'TRIGGER') - self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') - self.assertEqual(entry.csvFileFormat.separator, ';') + self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') + self.assertEqual(entry.csvFileFormat.separator, ';') self.assertEqual(len(entry), 1) entry = u.entries['trig_test_osea_dry.csv'] self.assertEqual(entry.id, 'trig_test_osea_dry.csv') - self.assertEqual(entry.comment, 'test qrs trigger list, 200Hz, done with osea on dry electrodes (ecg200.bin:dry electrodes)') + self.assertEqual(entry.comment, + 'test qrs trigger list, 200Hz, done with osea on dry electrodes (ecg200.bin:dry electrodes)') self.assertEqual(entry.sampleRate, '200') self.assertEqual(entry.typeLength, '1') self.assertEqual(entry.source, 'osea') self.assertEqual(entry.contentClass, 'TRIGGER') - self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') - self.assertEqual(entry.csvFileFormat.separator, ';') + self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') + self.assertEqual(entry.csvFileFormat.separator, ';') self.assertEqual(len(entry), 1) entry = u.entries['bp.csv'] @@ -287,10 +283,10 @@ def test_load_examples(self): self.assertEqual(entry.dataType, 'double') self.assertEqual(entry.unit, 'mmHg') self.assertEqual(entry.contentClass, 'RR') - self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') - 
self.assertEqual(entry.csvFileFormat.separator, ';') - self.assertEqual(entry.channel[0].name, 'systolisch') - self.assertEqual(entry.channel[1].name, 'diastolisch') + self.assertEqual(entry.csvFileFormat.decimalSeparator, '.') + self.assertEqual(entry.csvFileFormat.separator, ';') + self.assertEqual(entry.channel[0].name, 'systolisch') + self.assertEqual(entry.channel[1].name, 'diastolisch') self.assertEqual(len(entry), 3) self.assertEqual(len(entry.channel), 2) @@ -307,9 +303,8 @@ def test_load_examples(self): self.assertEqual(len(entry.groupEntry), 2) self.assertEqual(len(entry), 2) - - # check if loading and saving will reproduce the same tree def test_load_and_save(self): + # check if loading and saving will reproduce the same tree example1 = os.path.join(os.path.dirname(__file__), 'Example_001') example2 = os.path.join(os.path.dirname(__file__), 'Example_002') example3 = os.path.join(os.path.dirname(__file__), 'Example_003') @@ -317,105 +312,102 @@ def test_load_and_save(self): u = Unisens(example, readonly=True) entry = MiscEntry('group') u1 = u.copy() - u1._readonly=False + u1._readonly = False u1.add_entry(entry) u1.save(filename='test.xml') u2 = Unisens(folder=u1._folder, filename='test.xml') self.assertTrue(elements_equal(u1.to_element(), u2.to_element())) u2.add_entry(MiscEntry('binFileFormat')) u2.save(filename='test.xml') - u2 = Unisens(folder=u1._folder,filename='test.xml') + u2 = Unisens(folder=u1._folder, filename='test.xml') with self.assertRaises(AssertionError): elements_equal(u1.to_element(), u2.to_element()) - - def test_load_data(self): example1 = os.path.join(os.path.dirname(__file__), 'Example_001') u = Unisens(example1, readonly=True) signal = u['imp200.bin'] - data = signal.get_data() - self.assertEqual(len(data),2) + data = signal.get_data() + self.assertEqual(len(data), 2) self.assertEqual(data.size, 1817200) self.assertEqual(data.min(), 1) self.assertEqual(data.max(), 32767) - data = signal.get_data(scaled=False) - self.assertEqual(len(data),2) + assert 'baseline' not in signal.attrib + signal.set_attrib('baseline', 5) + data = signal.get_data() + self.assertEqual(len(data), 2) self.assertEqual(data.size, 1817200) - self.assertEqual(data.min(), 1) - self.assertEqual(data.max(), 32767) - + self.assertEqual(data.min(), -4) + self.assertEqual(data.max(), 32762) + signal = u['ecg200.bin'] - data = signal.get_data() + data = signal.get_data() self.assertEqual(len(data), 3) self.assertEqual(data.size, 2725800) self.assertEqual(data.min(), -0.083329024) self.assertEqual(data.max(), 0.08332648100000001) - data = signal.get_data(scaled=False) + assert signal.lsbValue == '2.543E-6' + signal.set_attrib('lsbValue', '1') + data = signal.get_data() self.assertEqual(data.size, 2725800) self.assertEqual(data.min(), -32768) self.assertEqual(data.max(), 32767) - + events = u['valid_signal.csv'] - data = events.get_data(mode='list') + data = events.get_data(mode='list') self.assertEqual(len(data), 2) self.assertEqual(data[0], [10, '(']) self.assertEqual(data[1], [4521, ')']) - - data = events.get_data(mode='numpy') + + data = events.get_data(mode='numpy') self.assertEqual(len(data), 2) self.assertEqual(data[0][0], '10') self.assertEqual(data[1][0], '4521') - - data = events.get_data(mode='pd') + + data = events.get_data(mode='pd') self.assertEqual(len(data), 2) self.assertEqual(data[0][0], 10) self.assertEqual(data[0][1], 4521) - + custom = u['picture.jpg'] - data = custom.get_data(dtype='binary') + data = custom.get_data(dtype='binary') self.assertEqual(len(data), 724116) - 
data = custom.get_data(dtype='image') + data = custom.get_data(dtype='image') data = np.asarray(data) self.assertEqual(data.shape, (1463, 1388, 3)) - - self.assertEqual('Unisens: Example_001(0:00:00, 9 entries)', str(u)) - - + self.assertEqual('Unisens: Example_001(0:00:00, 9 entries)', str(u)) def test_save_signalentry(self): folder = os.path.join(self.tmpdir, 'data', 'record5') for dtype in ['int16', 'uint8', 'float', 'int32']: u = Unisens(folder, makenew=True) - data1 = (np.random.rand(5,500)*100).astype(dtype) + data1 = (np.random.rand(5, 500) * 100).astype(dtype) signal = SignalEntry(id='signal.bin', parent=u) signal.set_attrib('test', 'asd') - signal.set_data(data1, sampleRate=500, lsbValue=1, unit='mV', + signal.set_data(data1, sampleRate=500, lsbValue=1, unit='mV', comment='Test', contentClass='EEG') u.save() u1 = Unisens(folder) - data2 = u1['signal.bin'].get_data() + data2 = u1['signal.bin'].get_data(True) np.testing.assert_almost_equal(data1, data2) - + folder = os.path.join(self.tmpdir, 'data', 'record') u = Unisens(folder, makenew=True) - - + def test_save_customtypes(self): folder = os.path.join(self.tmpdir, 'data', 'customtypes') from collections import OrderedDict u = Unisens(folder, makenew=True) image = np.random.randint(0, 128, (512, 512, 3), dtype=np.uint8) - data = OrderedDict({'test':'test', 'asdv': '2345', 'adfg':['3','34','234']}) + data = OrderedDict({'test': 'test', 'asdv': '2345', 'adfg': ['3', '34', '234']}) text = 'asd,123;456\qwe,678;678' - + image_exts = ['.jpeg', '.jpg', '.bmp', '.png', '.tif', '.gif'] for ext in image_exts: custom = CustomEntry(f'image{ext}', parent=u) custom.set_data(image) - CustomEntry('data.npy', parent=u).set_data(image) CustomEntry('pickle.pkl', parent=u).set_data(data) @@ -423,7 +415,7 @@ def test_save_customtypes(self): CustomEntry('text.txt', parent=u).set_data(text) CustomEntry('text.csv', parent=u).set_data(text) u.save() - + u = Unisens(folder) np.testing.assert_array_equal(u['image.png'].get_data(), image) np.testing.assert_array_equal(u['image.bmp'].get_data(), image) @@ -434,16 +426,13 @@ def test_save_customtypes(self): np.testing.assert_array_equal(u['data.npy'].get_data(), image) self.assertDictEqual(u['pickle.pkl'].get_data(), data) - def test_save_csvetry(self): - self.tmpdir = tempfile.mkdtemp(prefix='unisens') - folder = os.path.join(self.tmpdir, 'data', 'record1') - + u = Unisens(folder, makenew=True) - times = [[i*100 + float(np.random.rand(1)), f'trigger {i}'] for i in range(15)] + times = [[i * 100 + float(np.random.rand()), f'trigger {i}'] for i in range(15)] event = EventEntry(id='triggers.csv', parent=u, separator=',', decimalSeparator='.') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -452,13 +441,12 @@ def test_save_csvetry(self): times2 = event2.get_data() self.assertSequenceEqual(times, times2) - - # now test with different separators + # now test with different separators folder = os.path.join(self.tmpdir, 'data', 'record2') u = Unisens(folder, makenew=True) - times = [[i*100 + float(np.random.rand(1)), f'trigger {i}'] for i in range(15)] + times = [[i * 100 + float(np.random.rand()), f'trigger {i}'] for i in range(15)] event = EventEntry(id='triggers.csv', parent=u, separator=';', decimalSeparator=',') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 
'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -466,13 +454,13 @@ def test_save_csvetry(self): event2 = u2['triggers.csv'] times2 = event2.get_data() self.assertSequenceEqual(times, times2) - + # now test with np array folder = os.path.join(self.tmpdir, 'data', 'record3') u = Unisens(folder, makenew=True) times = np.random.rand(5, 15) event = EventEntry(id='triggers.csv', parent=u, separator=',', decimalSeparator='.') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -482,14 +470,12 @@ def test_save_csvetry(self): np.testing.assert_allclose(times, times2) def test_save_valuesentry(self): - self.tmpdir = tempfile.mkdtemp(prefix='unisens') - folder = os.path.join(self.tmpdir, 'data', 'record1') - + u = Unisens(folder, makenew=True) - times = [[i*100 + float(np.random.rand(1)), f'trigger {i}'] for i in range(15)] + times = [[i * 100 + float(np.random.rand()), f'trigger {i}'] for i in range(15)] event = ValuesEntry(id='triggers.csv', parent=u, separator=',', decimalSeparator='.') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -497,13 +483,13 @@ def test_save_valuesentry(self): event2 = u2['triggers.csv'] times2 = event2.get_data() self.assertSequenceEqual(times, times2) - + # now test with different separators folder = os.path.join(self.tmpdir, 'data', 'record2') u = Unisens(folder, makenew=True) - times = [[i*100 + float(np.random.rand(1)), f'trigger {i}'] for i in range(15)] + times = [[i * 100 + float(np.random.rand()), f'trigger {i}'] for i in range(15)] event = ValuesEntry(id='triggers.csv', parent=u, separator=';', decimalSeparator=',') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -511,13 +497,13 @@ def test_save_valuesentry(self): event2 = u2['triggers.csv'] times2 = event2.get_data() self.assertSequenceEqual(times, times2) - + # now test with np array folder = os.path.join(self.tmpdir, 'data', 'record3') u = Unisens(folder, makenew=True) times = np.random.rand(5, 15) event = ValuesEntry(id='triggers.csv', parent=u, separator=',', decimalSeparator='.') - event.set_attrib('contentClass','trigger') + event.set_attrib('contentClass', 'trigger') event.set_attrib('comment', 'marks the trigger pointy thingy dingies') event.set_data(times, contentClass='triggers', unit='ms') u.save() @@ -525,133 +511,132 @@ def test_save_valuesentry(self): event2 = u2['triggers.csv'] times2 = event2.get_data() np.testing.assert_allclose(times, times2) - + def test_access_no_ext_item(self): - u = Unisens(tempfile.mkdtemp(prefix='unisens')) + u = Unisens(self.tmpdir, makenew=True) entry1 = SignalEntry('single.csv', parent=u) entry2 = SignalEntry('double.csv', parent=u) entry3 = SignalEntry('double.bin', parent=u) - - self.assertIs (u['single.csv'], entry1) - self.assertIs (u['single'], entry1) - self.assertIs (u['double.csv'], entry2) - self.assertIs (u['double.bin'], entry3) + + self.assertIs(u['single.csv'], entry1) + self.assertIs(u['single'], entry1) + self.assertIs(u['double.csv'], entry2) + 
self.assertIs(u['double.bin'], entry3) with self.assertRaises(IndexError): u['double'] - + self.assertIn('single.csv', u) self.assertIn('single', u) - self.assertIn('double.csv', u) - self.assertNotIn('double', u) - - + self.assertIn('double.csv', u) + self.assertNotIn('double', u) + def test_access_no_ext_attrib(self): - u = Unisens(tempfile.mkdtemp(prefix='unisens')) + u = Unisens(self.tmpdir, makenew=True) entry1 = SignalEntry('single.csv', parent=u) entry2 = SignalEntry('double.csv', parent=u) entry3 = SignalEntry('double.bin', parent=u) - - self.assertIs (u.single, entry1) - self.assertIs (u.single_csv, entry1) - self.assertIs (u.double_csv, entry2) - self.assertIs (u.double_bin, entry3) + + self.assertIs(u.single, entry1) + self.assertIs(u.single_csv, entry1) + self.assertIs(u.double_csv, entry2) + self.assertIs(u.double_bin, entry3) with self.assertRaises(IndexError): u.double - - - + def test_subentries(self): """this is not officially supported, but useful""" - folder = tempfile.mkdtemp(prefix='unisens_x') + folder = os.path.join(self.tmpdir, 'subentries') u = Unisens(folder, makenew=True, autosave=True) c = CustomEntry(id='test.bin', parent=u).set_data(b'123') CustomEntry('feat1.txt', parent=c).set_data('123') CustomEntry('feat2.txt', parent=c).set_data('456') self.assertEqual(u['test']['feat1'].get_data(), '123') self.assertEqual(u['test']['feat2'].get_data(), '456') - + u = Unisens(folder) self.assertEqual(u['test']['feat1'].get_data(), '123') self.assertEqual(u['test']['feat2'].get_data(), '456') - + def test_entries_with_subfolder(self): """this is not officially supported, but useful""" - folder = tempfile.mkdtemp(prefix='unisens_x') + folder = os.path.join(self.tmpdir, 'subfolder') u = Unisens(folder, makenew=True, autosave=True) c = CustomEntry(id='test.bin', parent=u) - c1=CustomEntry('sub/feat1.txt', parent=c).set_data('123') - c2=CustomEntry('sub\\feat2.txt', parent=c).set_data('456') + c1 = CustomEntry('sub1/feat1.txt', parent=c).set_data('123') + c2 = CustomEntry('sub2\\feat2.txt', parent=c).set_data('456') with self.assertRaises((ValueError, PermissionError, OSError)): - CustomEntry('\\sub\\feat3.txt', parent=c).set_data('789') - + CustomEntry('\\sub\\feat3.txt', parent=c) + self.assertTrue(os.path.isfile(c1._filename), f'{c1._filename} not found') self.assertTrue(os.path.isfile(c2._filename), f'{c2._filename} not found') - with open(os.path.join(folder, 'test.bin'), 'w'):pass + with open(os.path.join(folder, 'test.bin'), 'w'): pass u = Unisens(folder) self.assertEqual(u['test']['feat1'].get_data(), '123') - self.assertEqual(u['test']['sub/feat2.txt'].get_data(), '456') - - + self.assertEqual(u['test']['sub2/feat2.txt'].get_data(), '456') + def test_copy(self): - folder = tempfile.mkdtemp(prefix='unisens_copy') + folder = os.path.join(self.tmpdir, 'copy') u1 = Unisens(folder, makenew=True, autosave=True) CustomEntry('test1.txt', parent=u1).set_data('asd') u2 = u1.copy() CustomEntry('test2.txt', parent=u1).set_data('qwerty') - + u1.asd = 2 self.assertTrue(hasattr(u1, 'asd')) self.assertFalse(hasattr(u2, 'asd')) - + self.assertEqual(u1['test1'].get_data(), 'asd') self.assertEqual(u2['test1'].get_data(), 'asd') self.assertEqual(u1['test2'].get_data(), 'qwerty') - - with self.assertRaises(KeyError): + + with self.assertRaises(KeyError) as e: u2['test2'] - - + assert 'test2' in str(e.exception) + def test_nostacking(self): """this is not officially supported, but useful""" - folder = tempfile.mkdtemp(prefix='unisens_') - + folder = os.path.join(self.tmpdir, 'stack') + c = 
CustomEntry(id='test.bin', parent=folder) f = FileEntry('feat1.txt', parent=folder) - c.add_entry(f.copy()) + d = f.copy() + assert d._parent is None + c.add_entry(d) + assert d._parent == c c.add_entry(f.copy()) self.assertEqual(len(c), 2) - + c = CustomEntry(id='test.bin', parent=folder) f = FileEntry('feat1.txt', parent=folder) c.add_entry(f.copy(), stack=False) c.add_entry(f.copy(), stack=False) self.assertEqual(len(c), 1) - + c = CustomEntry(id='test.bin', parent=folder) f = MiscEntry(name='Test', parent=folder) - c.add_entry(f.copy().set_attrib('test1','val1')) - c.add_entry(f.copy().set_attrib('test2','val2')) + c.add_entry(f.copy().set_attrib('test1', 'val1')) + c.add_entry(f.copy().set_attrib('test2', 'val2')) self.assertEqual(len(c), 2) self.assertEqual(c['test'][0].test1, 'val1') self.assertEqual(c['test'][1].test2, 'val2') - + c = CustomEntry(id='test.bin', parent=folder) f = MiscEntry(name='Test', parent=folder) - c.add_entry(f.copy().set_attrib('test1','val1'), stack=False) - c.add_entry(f.copy().set_attrib('test2','val2'), stack=False) + c.add_entry(f.copy().set_attrib('test1', 'val1'), stack=False) + c.add_entry(f.copy().set_attrib('test2', 'val2'), stack=False) self.assertEqual(len(c), 1) self.assertEqual(c['test'].test2, 'val2') - - + assert not hasattr(c.test, 'test1') + def test_indexfinding(self): """try whether the index finding method is working correctly""" - folder = tempfile.mkdtemp(prefix='unisens_') - + folder = os.path.join(self.tmpdir, 'index') + c = CustomEntry(id='test.bin', parent=folder) FileEntry('feat1.txt', parent=c) FileEntry('FEAT1.bin', parent=c) FileEntry('feat2.txt', parent=c) - + self.assertEqual(c._get_index('feat2'), (2, 'feat2_txt')) self.assertEqual(c._get_index('feat2.txt'), (2, 'feat2_txt')) self.assertEqual(c._get_index('feat2_txt'), (2, 'feat2_txt')) @@ -659,8 +644,7 @@ def test_indexfinding(self): self.assertEqual(c._get_index('feat1_txt'), (0, 'feat1_txt')) self.assertEqual(c._get_index('feat1.bin'), (1, 'FEAT1_bin')) self.assertEqual(c._get_index('feat1_bin'), (1, 'FEAT1_bin')) - - + self.assertEqual(c._get_index('fEAT2'), (2, 'feat2_txt')) self.assertEqual(c._get_index('fEAT2.txt'), (2, 'feat2_txt')) self.assertEqual(c._get_index('fEAT2_txt'), (2, 'feat2_txt')) @@ -668,52 +652,48 @@ def test_indexfinding(self): self.assertEqual(c._get_index('fEAT1_txt'), (0, 'feat1_txt')) self.assertEqual(c._get_index('fEAT1.bin'), (1, 'FEAT1_bin')) self.assertEqual(c._get_index('fEAT1_bin'), (1, 'FEAT1_bin')) - + with self.assertRaises(IndexError): c._get_index('feat1') - + with self.assertRaises(IndexError): - c._get_index('FEAT1') - + c._get_index('FEAT1') + with self.assertRaises(KeyError): c._get_index('feat0') - def test_indexfinding_subfolders(self): """try whether the index finding method is working correctly""" - folder = tempfile.mkdtemp(prefix='unisens_') - + folder = os.path.join(self.tmpdir, 'index_subfolders') + c = CustomEntry(id='test.bin', parent=folder) FileEntry('feat.txt', parent=c) FileEntry('feat/feat.txt', parent=c) FileEntry('feat/feat.bin', parent=c) FileEntry('feat/one.txt', parent=c) FileEntry('feat/one_two.txt', parent=c) - + self.assertEqual(c._get_index('feat.txt'), (0, 'feat_txt')) - + self.assertEqual(c._get_index('one'), (3, 'feat_one_txt')) self.assertEqual(c._get_index('one.txt'), (3, 'feat_one_txt')) self.assertEqual(c._get_index('one_two'), (4, 'feat_one_two_txt')) self.assertEqual(c._get_index('one_two.txt'), (4, 'feat_one_two_txt')) - + with self.assertRaises(IndexError): self.assertEqual(c._get_index('feat'), (0, 
'feat_txt')) - - def test_nooverwrite(self): - folder = tempfile.mkdtemp(prefix='unisens_copy') + folder = os.path.join(self.tmpdir, 'no_overwrite') u = Unisens(folder, makenew=True, autosave=True) c = CustomEntry('test.txt', parent=u).set_attrib('a1', 'b1') self.assertEqual(u.test.a1, 'b1') with self.assertRaises(KeyError): c = CustomEntry('test.txt', parent=u).set_attrib('a2', 'b2') CustomEntry('asd.bin', parent=c) - - + def test_loaddifferentfile(self): - folder = tempfile.mkdtemp(prefix='unisens_newfile') + folder = os.path.join(self.tmpdir, 'newfile') u = Unisens(folder, makenew=True, autosave=False) c = CustomEntry('test.txt', parent=u).set_attrib('a1', 'b1').set_data('test') self.assertEqual(str(c), '') @@ -722,53 +702,51 @@ def test_loaddifferentfile(self): self.assertTrue(os.path.isfile(os.path.join(u._folder, 'test.xml'))) u1 = Unisens(folder, autosave=False) u2 = Unisens(folder, filename='test.xml', autosave=False) - + self.assertNotIn('test', u1) self.assertTrue(elements_equal(u.to_element(), u2.to_element())) - + def test_loaddifferentfile2(self): - folder = tempfile.mkdtemp(prefix='unisens') + folder = os.path.join(self.tmpdir, 'diff_file') u = Unisens(folder, makenew=True, autosave=True) CustomEntry('test.bin', parent=u).set_data(b'test') u.remove_entry('test.bin') - + CustomEntry('test.bin', parent=u).set_data(b'test') u.remove_entry('test') - + CustomEntry('test.bin', parent=u).set_data(b'test') - u.remove_entry('test_bin') - + u.remove_entry('test_bin') + def test_repr_str(self): - folder = tempfile.mkdtemp(prefix='strrepr') + folder = os.path.join(self.tmpdir, 'strrepr') u = Unisens(folder, makenew=True, autosave=True) u.measurementId = 'thisid' - u.duration = 60*2 + 60*60*2 + 5 + u.duration = 60 * 2 + 60 * 60 * 2 + 5 a = str(u) b = repr(u) - self.assertEqual(a,'Unisens: thisid(2:02:05, 0 entries)') - self.assertEqual(b,f'Unisens(comment=, duration=2:02:05, id=thisid,timestampStart={u.timestampStart})') - - def test_serialize(self) : - folder = tempfile.mkdtemp(prefix='seria') + self.assertEqual(a, 'Unisens: thisid(2:02:05, 0 entries)') + self.assertEqual(b, f'Unisens(comment=, duration=2:02:05, id=thisid,timestampStart={u.timestampStart})') + + def test_serialize(self): + folder = os.path.join(self.tmpdir, 'seria') u = Unisens(folder, makenew=True, autosave=True) CustomEntry('test.bin', parent=u).set_data(b'test') CustomEntry('test2.bin', parent=u).set_data(b'test2') u.save() - u.asd='asdf' - - with open(folder + '/asd.pkl', 'wb') as f: + u.asd = 'asdf' + + with open(folder + '/asd.pkl', 'wb') as f: pickle.dump(u, f) - - with open(folder + '/asd.pkl', 'rb') as f: - u1 = pickle.load(f) - + + with open(folder + '/asd.pkl', 'rb') as f: + u1 = pickle.load(f) + elements_equal(u.to_element(), u1.to_element()) - - - + def test_convert_nums(self): - folder = tempfile.mkdtemp(prefix='convert_nums') - u = Unisens(folder, makenew=True, autosave=True) + folder = os.path.join(self.tmpdir, 'convert_nums') + u = Unisens(folder, makenew=True, autosave=True) u.int = 1 u.float = 1.5 u.bool = True @@ -776,25 +754,25 @@ def test_convert_nums(self): u.entry.int = 1 u.entry.float = 1.5 u.entry.bool = True - + self.assertEqual(u.int, 1) self.assertEqual(u.float, 1.5) self.assertEqual(u.bool, True) self.assertEqual(u.entry.int, 1) self.assertEqual(u.entry.float, 1.5) self.assertEqual(u.entry.bool, True) - - u1 = Unisens(folder, readonly=True, convert_nums=False) - + + u1 = Unisens(folder, readonly=True, convert_nums=False) + self.assertEqual(u1.int, '1') self.assertEqual(u1.float, '1.5') 
self.assertEqual(u1.bool, 'True') self.assertEqual(u1.entry.int, '1') self.assertEqual(u1.entry.float, '1.5') - self.assertEqual(u1.entry.bool,'True') - - u1 = Unisens(folder, readonly=True, convert_nums=True) - + self.assertEqual(u1.entry.bool, 'True') + + u1 = Unisens(folder, readonly=True, convert_nums=True) + self.assertEqual(u1.int, 1) self.assertEqual(u1.float, 1.5) self.assertEqual(u1.bool, True) @@ -802,20 +780,168 @@ def test_convert_nums(self): self.assertEqual(u1.entry.float, 1.5) self.assertEqual(u1.entry.bool, True) + def test_remove_entry(self): + u = Unisens(self.tmpdir, makenew=True) + CustomEntry(id='custom.txt', parent=u) + SignalEntry(id='signal.bin', parent=u) + ValuesEntry(id='values.csv', parent=u) + for entry_name in ['custom.txt', 'signal.bin', 'values.csv']: + #assert hasattr(u, entry_name) + assert make_key(entry_name) in u.__dict__ + e = u.entries[entry_name] + assert e in u._entries + del entry_name, e + for entry_name in ['signal.bin', 'values.csv', 'custom.txt']: + e = u.entries[entry_name] + u.remove_entry(entry_name) + assert make_key(entry_name) not in u.__dict__ + assert entry_name not in u.entries + assert e not in u._entries + + def test_write_signal_entry(self): + unisens = Unisens(os.path.join(os.path.dirname(__file__), 'Example_003')) + + signal = unisens.acc_textile_50_bin + original_data = signal.get_data(scaled=True) + data_scaled = original_data[:, :48] + data_unscaled = signal.get_data(scaled=False)[:, :48] + assert np.any(data_unscaled != data_scaled) + + from copy import copy + kwargs: dict = copy(signal.attrib) + original_sample_rate = kwargs.pop('sampleRate', None) + kwargs['sampleRate'] = float(1 / 3600) + kwargs.pop('id', None) + ch_names = ['tick', 'trick', 'track'] + + for ending in ['.csv', '.bin']: + name = 'test_signal' + ending + for f in ['a_', 'b_', 'c_']: + file = f + name + if hasattr(unisens, file): + unisens.remove_entry(file) + + kwargs.update({'dataType': 'int16', 'lsbValue': '0.00294', 'baseline': '2048'}) + s = SignalEntry(id='a_' + name, parent=unisens) + s.set_data(data_unscaled, ch_names=ch_names, **kwargs) + data_return = s.get_data(scaled=False) + assert np.all(data_unscaled == data_return) + data_return1 = s.get_data(scaled=True) + if ending == '.bin': + assert np.all(data_scaled == data_return1) + assert np.all(((data_unscaled - 2048) * 0.00294) == data_return1) + else: + assert np.all(data_return == data_return1) + + kwargs.update({'dataType': 'float64', 'lsbValue': 1, 'baseline': '0'}) + s2 = SignalEntry(id='b_' + name, parent=unisens, **kwargs) + s2.set_data(data_unscaled, ch_names=ch_names) + data_return2 = s2.get_data(scaled=False) + assert np.all(data_unscaled == data_return2) + data_return21 = s2.get_data(scaled=True) + assert np.all(data_unscaled == data_return21) + + kwargs.update({'dataType': 'uint16', 'lsbValue': '0.00098', 'baseline': '6426'}) + s3 = SignalEntry(id='c_' + name, parent=unisens, attrib=kwargs) + s3.set_data(abs(data_unscaled), ch_names=ch_names) + data_return3 = s3.get_data(scaled=False) + assert np.all(abs(data_unscaled) == data_return3) + data_return31 = s3.get_data(scaled=True) + if ending == '.bin': + assert np.any(data_scaled != data_return31) # different scaling + assert np.all(((abs(data_unscaled) - 6426) * 0.00098) == data_return31) + else: + assert np.all(abs(data_unscaled) == data_return31) + + def test_deprecation(self): + # CustomAttribute + from unisens import CustomAttribute + ca = CustomAttributes() + # expected usage + with pytest.warns(DeprecationWarning) as dep: + att = 
CustomAttribute(height2='2,05m') + assert att.height2 == '2,05m' + with pytest.raises(AttributeError): + ca.add_entry(att) + assert not hasattr(ca, 'height2') + + with pytest.warns(DeprecationWarning) as dep: + att1 = CustomAttribute() + att1.set_attrib('key', 'height2') + att1.set_attrib('value', '2,05m') + ca.add_entry(att1) + assert ca.height2 == '2,05m' + # desired usage + ca.set_attrib(name='height1', value='1,73m') + assert ca.height1 == '1,73m' + + # return_type + u = Unisens(os.path.join(os.path.dirname(__file__), 'Example_001')) + with warnings.catch_warnings(): + warnings.simplefilter("error") + signal1 = u.ecg200_bin.get_data(scaled=True) + #with pytest.warns(DeprecationWarning) as dep: + with pytest.raises(DeprecationWarning) as d: + u.ecg200_bin.get_data(scaled=True, return_type='numpy') + assert str(d.value) == 'The argument `return_type` has no effect and will be removed with the next release.' + signal2 = u.ecg200_bin.get_data(scaled=True, return_type='numpy') + assert np.all(signal1 == signal2) + + # removing default channel naming for aligning data orientation verified by channels + with warnings.catch_warnings(): + warnings.simplefilter("error") + se1 = SignalEntry('test_signal.bin') + #ve1 = ValuesEntry() + with pytest.raises(DeprecationWarning) as d: + se1.set_data(data=signal2[:20], sampleRate=float(u.ecg200_bin.sampleRate)) + assert str(d.value) == 'Channel naming will be mandatory with the next release. ' \ + 'Please provide a list of channel names with set_data().' + se2 = SignalEntry('test_signal.bin', channel=u.ecg200_bin.channel) + se2.set_data(data=signal2[:20], sampleRate=float(u.ecg200_bin.sampleRate)) + ve1 = ValuesEntry('test_signal.csv') + with pytest.raises(DeprecationWarning) as d: + ve1.set_data(data=signal2[:20], sampleRate=float(u.ecg200_bin.sampleRate)) + assert str(d.value) == 'Channel naming will be mandatory with the next release. ' \ + 'Please provide a list of channel names with set_data().' + os.remove('test_signal.bin') + os.remove('test_signal.csv') + + def test_read_unisens_deprecation(self): + u_folder = os.path.join(os.path.dirname(__file__), 'Example_001') + + # unisens cannot be read twice -> use makenew or a different filepath + + # behaviour to remove + u0 = Unisens(os.path.join(os.path.dirname(__file__), 'Example_002')) # read first file here + assert not hasattr(u0, 'customAttributes') # data from first file + assert u0.timestampStart == '2008-07-04T13:27:57' + with pytest.deprecated_call() as dep: + u0.read_unisens(folder=u_folder) # read second file + assert str(dep.list[-1].message) == '`read_unisens` is deprecated and will be removed with the next release. Please read your unisens file by calling Unisens(folder=folder, filename=filename).' + assert u0.customAttributes.height == '1.74m' # data from second file + assert u0.timestampStart == '2008-07-08T13:32:51' + assert os.path.basename(u0._folder) == 'Example_002' # path from first file! + + u1 = Unisens(u_folder, filename='new_unisens.xml') # a new file here + with pytest.deprecated_call() as dep: + u1.read_unisens(folder=u_folder) # read different old file + assert str(dep.list[-1].message) == '`read_unisens` is deprecated and will be removed with the next release. Please read your unisens file by calling Unisens(folder=folder, filename=filename).' 
+ assert os.path.basename(u1._file) == 'new_unisens.xml' + assert u1.customAttributes.height == '1.74m' # data from old file + + u2 = Unisens(u_folder, makenew=True) # set makenew + with pytest.warns(DeprecationWarning) as dep: + u2.read_unisens(folder=u_folder) # read actually existing file + assert str(dep.list[-1].message) == '`read_unisens` is deprecated and will be removed with the next release. Please read your unisens file by calling Unisens(folder=folder, filename=filename).' + assert u2._file == os.path.join(u_folder, 'unisens.xml') + assert u2.customAttributes.height == '1.74m' # data from existing file + + # no deprecation at initialization + with warnings.catch_warnings(): + warnings.simplefilter("error") + u3 = Unisens(u_folder, makenew=False) + assert u3.customAttributes.height == '1.74m' + if __name__ == '__main__': unittest.main() - - - - - - - - - - - - - - diff --git a/test/utils_test.py b/test/utils_test.py index 116d5c8..b86334a 100644 --- a/test/utils_test.py +++ b/test/utils_test.py @@ -8,118 +8,116 @@ from unisens import utils import unittest import shutil -import tempfile import numpy as np - class Testing(unittest.TestCase): - - def setUp(self): - self.tmpdir = tempfile.mkdtemp(prefix='unisens') - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - - def test_read_write_csv(self): - file = os.path.join(self.tmpdir, 'file.csv') + tmpdir = os.path.join(os.path.dirname(__file__), 'tmp') + + @classmethod + def setUp(cls): + os.makedirs(cls.tmpdir, exist_ok=True) + @classmethod + def tearDown(cls): + shutil.rmtree(cls.tmpdir) + + def test_read_write_csv(self): + file = os.path.join(self.tmpdir, 'file.csv') with self.assertRaises(AssertionError): - utils.write_csv(file, [3,4,4], sep=',', decimal_sep=',') - + utils.write_csv(file, [3, 4, 4], sep=',', decimal_sep=',') + with self.assertRaises(AssertionError): utils.write_csv(file, 'test', sep=',', decimal_sep=',') - + one_column = [[x] for x in range(5)] utils.write_csv(file, one_column) read = utils.read_csv(file, convert_nums=True) self.assertSequenceEqual(one_column, read) - + one_column = [x for x in range(5)] utils.write_csv(file, one_column, comment='test\ntest') read = utils.read_csv(file, convert_nums=True) - self.assertSequenceEqual(one_column, [x[0]for x in read]) - - two_column = [[x, str(x+10)+'xf'] for x in range(5)] + self.assertSequenceEqual(one_column, [x[0] for x in read]) + + two_column = [[x, str(x + 10) + 'xf'] for x in range(5)] utils.write_csv(file, two_column) read = utils.read_csv(file, convert_nums=True) self.assertSequenceEqual(two_column, read) - - two_column = [[x, str(x+10)+'xf'] for x in range(5)] + + two_column = [[x, str(x + 10) + 'xf'] for x in range(5)] utils.write_csv(file, two_column, sep=',') read = utils.read_csv(file, convert_nums=True) self.assertNotEqual(two_column, read) read = utils.read_csv(file, sep=',', convert_nums=True) - self.assertEqual(two_column, read) - - var_column = [[x, str(x+10)+'xf'] for x in range(5)] - var_column[0] = [1,2,3,4,5] + self.assertEqual(two_column, read) + + var_column = [[x, str(x + 10) + 'xf'] for x in range(5)] + var_column[0] = [1, 2, 3, 4, 5] utils.write_csv(file, var_column) read = utils.read_csv(file, convert_nums=True) - self.assertEqual(var_column, read) - - with_comment = [[x, str(x+10)+'xf'] for x in range(5)] + self.assertEqual(var_column, read) + + with_comment = [[x, str(x + 10) + 'xf'] for x in range(5)] utils.write_csv(file, with_comment, comment='test') read = utils.read_csv(file, convert_nums=True) - self.assertEqual(with_comment, 
read) - + self.assertEqual(with_comment, read) + one_row = [[x for x in range(5)]] utils.write_csv(file, one_row) read = utils.read_csv(file, convert_nums=True) - self.assertEqual(one_row, read) + self.assertEqual(one_row, read) read = utils.read_csv(file, convert_nums=False) - self.assertEqual(read, [[str(x) for x in one_row[0]]]) - - float_2d = [[x+1.4, x+5.2] for x in range(5)] + self.assertEqual(read, [[str(x) for x in one_row[0]]]) + + float_2d = [[x + 1.4, x + 5.2] for x in range(5)] utils.write_csv(file, float_2d) read = utils.read_csv(file, convert_nums=True) - self.assertEqual(float_2d, read) - - float_2d = [[x+1.4, x+5.2] for x in range(5)] + self.assertEqual(float_2d, read) + + float_2d = [[x + 1.4, x + 5.2] for x in range(5)] utils.write_csv(file, float_2d, decimal_sep=',') read = utils.read_csv(file, decimal_sep=',', convert_nums=True) - self.assertEqual(float_2d, read) - + self.assertEqual(float_2d, read) + np_1d = np.random.rand(5) utils.write_csv(file, np_1d) read = np.array(utils.read_csv(file, convert_nums=True)).squeeze() - np.testing.assert_array_equal(np_1d, read) - - np_2d = np.random.rand(1,5) + np.testing.assert_array_equal(np_1d, read) + + np_2d = np.random.rand(1, 5) utils.write_csv(file, np_2d) read = np.array(utils.read_csv(file, convert_nums=True)) - np.testing.assert_array_equal(np_2d, read) - + np.testing.assert_array_equal(np_2d, read) + with self.assertRaises(ValueError): - utils.write_csv(file, np.random.rand(3,3,3)) - + utils.write_csv(file, np.random.rand(3, 3, 3)) + def test_make_key(self): s = 'abcde12345' r = utils.make_key(s) - self.assertEqual(s,r) + self.assertEqual(s, r) s = '12abcde12345' r = utils.make_key(s) - self.assertEqual('x_'+s,r) + self.assertEqual('x_' + s, r) s = '1#$%^&*()[]' r = utils.make_key(s) - self.assertEqual('x_1__________',r) - - + self.assertEqual('x_1__________', r) + def test_valid_filename(self): string = '<>:|?*' for s in string: with self.assertRaises(ValueError): utils.valid_filename(s) - + with self.assertRaises(ValueError): utils.valid_filename('/test.vbin') - utils.valid_filename('\\test.vbin') + utils.valid_filename('\\test.vbin') self.assertTrue(utils.valid_filename('asd/asd\\asd2$.bin')) - + def test_validkey(self): with self.assertRaises(AssertionError): utils.validkey(4) @@ -131,10 +129,10 @@ def test_validkey(self): def test_strip(self): self.assertEqual('abc', utils.strip('abc')) self.assertEqual('abc', utils.strip('{https:////}{{{{}}}}abc')) - + def test_str2num(self): - self.assertEqual(utils.str2num('200_26747'), '200_26747') # due to PEP-515 - self.assertEqual(utils.str2num('20026747'), 20026747) + self.assertEqual(utils.str2num('200_26747'), '200_26747') # due to PEP-515 + self.assertEqual(utils.str2num('20026747'), 20026747) self.assertEqual(utils.str2num('s20026747'), 's20026747') self.assertEqual(utils.str2num('s20.026747'), 's20.026747') self.assertEqual(utils.str2num('20.026747'), 20.026747) @@ -143,19 +141,6 @@ def test_str2num(self): self.assertEqual(utils.str2num('20,026747', ','), 20.026747) self.assertEqual(utils.str2num('20,02,6747', ','), '20,02,6747') + if __name__ == '__main__': unittest.main() - - - - - - - - - - - - - - diff --git a/unisens/entry.py b/unisens/entry.py index 305f2cf..715e1a4 100644 --- a/unisens/entry.py +++ b/unisens/entry.py @@ -1,23 +1,25 @@ # -*- coding: utf-8 -*- """ -Created on Mon Jan 6 21:16:58 2020 +Created on Mon Jan 6 21:16:58 2020 @author: skjerns """ +from __future__ import annotations + import importlib import os, sys -from os.path import join -from os import 
access +import warnings +from abc import ABC +from typing import List, Tuple + import numpy as np import logging -from .utils import validkey, strip, lowercase, make_key, valid_filename +from .utils import validkey, strip, lowercase, make_key, valid_filename, infer_dtype from .utils import read_csv, write_csv from xml.etree import ElementTree as ET from xml.etree.ElementTree import Element from copy import deepcopy -try: profile #a hack for not having to remove the profile tags when not testing -except NameError: profile = lambda x: x # pass-through decorator def get_module(name): try: @@ -25,11 +27,10 @@ def get_module(name): return module except ModuleNotFoundError: print(f'{name} is not installed. Install with pip install {name}') - return False - raise Exception(f'Cant load module {name}') - + raise Exception(f'Cant load module {name}') + -class Entry(): +class Entry(ABC): """ Base class for Unisens entries. All other entries inherit from this. @@ -48,33 +49,30 @@ class Entry(): **kwargs : TYPE DESCRIPTION. """ - def __len__(self): return len(self._entries) def __iter__(self): - return list(self._entries).__iter__() + return self._entries.__iter__() def __repr__(self): return "<{}({})>".format(self._name, self.attrib) - - # @profile + def __init__(self, attrib=None, parent='.', **kwargs): if attrib is None: - attrib=dict() - self.__dict__['attrib'] = attrib + attrib = dict() + self.__dict__['attrib'] = deepcopy(attrib) self.__dict__.update(self.attrib) self.__dict__['_entries'] = [] self.__dict__['_folder'] = parent.__dict__['_folder'] if isinstance(parent, Entry) else parent self.__dict__['_parent'] = parent if isinstance(parent, Entry) else None self.__dict__['_name'] = lowercase(type(self).__name__) for key in kwargs: - self.key = kwargs[key] + self.set_attrib(key, kwargs[key]) self._autosave() - # @profile def __contains__(self, item): if item in self.__dict__: return True if make_key(item) in self.__dict__: return True @@ -84,8 +82,7 @@ def __contains__(self, item): except: return False - # @profile - def __setattr__(self, name:str, value:str): + def __setattr__(self, name: str, value: str): """ Allows settings of attributes via .name = value. """ @@ -93,24 +90,23 @@ def __setattr__(self, name:str, value:str): if name.startswith('_'): return methods = dir(type(self)) # do not overwrite if it's a builtin method - if name not in methods and \ - isinstance(value, (int, float, bool, bytes, str)): + if name not in methods and \ + isinstance(value, (int, float, bool, bytes, str)): self.set_attrib(name, value) - # @profile def __getattr__(self, key): if key == "__setstate__": raise AttributeError(key) - try: return self.__dict__[key] - except: pass + try: + return self.__dict__[key] + except: + pass try: i, key2 = self._get_index(key) return self.__dict__[key2] - except KeyError: + except KeyError: return self.__getattribute__(key) - raise AttributeError(f'{key} not found') - # @profile def __getitem__(self, key): if isinstance(key, str): i, key = self._get_index(key) @@ -121,31 +117,28 @@ def __getitem__(self, key): def _autosave(self): """ - if autosave is enabled, this function will call the autosave function - of parents until the uppermost Unisens object is reached and then - save when anything is changed. + This function will call the `_autosave` method of `_parents`. + If the uppermost `_parent` is a Unisens object changes will be saved + according to its attribute `_autosave_enabled`. + Otherwise nothing happens. 
""" - try: - if self._parent is not None: - self._parent._autosave() - except: # there might an error, then do nothing. - pass + # not in Java version available + if self._parent is not None: + self._parent._autosave() def _check_readonly(self): """ will raise an exception if a write operation is requested to readonly file + If available, the `_parent`'s writability is chosen over the instance's. """ - if hasattr(self, '_parent') and self._parent is not None: - if isinstance(self._parent, str): return True - self._parent._check_readonly() - elif hasattr(self, '_readonly'): - if self._readonly: - raise IOError(f'Read only, can\'t write to {self._folder}.') - return True + # not in Java version available + if hasattr(self, '_parent') and hasattr(self._parent, '_check_readonly'): + return self._parent._check_readonly() + elif hasattr(self, '_readonly') and self._readonly: + raise IOError(f'Read only, can\'t write to {self._folder}.') - # @profile - def _get_index(self, id_or_name, raises=True): + def _get_index(self, id_or_name: str, raises: bool = True) -> Tuple[int, str]: """ Receive the index and key-name of an object. @@ -157,89 +150,115 @@ def _get_index(self, id_or_name, raises=True): for the entry in ._entry and returns its index as well as its key name in the __dict__ - Parameters - ---------- - id_or_name : str + Caution: this only returns the first entry's index in _entries when stacking + + :param id_or_name: str The name or id of the Entry. - raises : bool, optional + :param raises: bool, optional Raise an exception if not found. The default is True. - - Returns - ------- - [index in ._entries, key-name in __dict__]. + :return: + i: int, index in ._entries + key: str, key-name in __dict__, not id/_name in entry """ - - id_or_name = id_or_name.upper() - id_or_name_key = make_key(id_or_name) - + + id_or_name_key_upper = make_key(id_or_name).upper() + # we don't care about case, gently ignoring Linux file case-sensitivity # first check for exact match for i, entry in enumerate(self._entries): if hasattr(entry, 'id'): - id = entry.id - id_key = make_key(id) - if id_or_name.upper()==id: - return i, id_key # check for exact match - if id_key.upper()==id_or_name_key: - return i, id_key # check for match in key notation + id_key = make_key(entry.id) + if id_key.upper() == id_or_name_key_upper: + return i, id_key # check for match in key notation else: name = entry._name name_key = make_key(name) - if name.upper()==id_or_name.upper(): # same as above - return i, name - if name_key.upper()==id_or_name_key: + if name_key.upper() == id_or_name_key_upper: return i, name + id_or_name_upper = id_or_name.upper() found = [] for i, entry in enumerate(self._entries): if hasattr(entry, 'id'): - id = entry.id.replace('\\', '/').upper() # normalize to linux-slash - no_ext = id.rsplit('.', 1)[0] # remove file extension + id_upper = entry.id.upper() + no_ext = id_upper.rsplit('.', 1)[0] # remove file extension # check if file without extension was requested # e.g. 'test' for test.txt - if no_ext==id_or_name: - found+=[(i, make_key(entry.id))] - # contains a slash/subdir, so we need to trim that off - elif '/' in id: + if no_ext == id_or_name_upper: + found += [(i, make_key(entry.id))] + elif '/' in id_upper or '\\' in id_upper: # remove subdirectories # e.g. 'test' was requested for 'sub/test.txt' - if os.path.basename(no_ext)==id_or_name: - found+=[(i, make_key(entry.id))] + if os.path.basename(no_ext) == id_or_name_upper: + found += [(i, make_key(entry.id))] # e.g. 
'test.txt' was requested for 'sub/test.txt' - elif os.path.basename(id)==id_or_name: - found+=[(i, make_key(entry.id))] - # no subdir, but full match - elif id==id_or_name: - found+=[(i, make_key(entry.id))] + elif os.path.basename(id_upper) == id_or_name_upper: + found += [(i, make_key(entry.id))] + if len(found) == 1: return found[0] + if len(found) > 1: raise IndexError(f'More than one match for {id_or_name}: {found}') + raise KeyError(f'{id_or_name} not found') + def _set_channels(self, ch_names: List[str], n_data: int): + """ + Checks existing channel attributes. + Overwrites channel attributes if ch_names are supplied. + Writes generic channel names if nothing is given. + → Use in set_data for classes requiring channel names. - if len(found)==1: return found[0] - if len(found)>1: raise IndexError(f'More than one match for {id_or_name}: {found}') - raise KeyError(f'{id_or_name} not found') - - def copy(self): + :param ch_names: List of channel names (str) or single channel name as str or None + :param n_data: amount of required channel names + This might match lines / columns of data or + one less if an index is expected. """ - Create a deep copy of this Entry. - - All references to the parent Unisens object will be removed before. - + if ch_names is not None: + if isinstance(ch_names, str): ch_names = [ch_names] + # this means new channel names are indicated and will overwrite. + assert len(ch_names) == n_data, f'len {ch_names}!={n_data}' + if hasattr(self, 'channel'): + logging.warning('Channels present will be overwritten') + self.remove_entry('channel') + for name in ch_names: + channel = MiscEntry('channel', key='name', value=name) + self.add_entry(channel) + elif not hasattr(self, 'channel'): + # this means no channel names are indicated and none exist + warnings.warn('Channel naming will be mandatory with the next release. ' + 'Please provide a list of channel names with set_data().', + category=DeprecationWarning, stacklevel=2) + # we create new generic names for the channels + logging.info('No channel names indicated, will use generic names') + for i in range(n_data): + channel = MiscEntry('channel', key='name', value=f'ch_{i}') + self.add_entry(channel) + elif len(self.channel) == n_data: + # this means channel information is present and matches array + pass + else: + # this means there are channel names there but do not match n_data + raise ValueError('Channel names must match data') + + def copy(self) -> Entry: + """ + Create a deep copy of this Entry without copying the parent. + `_parent` is set to None for the resulting copy. + Returns ------- copy : Entry An Entry object with all attributes as the invoking object. """ + # in Java: clone when adding entry if hasattr(self, '_parent'): _parent = self._parent - del self._parent + self._parent = None copy = deepcopy(self) self._parent = _parent else: copy = deepcopy(self) return copy - # @profile - def add_entry(self, entry:'Entry', stack:bool=True): + def add_entry(self, entry: Entry, stack: bool = True): """ Add an subentry to this entry @@ -265,57 +284,46 @@ def add_entry(self, entry:'Entry', stack:bool=True): # there are several Entries that have reserved names. 
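From the caller's perspective the consolidated `_set_channels` helper behaves as sketched below (folder and channel names made up):

```Python
import numpy as np
from unisens import Unisens, SignalEntry

u = Unisens('tmp_unisens', makenew=True)
ecg = SignalEntry('ecg.bin', parent=u)
data = np.random.randint(-100, 100, size=(2, 256), dtype=np.int16)

ecg.set_data(data, sampleRate=256, ch_names=['ECG I', 'ECG II'])
# omitting ch_names: DeprecationWarning, generic names ch_0, ch_1 are written
# len(ch_names) != len(data): ValueError('Channel names must match data')
```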
# these should not exist double, therefore they are re-set here reserved = ['binFileFormat', 'csvFileFormat', 'customFileFormat'] - if entry._name in reserved: - stack=False - + name = entry.attrib.get('id', entry.__dict__['_name']) - - if not stack: - try: self.remove_entry(name) - except: pass - - self._entries.append(entry) - - # if an entry already exists with this exact name - # we put the entry inside of a list and append the new entry - # with the same name. This way all entries are saved name = make_key(name) + + if (not stack or entry._name in reserved): + # remove old entry with this name if necessary + try: + self.remove_entry(name) + except KeyError: + pass + if name in self.__dict__: - if isinstance(self.__dict__[name], list): - self.__dict__[name].append(entry) - else: + # stack entries with the same name inside a list + if not isinstance(self.__dict__[name], list): self.__dict__[name] = [self.__dict__[name]] - self.__dict__[name].append(entry) + self.__dict__[name].append(entry) else: self.__dict__[name] = entry - entry.__dict__['_parent'] = self + + self._entries.append(entry) + entry._parent = self self._autosave() return self - - # @profile - def remove_entry(self, name): + + def remove_entry(self, name: str): """ - Removes an subentry by name. + Removes a subentry by name. Parameters ---------- name : str the name of the entry. Can be abbreviated, e.g. 'samples' instead of 'samples.csv'. - """ i, key = self._get_index(name) - entry = self._entries[i] del self._entries[i] del self.__dict__[key] - # if this is an unisens object, also delete the referer there - if self.__dict__.get('entries') is not None: - for key, e in list(self.__dict__['entries'].items()): - if e==entry: del self.__dict__['entries'][key] return self - # @profile - def set_attrib(self, name:str, value:str): + def set_attrib(self, name: str, value: str): """ Set an attribute of this Entry @@ -328,11 +336,11 @@ def set_attrib(self, name:str, value:str): """ name = validkey(name) self.attrib[name] = value - self.__dict__.update(self.attrib) + self.__dict__.update({name: value}) self._autosave() return self - def get_attrib(self, name:str, default=None): + def get_attrib(self, name: str, default=None) -> str: """ Retrieves an attribute of this Entry @@ -347,7 +355,7 @@ def get_attrib(self, name:str, default=None): return self.attrib.get(name, default) - def remove_attr(self, name:str): + def remove_attr(self, name: str): """ Removes a custom attribute/value of this entry. @@ -361,14 +369,14 @@ def remove_attr(self, name:str): TYPE DESCRIPTION. """ - if name in self.attrib: + if name in self.attrib: del self.attrib[name] del self.__dict__[name] else: logging.error('{} not in attrib'.format(name)) self._autosave() - return self - + return self + def to_element(self): """ Converts this Entry and all its subentries into an XML Element. @@ -387,7 +395,7 @@ def to_element(self): for subelement in self._entries: element.append(subelement.to_element()) return element - + def to_xml(self): """ Creates a string representing this Entry and all its sub-entries @@ -399,17 +407,18 @@ def to_xml(self): XML string representing this Entry instance. 
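The stacking behaviour of `add_entry` is worth spelling out: same-named subentries are collected into a list under a single key, while the reserved FileFormat names always replace. A sketch using `MiscEntry`:

```Python
from unisens.entry import MiscEntry

group = MiscEntry('group')
group.add_entry(MiscEntry('channel', key='name', value='ch_0'))
group.add_entry(MiscEntry('channel', key='name', value='ch_1'))

print(len(group))           # 2 -- both live in ._entries
print(type(group.channel))  # <class 'list'> -- stacked under one key
# 'binFileFormat', 'csvFileFormat', 'customFileFormat' never stack
```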
""" - + element = self.to_element() return ET.tostring(element).decode() - + def print_summary(self, indent=0): """ Prints a summary of this object containing all entries """ - print('\t'*indent, self) + print('\t' * indent, self) for entry in self._entries: - entry.print_summary(indent+1) + entry.print_summary(indent + 1) + class FileEntry(Entry): """ @@ -417,47 +426,41 @@ class FileEntry(Entry): Subtypes of FileEntry include ValuesEntry, EventEntry, CustomEntry and SignalEntry. """ - + def __repr__(self): id = self.attrib.get('id', 'None') return "<{}({})>".format(self._name, id) - # @profile def __init__(self, id, attrib=None, parent='.', **kwargs): - if isinstance(id, str): id = id.replace('\\', '/') super().__init__(attrib=attrib, parent=parent, **kwargs) if 'id' in self.attrib: - self._filename = self._folder + '/' + self.id + # reading entry (id == None) + valid_filename(self.id) + self._filename = os.path.join(self._folder, self.id) if not os.access(self._filename, os.F_OK): logging.error('File {} does not exist'.format(self.id)) - folder = os.path.dirname(self._filename) - if not os.path.exists(folder): - os.makedirs(folder, exist_ok=True) - self.set_attrib('id', self.attrib['id']) elif id: - if os.path.splitext(str(id))[-1]=='': + # writing entry + valid_filename(id) + if os.path.splitext(str(id))[-1] == '': logging.warning('id should be a filename with extension ie. .bin') self._filename = os.path.join(self._folder, id) self.set_attrib('id', id) - folder = os.path.dirname(self._filename) - if not os.path.exists(folder): - os.makedirs(folder, exist_ok=True) + # ensure subdirectories exist to write data + if '/' in id or '\\' in id: + sub_folder = os.path.dirname(self._filename) + os.makedirs(sub_folder, exist_ok=True) else: - raise ValueError('id must be supplied') - valid_filename(self.id) + raise ValueError('The id must be supplied if it is not yet set.') if isinstance(parent, Entry): parent.add_entry(self) - - - - class SignalEntry(FileEntry): - - def __init__(self, id=None, attrib=None, parent='.', **kwargs): + + def __init__(self, id=None, attrib=None, parent='.', **kwargs): super().__init__(id=id, attrib=attrib, parent=parent, **kwargs) - - def get_data(self, scaled:bool=True, return_type:str='numpy') -> np.array: + + def get_data(self, scaled: bool = True, return_type: str = None) -> np.array: """ Will try to load the binary data using numpy. 
This might not always work as endianess can't be determined @@ -476,32 +479,52 @@ def get_data(self, scaled:bool=True, return_type:str='numpy') -> np.array: """ + if return_type is not None: + warnings.warn(f'The argument `return_type` has no effect and will be removed with the next release.', + category=DeprecationWarning, stacklevel=2) + + if self.id.endswith('csv'): + data = np.genfromtxt(self._filename, dtype=str, delimiter=self.csvFileFormat.separator) + data = data.astype(float) + return data.T + + assert self.id.endswith('bin') and 'lsbValue' in dir(self), \ + 'incompatible id: SignalEntry only allows for .bin or .csv format' n_channels = len(self.channel) if isinstance(self.channel, list) else 1 dtypestr = self.dataType.lower() dtype = np.__dict__.get(dtypestr, f'UNKOWN_DATATYPE: {dtypestr}') data = np.fromfile(self._filename, dtype=dtype) if scaled: - if 'baseline' in dir(self): - data = ((data-float(self.baseline)) * float(self.lsbValue)) - else: + if 'baseline' in self.attrib: + data = ((data - float(self.baseline)) * float(self.lsbValue)) + elif self.lsbValue != 1: data = (data * float(self.lsbValue)) return data.reshape([-1, n_channels]).T - - - def set_data(self, data:np.ndarray, dataType:str=None, ch_names:list=None, - sampleRate:int=256, lsbValue:float=1, unit:str=None, - comment:str=None, contentClass:str=None, - adcResolution:int=None, baseline:float=None, **kwargs): + + def set_data(self, data: np.ndarray, sampleRate: float = None, dataType: str = None, + ch_names: list = None, unit: str = None, + lsbValue: float = None, adcZero: int = None, + adcResolution: int = None, baseline: int = None, + comment: str = None, contentClass: str = None, + source: str = None, sourceId: str = None, + decimalSeparator: str = '.', separator: str = ';', **kwargs): """ Set the data that is connected to this SignalEntry. + The decision between binary and csv output is made with the 'id' from initialization. + + binary: Data will be stored after formatting to dataType. Please ensure that formatting is possible without + loss of information. Scaling (with lsbValue and baseline) is only supported for reading. The data will in any case be saved with Endianness LITTLE, as this is the default for numpy. Data will be saved using numpy binary data output. + csv: Similar to ValuesEntry except that the data is expected to be + consecutive and thus not indexed. + Parameters ---------- data : np.ndarray - an numpy array. + in lines! len(self.channel) == len(data) dataType : str, optional The data type of the data. If None, is infered automatically. Can be 'DOUBLE', 'FLOAT', 'INT16', 'INT32', @@ -527,109 +550,87 @@ def set_data(self, data:np.ndarray, dataType:str=None, ch_names:list=None, **kwargs : TYPE DESCRIPTION. 
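The scaled read path is the usual ADC conversion; a sketch with made-up numbers, assuming a `.bin` entry whose XML carries `baseline` and `lsbValue`:

```Python
import numpy as np

raw = np.array([2048, 2100, 1990], dtype=np.int16)  # counts as stored on disk
baseline, lsbValue = 2048, 0.00244                  # attributes from unisens.xml
scaled = (raw - float(baseline)) * float(lsbValue)  # -> physical units, e.g. mV
```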
""" - self._check_readonly() - - file = os.path.join(self._folder, self.id) - - # if list, convert to numpy array - if isinstance(data, list): - # if the list entries are arrays, we can infer the dtype from them - if isinstance(data[0], np.ndarray) and dataType is None: - dtype = str(data[0].dtype) - else: - dtype = type(data[0][0]) - if dtype==np.int64 or dtype==int: dtype=np.int32 - data = np.array(data, dtype=dtype) - - data = np.atleast_2d(data) - order = sys.byteorder.upper() # endianess - fileFormat = MiscEntry('binFileFormat', key='endianess', value=order) - self.add_entry(fileFormat) - #### dtype inference start - dtype_mapping = {'float16': 'float', - 'float32': 'float', - 'float64': 'double', - 'int': 'int32'} + self._check_readonly() + data = np.atleast_2d(np.array(data)) if dataType is None: - dataType = str(data.dtype).upper() + if 'dataType' in self.attrib: + dataType = self.dataType + else: + dataType = str(data.dtype) + self.set_attrib('dataType', infer_dtype(dataType).lower()) + + if lsbValue is not None: + self.set_attrib('lsbValue', lsbValue) + elif 'lsbValue' not in self.attrib: + self.set_attrib('lsbValue', 1) + if baseline is not None: + self.set_attrib('baseline', baseline) + + if self.id.endswith('csv'): + fileFormat = MiscEntry('csvFileFormat', parent=self) + fileFormat.set_attrib('decimalSeparator', decimalSeparator) + fileFormat.set_attrib('separator', separator) + self.add_entry(fileFormat) + + write_csv(self._filename, data.T, sep=self.csvFileFormat.separator, + decimal_sep=self.csvFileFormat.decimalSeparator) + elif self.id.endswith('bin'): + order = sys.byteorder.upper() # endianess + fileFormat = MiscEntry('binFileFormat', key='endianess', value=order) + self.add_entry(fileFormat) + + if 'int' in dataType: + data = np.round(data, 10) + data_formatted = data.astype(dataType) + assert abs(np.sum(data - data_formatted)) < 1e-10, \ + f"Can't format to dataType {dataType} without loss." + + # save data transposed because unisens reads rows*columns not columns*rows like numpy + data_formatted.T.tofile(self._filename) + else: + raise ValueError('incompatible id: SignalEntry only allows for .bin or .csv format') - dataType = dtype_mapping.get(dataType.lower(), dataType) - allowed_dtypes = ['DOUBLE', 'FLOAT', 'INT16', 'INT32', - 'INT8', 'UINT16', 'UINT32', 'UINT8'] - assert dataType.upper() in allowed_dtypes,\ - f'{dataType} is not in {allowed_dtypes}' + self._set_channels(ch_names, n_data=len(data)) - #### dtype inference end - - # if we get a string supplied, we convert to list - if isinstance(ch_names, str): ch_names = [ch_names] - - if ch_names is None and hasattr(self, 'channel') and\ - len(self.channel)==len(data): - # this means channel information is present and matches array - pass - elif ch_names is not None: - # this means new channel names are indicated and will overwrite. 
- assert len(ch_names)==len(data), f'len {ch_names}!={len(data)}' - if hasattr(self, 'channel'): - logging.warning('Channels present will be overwritten') - self.remove_entry('channel') - for name in ch_names: - channel = MiscEntry('channel', key='name', value=name) - self.add_entry(channel) - elif ch_names is None and not hasattr(self, 'channel'): - # this means no channel names are indicated and none exist - # we create new generic names for the channels - logging.info('No channel names indicated, will use generic names') - for i in range(len(data)): - channel = MiscEntry('channel', key='name', value=f'ch_{i}') - self.add_entry(channel) - else: - # this means there are channel names there but do not match n_data - raise ValueError('Must indicate channel names') - - # save data using numpy , - # .T because unisens reads rows*columns not columns*rows like numpy - data.T.tofile(file) - - if baseline is not None: self.set_attrib('baseline', baseline) if sampleRate is not None: self.set_attrib('sampleRate', sampleRate) - if lsbValue is not None: self.set_attrib('lsbValue', lsbValue) + assert 'sampleRate' in self.attrib, "Please specify sampleRate for correct visualization." if unit is not None: self.set_attrib('unit', unit) if comment is not None: self.set_attrib('comment', comment) if contentClass is not None: self.set_attrib('contentClass', contentClass) - if dataType is not None: self.set_attrib('dataType', dataType.lower()) - if adcResolution is not None: self.set_attrib('adcResolution',adcResolution) - + if adcZero is not None: self.set_attrib('adcZero', adcZero) + if adcResolution is not None: self.set_attrib('adcResolution', adcResolution) + if source is not None: self.set_attrib('source', source) + if sourceId is not None: self.set_attrib('sourceId', sourceId) + # set all other keyword arguments/comments as well. for key in kwargs: self.set_attrib(key, kwargs[key]) - + self._autosave() - return self + return self + class CsvFileEntry(FileEntry): """ A FileEntry that links a csv file. """ - def __init__(self, id=None, attrib=None, parent='.', + + def __init__(self, id=None, attrib=None, parent='.', decimalSeparator='.', separator=';', **kwargs): super().__init__(id=id, attrib=attrib, parent=parent, **kwargs) assert decimalSeparator and separator, 'Must supply separators' if not self.id.endswith('csv'): logging.warning(f'id "{id}" does not end in .csv') - + csvFileFormat = MiscEntry('csvFileFormat', parent=self) csvFileFormat.set_attrib('decimalSeparator', decimalSeparator) csvFileFormat.set_attrib('separator', separator) self.add_entry(csvFileFormat) - - - - def set_data(self, data:list, **kwargs): + + def set_data(self, data: list, **kwargs): """ Set data of this csv object. @@ -641,28 +642,27 @@ def set_data(self, data:list, **kwargs): **kwargs : str DESCRIPTION. 
""" - + self._check_readonly() - assert 'csvFileFormat' in self.__dict__, 'csvFileFormat information'\ - 'missing: No separator and decimal set' - assert isinstance(data, (list, np.ndarray)),\ + assert 'csvFileFormat' in self.__dict__, 'csvFileFormat information' \ + 'missing: No separator and decimal set' + assert isinstance(data, (list, np.ndarray)), \ f'data must be list of lists, is {type(data)}' sep = self.csvFileFormat.separator dec = self.csvFileFormat.decimalSeparator + if len(data) == 0 or len(data[0]) < 2: logging.warning('Should supply at least two columns: ' \ + 'time and data') - if len(data)==0 or len(data[0])<2: logging.warning('Should supply at least two columns: '\ - 'time and data') - write_csv(self._filename, data, sep=sep, decimal_sep=dec) - + for key in kwargs: self.set_attrib(key, kwargs[key]) self._autosave() return self - - def get_data(self, mode:str='list'): + + def get_data(self, mode: str = 'list'): """ Will try to load the csv data using a list, pandas or numpy. @@ -672,14 +672,14 @@ def get_data(self, mode:str='list'): """ sep = self.csvFileFormat.separator dec = self.csvFileFormat.decimalSeparator - + if mode in ('numpy', 'np', 'array'): - lines = np.genfromtxt(self._filename, delimiter=sep, - dtype=str) + lines = np.genfromtxt(self._filename, delimiter=sep, + dtype=str) elif mode in ('pandas', 'pd', 'dataframe'): import pandas as pd lines = pd.read_csv(self._filename, sep=sep, - header=None, index_col=None) + header=None, index_col=None) elif mode == 'list': lines = read_csv(self._filename, sep=sep, decimal_sep=dec, convert_nums=True) @@ -688,7 +688,6 @@ def get_data(self, mode:str='list'): '["numpy", "pandas", "list"]'.format(mode)) return lines - def get_times(self): """ Retrieves the times or samples of this CSV entry @@ -701,8 +700,7 @@ def get_times(self): data = self.get_data() times, _ = zip(*data) return list(times) - - + def get_labels(self): """ Retrieves the labels this CSV entry @@ -723,53 +721,29 @@ class ValuesEntry(CsvFileEntry): :param test: test """ - - def __init__(self, id=None, attrib=None, parent='.' , **kwargs): + + def __init__(self, id=None, attrib=None, parent='.', **kwargs): super().__init__(id=id, attrib=attrib, parent=parent, **kwargs) - - def set_data(self, data:list, ch_names=None, **kwargs): + + def set_data(self, data: list, ch_names=None, **kwargs): # if we get a string supplied, we convert to list - super().set_data(data, **kwargs) - - if isinstance(ch_names, str): ch_names = [ch_names] - n_cols = len(data[0])-1 - - if ch_names is None and hasattr(self, 'channel') and\ - len(self.channel)==n_cols: - # this means channel information is present and matches array - pass - elif ch_names is not None: - # this means new channel names are indicated and will overwrite. 
- assert len(ch_names)==n_cols, f'len {ch_names}!={len(data)}' - if hasattr(self, 'channel'): - logging.warning('Channels present will be overwritten') - self.remove_entry('channel') - for name in ch_names: - channel = MiscEntry('channel', key='name', value=name) - self.add_entry(channel) - elif ch_names is None and not hasattr(self, 'channel'): - # this means no channel names are indicated and none exist - # we create new generic names for the channels - logging.info('No channel names indicated, will use generic names') - for i in range(n_cols): - channel = MiscEntry('channel', key='name', value=f'ch_{i}') - self.add_entry(channel) - else: - # this means there are channel names there but do not match n_data - raise ValueError('Must indicate channel names') + super().set_data(data, **kwargs) + + self._set_channels(ch_names, n_data=len(data[0]) - 1) + self._autosave() return self - + class EventEntry(CsvFileEntry): - def __init__(self, id=None, attrib=None, parent='.' , **kwargs): + def __init__(self, id=None, attrib=None, parent='.', **kwargs): super().__init__(id=id, attrib=attrib, parent=parent, **kwargs) - - + + class CustomEntry(FileEntry): - + def __init__(self, id=None, **kwargs): - super().__init__(id=id, **kwargs) + super().__init__(id=id, **kwargs) self._autosave() def get_data(self, dtype='auto'): @@ -788,58 +762,58 @@ def get_data(self, dtype='auto'): :returns: the binary data or the otherwise loaded data """ - if dtype=='auto': - + if dtype == 'auto': + ext = os.path.splitext(self._filename)[-1].lower() txt_exts = ['.txt', '.csv', '.ini'] img_exts = ['.jpeg', '.jpg', '.bmp', '.png', '.tif', '.gif'] - - if hasattr(self, 'dataType'): + + if hasattr(self, 'dataType'): dtype = self.dataType elif ext in img_exts: - dtype='image' + dtype = 'image' elif ext in txt_exts: - dtype='text' - elif ext=='.json': - dtype='json' - elif ext=='.npy': - dtype='numpy' - elif ext=='.pkl': - dtype='pickle' + dtype = 'text' + elif ext == '.json': + dtype = 'json' + elif ext == '.npy': + dtype = 'numpy' + elif ext == '.pkl': + dtype = 'pickle' else: - dtype='binary' - - if dtype=='binary': + dtype = 'binary' + + if dtype == 'binary': with open(self._filename, 'rb') as f: data = f.read() - elif dtype=='csv': + elif dtype == 'csv': data = read_csv(self._filename) - elif dtype=='text': + elif dtype == 'text': with open(self._filename, 'r') as f: data = f.read() - elif dtype=='image': - imageio = get_module('imageio') + elif dtype == 'image': + imageio = get_module('imageio.v2') data = imageio.imread(self._filename) - elif dtype=='pickle': + elif dtype == 'pickle': pickle = get_module('pickle') with open(self._filename, 'rb') as f: data = pickle.load(f) - elif dtype=='json': + elif dtype == 'json': try: import json_tricks as json except: json = get_module('json') with open(self._filename, 'r') as f: data = json.load(f) - elif dtype=='numpy': + elif dtype == 'numpy': data = np.load(self._filename) else: raise ValueError('unknown dtype {}'.format(dtype)) - + self.dataType = dtype return data - + def set_data(self, data, dtype='auto', **kwargs): """ Will save custom data to disk. 
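`CustomEntry` picks its loader from the file extension, so a round trip needs no explicit dtype. A sketch with a made-up file name (json goes through json_tricks if installed, else the stdlib json):

```Python
from unisens import Unisens, CustomEntry

u = Unisens('tmp_unisens', makenew=True)
report = CustomEntry('report.json', parent=u)
report.set_data({'quality': 'ok', 'n_artifacts': 3})  # .json -> json dump
report.get_data()  # -> {'quality': 'ok', 'n_artifacts': 3}
```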
@@ -851,108 +825,106 @@ def set_data(self, data, dtype='auto', **kwargs): self._check_readonly() # infer datatype automatically - if dtype=='auto': + if dtype == 'auto': ext = os.path.splitext(self._filename)[-1].lower() txt_exts = ['.txt', '.ini', '.csv'] img_exts = ['.jpeg', '.jpg', '.bmp', '.png', '.tif', '.gif'] - + if ext in img_exts: - dtype='image' + dtype = 'image' elif ext in txt_exts: - dtype='text' - elif ext=='.json': - dtype='json' - elif ext=='.pkl': - dtype='pickle' - elif ext=='.npy': - dtype='numpy' + dtype = 'text' + elif ext == '.json': + dtype = 'json' + elif ext == '.pkl': + dtype = 'pickle' + elif ext == '.npy': + dtype = 'numpy' else: - dtype='binary' - + dtype = 'binary' + # file saving from here on - if dtype=='binary': + if dtype == 'binary': with open(self._filename, 'wb') as f: f.write(data) - elif dtype=='text': + elif dtype == 'text': with open(self._filename, 'w') as f: f.write(data) - elif dtype=='image': + elif dtype == 'image': imageio = get_module('imageio') imageio.imsave(self._filename, data) - elif dtype=='pickle': + elif dtype == 'pickle': pickle = get_module('pickle') with open(self._filename, 'wb') as f: data = pickle.dump(data, f, protocol=3) - elif dtype=='json': + elif dtype == 'json': try: import json_tricks as json tricks_installed = True except: json = get_module('json') - tricks_installed = False + tricks_installed = False with open(self._filename, 'w') as f: if tricks_installed: json.dump(data, f, allow_nan=True) else: json.dump(data, f) - elif dtype=='numpy': + elif dtype == 'numpy': data = np.save(self._filename, data) else: raise ValueError('unknown dtype {}'.format(dtype)) - + for key in kwargs: self.set_attrib(key, kwargs[key]) - + # save dtype within the entry self.dataType = dtype - + self._autosave() return self - - + + class CustomAttributes(Entry): - def __init__(self, key:str=None, value:str=None, **kwargs): - super().__init__(**kwargs) + def __init__(self, key: str = None, value: str = None, **kwargs): + super().__init__(**kwargs) if key and value: self.set_attrib(key, value) - + def to_element(self): element = Element(self._name, attrib={}) element.tail = '\n \n \n ' element.text = '\n' - for key in self.attrib: - customAttribute = MiscEntry(name = 'customAttribute') - customAttribute.key = key - customAttribute.value = self.attrib[key] - subelement = customAttribute.to_element() + for key, value in self.attrib.items(): + subelement = Element('customAttribute', key=key, value=str(value)) element.append(subelement) return element - - def add_entry(self, entry:'MiscEntry'): - if entry._name != 'customAttribute': - logging.error('Can only add customAttribute type') - return + + def add_entry(self, entry: MiscEntry): + """ When reading the unisens.xml file, each CustomAttribute is an entry (i.e. subelement) + to CustomAttributes and only contains key and value. + In contrast, pyunisens will save a CustomAttribute as attribute to CustomAttributes""" + assert entry._name == 'customAttribute', 'Can only add customAttribute type' self.set_attrib(entry.key, entry.value) self._autosave() - - + + class MiscEntry(Entry): - def __init__(self, name:str, key:str=None, value:str=None, **kwargs): - super().__init__(**kwargs) + def __init__(self, name: str, key: str = None, value: str = None, **kwargs): + """ For various smaller types of entries. 
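`CustomAttributes` keeps its key/value pairs in `.attrib` and only expands them to `<customAttribute>` subelements at serialization time. A sketch (values made up, XML shown approximately):

```Python
from unisens import Unisens
from unisens.entry import CustomAttributes

u = Unisens('tmp_unisens', makenew=True)
custom = CustomAttributes(key='gender', value='m')
custom.set_attrib('age', '42')
u.add_entry(custom)
u.save()
# unisens.xml now contains roughly:
# <customAttributes>
#   <customAttribute key="gender" value="m" />
#   <customAttribute key="age" value="42" />
# </customAttributes>
```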
The `name` describes the type and can be + ['channel', 'context', 'customAttribute', 'group', 'groupEntry', + 'binFileFormat', 'csvFileFormat', 'customFileFormat']""" + super().__init__(**kwargs) self._name = strip(name) if key and value: self.set_attrib(key, value) self._autosave() - - -class CustomAttribute(MiscEntry): - def __new__(*args,**kwargs): - return MiscEntry('customAttribute', **kwargs) - - - - - - \ No newline at end of file +class CustomAttribute(MiscEntry): + """dummy class for reading from unisens.xml. + An actual CustomAttribute is stored as an attribute of CustomAttributes, not as an entry.""" + def __new__(cls, *args, **kwargs): + warnings.warn("CustomAttribute will be removed in the next release. " + "Use CustomAttributes.set_attrib('key', 'value') instead.", + category=DeprecationWarning, stacklevel=2) + return MiscEntry('customAttribute', *args, **kwargs) diff --git a/unisens/main.py b/unisens/main.py index 743d0bb..340f31b 100644 --- a/unisens/main.py +++ b/unisens/main.py @@ -17,12 +17,12 @@ todo: add group todo: channel to valuesentry todo: parent in folder/parent -todo: coherent attribute setting in __init__ and set_data() @author: skjerns """ import os import logging import datetime +import warnings from xml.etree import ElementTree as ET from xml.etree.ElementTree import Element from .entry import Entry, FileEntry, ValuesEntry, SignalEntry, MiscEntry @@ -30,9 +30,6 @@ from .utils import AttrDict, strip, validkey, lowercase, make_key, indent from .utils import str2num -# try: profile #a hack for not having to remove the profile tags when not testing -# except NameError: profile = lambda x: x # pass-through decorator - class Unisens(Entry): """ @@ -47,9 +44,9 @@ class Unisens(Entry): If no unisens.xml is present and new=False :param attrib: The attribute """ - # @profile - def __init__(self, folder, makenew=False, autosave=False, readonly=False, - comment:str='', duration:int=0, measurementId:str='NaN', + + def __init__(self, folder: str, makenew=False, autosave=False, readonly=False, + comment: str = '', duration: int = 0, measurementId: str = 'NaN', timestampStart='', filename='unisens.xml', convert_nums=False): """ @@ -67,31 +64,30 @@ def __init__(self, folder, makenew=False, autosave=False, readonly=False, :param attrib: The attribute :param convert_nums: try to convert numbers from attribs automatically """ - assert autosave!=readonly or not autosave and not readonly, \ + assert not (autosave and readonly), \ 'either read-only or autosave can be enabled' assert isinstance(folder, str), f'folder must be string, is {folder}' - self._folder = folder - self._file = os.path.join(folder, filename) - os.makedirs(folder, exist_ok=True) - folder = os.path.dirname(folder + '/') + self._folder = os.path.normpath(folder) + self._file = os.path.join(self._folder, filename) + os.makedirs(self._folder, exist_ok=True) self.entries = AttrDict() self._entries = list() self._name = 'unisens' self._readonly = readonly self._convert_nums = convert_nums - + if os.path.isfile(self._file) and not makenew: - logging.debug('loading unisens.xml from {}'.format(\ - self._file)) - self.read_unisens(folder, filename=filename) + logging.debug('loading unisens.xml from {}'.format(self._file)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + self.read_unisens() else: - logging.debug('New unisens.xml will be created at {}'.format(\ - self._file)) + logging.debug('New unisens.xml will be created at {}'.format(self._file)) if not timestampStart: now = datetime.datetime.now()
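Construction and loading after this patch, in short: reading now happens directly in `__init__`, and `autosave` and `readonly` are checked as mutually exclusive. A sketch (folder name made up, usage as in the README):

```Python
from unisens import Unisens

u = Unisens('tmp_unisens', makenew=True, autosave=True, comment='demo')
u2 = Unisens('tmp_unisens')  # re-opens the unisens.xml written above
# Unisens('tmp_unisens', autosave=True, readonly=True) -> AssertionError
```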
timestampStart = now.strftime('%Y-%m-%dT%H:%M:%S') - self.attrib ={} + self.attrib = {} self.set_attrib('comment', comment) self.set_attrib('duration', duration) self.set_attrib('measurementId', measurementId) @@ -109,7 +105,7 @@ def __str__(self): duration = int(str2num(self.__dict__.get('duration', 0))) duration = str(datetime.timedelta(seconds=int(duration))) except: - duration = 'N/A' + duration = 'N/A' n_entries = len(self.entries) if hasattr(self, 'entries') else 0 id = self.__dict__.get('measurementId', 'no ID') s = 'Unisens: {}({}, {} entries)'.format(id, duration, n_entries) @@ -118,25 +114,24 @@ def __str__(self): def __repr__(self): comment = self.attrib.get('comment', '') - comment = comment[:20] + '[..]'*(len(comment)>0) + comment = comment[:20] + '[..]' * (len(comment) > 0) try: duration = int(str2num(self.__dict__.get('duration', 0))) duration = str(datetime.timedelta(seconds=int(duration))) except: - duration = 'Can\'t calculate duration' + duration = 'Can\'t calculate duration' measurementId = self.attrib.get('measurementId', 0) timestampStart = self.attrib.get('timestampStart', 0) s = f'Unisens(comment={comment}, duration={duration}, ' \ f'id={measurementId},timestampStart={timestampStart})' - return s + return s def _autosave(self): if self.__dict__.get('_autosave_enabled', False): - self.save() + self.save() - # @profile - def add_entry(self, entry:Entry): + def add_entry(self, entry: Entry, stack=None): """ Add a subentry to this unisens object, e.g ValueEntry, SignalEntry @@ -145,14 +140,22 @@ def add_entry(self, entry:Entry): """ entry._folder = self._folder if isinstance(entry, FileEntry): - if entry.id in self: + if entry.id in self: raise KeyError(f'{entry.id} already present in Unisens') self.entries[entry.id] = entry super().add_entry(entry, stack=False) return self - - # @profile - def unpack_element(self, element:( Element, ET)) -> Entry: + + def remove_entry(self, name: str): + i, key = self._get_index(name) + entry = self._entries.pop(i) + for e_name, e in list(self.entries.items()): + if e == entry: + del self.entries[e_name] + del self.__dict__[key] + return self + + def unpack_element(self, element: (Element, ET)) -> Entry: """ Unpacks an xmltree element iteratively into an the corresponding subtype Entry object. 
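The new `Unisens.remove_entry` override also cleans the `.entries` registry, which the base-class version no longer touches. A sketch:

```Python
import numpy as np
from unisens import Unisens, SignalEntry

u = Unisens('tmp_unisens', makenew=True)
ecg = SignalEntry('ecg.bin', parent=u)
ecg.set_data(np.zeros((1, 10), dtype=np.int16), sampleRate=256, ch_names=['ECG'])

u.remove_entry('ecg')   # abbreviations resolve through _get_index
print('ecg.bin' in u)   # False: ._entries, .entries and __dict__ are cleaned
```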
@@ -164,12 +167,12 @@ def unpack_element(self, element:( Element, ET)) -> Entry: if self._convert_nums: for key, value in attrib.items(): attrib[key] = str2num(value) - + entryType = strip(element.tag) if entryType == 'customAttributes': entry = CustomAttributes(attrib=attrib, parent=self._folder) elif entryType == 'eventEntry': - entry = EventEntry(attrib=attrib, parent=self._folder, + entry = EventEntry(attrib=attrib, parent=self._folder, separator=';', decimalSeparator='.') elif entryType == 'signalEntry': entry = SignalEntry(attrib=attrib, parent=self._folder) @@ -178,7 +181,7 @@ def unpack_element(self, element:( Element, ET)) -> Entry: separator=';', decimalSeparator='.') elif entryType == 'customEntry': entry = CustomEntry(attrib=attrib, parent=self._folder) - elif entryType in ('context', 'group', 'customAttribute', + elif entryType in ('context', 'group', 'customAttribute', 'csvFileFormat', 'channel', 'binFileFormat', 'customFileFormat', 'groupEntry'): name = element.tag @@ -187,14 +190,14 @@ def unpack_element(self, element:( Element, ET)) -> Entry: if not 'Entry' in element.tag: logging.warning('Unknown entry type: {}'.format(entryType)) name = element.tag - entry = MiscEntry(name=name, attrib=attrib, parent=self._folder) - + entry = MiscEntry(name=name, attrib=attrib, parent=self._folder) + for subelement in element: subentry = self.unpack_element(subelement) entry.add_entry(subentry) return entry - - def save(self, folder:str=None, filename:str='unisens.xml') -> Entry: + + def save(self, folder: str = None, filename: str = 'unisens.xml') -> Entry: """ Save this Unisens xml file to a given folder and filename. filename should be unisens.xml, but can be altered if necessary @@ -204,64 +207,65 @@ def save(self, folder:str=None, filename:str='unisens.xml') -> Entry: :param filename: the filename to save. use unisens.xml. """ self._check_readonly() - + if folder is None: folder = self._folder if filename is None: filename = os.path.basename(self._file) - + file = os.path.join(folder, filename) ET.register_namespace("", "http://www.unisens.org/unisens2.0") element = self.to_element() indent(element) et = ET.ElementTree(element) - et.write(file, xml_declaration=True, default_namespace='', + et.write(file, xml_declaration=True, default_namespace='', encoding='utf-8') return self - - # @profile - def read_unisens(self, folder:str, filename='unisens.xml') -> Entry: + def read_unisens(self, folder: str = None, filename='unisens.xml') -> Entry: """ Loads an XML Unisens file into this Unisens object. That means, self.attrib and self.children are added as well as tag, tail and text - - :param folder: folder where the unisens.xml is located. - :returns: self """ - folder += '/' # to avoid any ospath errors and confusion, append / - file = os.path.join(os.path.dirname(folder), filename) + warnings.warn(f'`read_unisens` is deprecated and will be removed with the ' + f'next release. Please read your unisens file by calling' + f' Unisens(folder=folder, filename=filename).', + category=DeprecationWarning, stacklevel=2) + # Saving data from one unisens file to another is still possible with Unisens.save() . 
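`save` can still target a different folder, which together with the deprecation note above is the supported way to copy a dataset's XML. A sketch with made-up folder names:

```Python
import os
from unisens import Unisens

u = Unisens('tmp_unisens', makenew=True, comment='demo')
u.save()                             # writes tmp_unisens/unisens.xml
os.makedirs('tmp_backup', exist_ok=True)
u.save(folder='tmp_backup')          # XML only; the data files stay put
```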
+ if folder is None: + file = self._file + else: + file = os.path.normpath(os.path.join(folder, filename)) if not os.path.isfile(file): - raise FileNotFoundError('{} does not exist'.format(folder)) - + raise FileNotFoundError('{} does not exist'.format(file)) + try: root = ET.parse(file).getroot() except Exception as e: print('Error reading {}'.format(file)) raise e - + # copy all attributes from root to this Unisens object self.attrib = root.attrib - + # convert strings to numbers if that is requested - + if self._convert_nums: for key, value in self.attrib.items(): self.attrib[key] = str2num(value) - + # now add all elements that are contained in this XML object - + for element in root: entry = self.unpack_element(element) self.add_entry(entry) id = entry.attrib.get('id', entry._name) self.entries[id] = entry - + self.__dict__.update(self.attrib) keys = [make_key(key) for key in self.entries] entries = zip(keys, self.entries.values()) self.__dict__.update(entries) return self - diff --git a/unisens/utils.py b/unisens/utils.py index 79ebfe2..5348e73 100644 --- a/unisens/utils.py +++ b/unisens/utils.py @@ -7,25 +7,25 @@ @author: skjerns """ import re +import warnings from types import GeneratorType import numpy as np from collections import OrderedDict -try: profile #a hack for not having to remove the profile tags when not testing -except NameError: profile = lambda x: x # pass-through decorator # a helper function for anti-camel case first letter lowercase = lambda s: s[:1].lower() + s[1:] if s else '' # chars that are forbidden for identifiers of attributes -forbidden_identifiers = {x:95 for x in list(range(32,48))+ - list(range(58,65))+ - list(range(91,97))+ - list(range(123,999))} +forbidden_identifiers = {x: 95 for x in list(range(32, 48)) + + list(range(58, 65)) + + list(range(91, 97)) + + list(range(123, 999))} # lookup-table for forbidden chars for filenames. dict for speed. forbidden_for_filename = set(':*?"<>|') + def indent(elem, level=0): """ A helper function that indents XML tags automatically @@ -37,7 +37,7 @@ def indent(elem, level=0): level : int, optional Level of intendation. The default is 0. 
""" - + i = "\n" + level * " " if len(elem): if not elem.text or not elem.text.strip(): @@ -45,13 +45,14 @@ def indent(elem, level=0): if not elem.tail or not elem.tail.strip(): elem.tail = i for elem in elem: - indent(elem, level+1) + indent(elem, level + 1) if not elem.tail or not elem.tail.strip(): elem.tail = i else: if level and (not elem.tail or not elem.tail.strip()): elem.tail = i + def num2str(element, decimal_sep='.'): """ A helper function that converts strings to numbers if possible @@ -73,16 +74,17 @@ def str2num(string, decimal_sep='.'): """ if not isinstance(string, str): return string if string.isdigit(): return int(string) - if string=='True': return True - if string=='False': return False - try: + if string == 'True': return True + if string == 'False': return False + try: string_x = string.translate({ord(decimal_sep): 46}) # necessary because of PEP-515, ignore _ in strings - string_x = string_x.translate({95:35}) + string_x = string_x.translate({95: 35}) return float(string_x) - except: return string - - + except: + return string + + def write_csv(csv_file, data_list, sep=';', decimal_sep='.', comment=None): """ Parameters @@ -106,29 +108,29 @@ def write_csv(csv_file, data_list, sep=';', decimal_sep='.', comment=None): """ # we accept data_lists or arrays - assert decimal_sep!=sep, 'Error, sep cannot be same as decimal_sep' + assert decimal_sep != sep, 'Error, sep cannot be same as decimal_sep' assert isinstance(data_list, (tuple, list, np.ndarray, GeneratorType)), \ - 'Must be list, tuple or array' + 'Must be list, tuple or array' if isinstance(data_list, np.ndarray): - if data_list.ndim==1: + if data_list.ndim == 1: data_list = [line for line in data_list] - elif data_list.ndim==2: + elif data_list.ndim == 2: data_list = [[x for x in d] for d in data_list] else: raise ValueError('Array must be 1D or 2D') # first add the comments if there are any - csv_string = '' + csv_string = '' if comment is not None: comment = comment.split('\n') csv_string += '# ' + '\n# '.join(comment) + '\n' - + # now go through the data list or array. for line in data_list: # if it contains several elements, we separate them with sep. 
# additionally we convert the decimal separator - if isinstance(line, (list, np.ndarray, tuple)): - csv_string += sep.join([num2str(e, decimal_sep)for e in line]) + if isinstance(line, (list, np.ndarray, tuple)): + csv_string += sep.join([num2str(e, decimal_sep) for e in line]) # if it's not a list, we just convert to string else: csv_string += num2str(line, decimal_sep) @@ -138,6 +140,7 @@ def write_csv(csv_file, data_list, sep=';', decimal_sep='.', comment=None): f.write(csv_string) return True + def read_csv(csv_file, comment='#', sep=';', decimal_sep='.', convert_nums=False, keep_empty=False): """ @@ -152,24 +155,24 @@ def read_csv(csv_file, comment='#', sep=';', decimal_sep='.', """ with open(csv_file, 'r') as f: content = f.read() - + # split in lines lines = content.split('\n') - + # ignore comments and remove whitespaces lines = [line.strip() for line in lines if not line.startswith(comment)] - + # remove empty lines if not keep_empty: - lines = [line for line in lines if (line!='' and line!=[])] - + lines = [line for line in lines if (line != '' and line != [])] + # split into subentries and strip of whitespaces lines = [[el.strip() for el in line.split(sep)] for line in lines] - + # remove empty last element if not keep_empty: for i, line in enumerate(lines): - if line[-1]=='': lines[i] = line[:-1] + if line[-1] == '': lines[i] = line[:-1] # convert to numbers if requested if convert_nums: @@ -187,8 +190,9 @@ class AttrDict(OrderedDict): def __init__(self, *args, **kwargs): super(AttrDict, self).__init__(*args, **kwargs) self.__dict__ = self - -def valid_filename(name:str): + + +def valid_filename(name: str): """ Checks whether a filename follows the naming conventions @@ -205,7 +209,7 @@ def valid_filename(name:str): if name.startswith('/') or name.startswith('\\'): # this gives an os.path error raise ValueError(f'ID cannot start with \\ or / is "{name}"') - + if not set(name).isdisjoint(forbidden_for_filename): raise ValueError('ID cannot contain :*?"<>|') return True @@ -216,14 +220,17 @@ def check1(name): if s in forbidden_for_filename: raise ValueError('ID cannot contain :*?"<>|') + def check2(name): return set(name).isdisjoint(forbidden_for_filename) + def check3(name): for s in forbidden_for_filename: if s in name: raise ValueError('ID cannot contain :*?"<>|') + def check4(name): name = set(name) for s in forbidden_for_filename: @@ -231,8 +238,7 @@ def check4(name): raise ValueError('ID cannot contain :*?"<>|') -# @profile -def make_key(string:str): +def make_key(string: str): """ A function that turns any string into a valid python variable string @@ -262,6 +268,7 @@ def validkey(key): raise ValueError('Key cannot start with a number: {}'.format(key)) return key + def strip(string): """ Strip a unisense identifier string of unnecessary elements @@ -269,4 +276,27 @@ def strip(string): """ if '}' in string: string = string.split('}')[-1] - return string \ No newline at end of file + return string + + +def infer_dtype(dataType: str) -> str: + """ + Mapping python / numpy data type to universal / java data type for compatibility + + :param dataType: str with numpy dtype or universal data type + :return: dataType: uppercase str of universal data type + """ + + dataType = dataType.upper() + dtype_mapping = {'FLOAT16': 'FLOAT', + 'FLOAT32': 'FLOAT', + 'FLOAT64': 'DOUBLE', + 'INT64': 'INT32', + 'INT': 'INT32'} + dataType = dtype_mapping.get(dataType, dataType) + allowed_dtypes = ['DOUBLE', 'FLOAT', 'INT16', 'INT32', + 'INT8', 'UINT16', 'UINT32', 'UINT8'] + assert dataType in 
allowed_dtypes, f'{dataType} is not in {allowed_dtypes}' + return dataType + +
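Examples of what `infer_dtype` accepts and returns (the last call is expected to fail):

```Python
from unisens.utils import infer_dtype

infer_dtype('float64')  # -> 'DOUBLE'
infer_dtype('int64')    # -> 'INT32'  (beware: 64-bit ints are narrowed)
infer_dtype('INT16')    # -> 'INT16'  (already a universal name)
infer_dtype('complex')  # AssertionError: COMPLEX is not in the allowed dtypes
```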