Skip to content

Commit

Permalink
Merge branch 'acoustics_module' of https://github.com/jmcvey3/MHKiT-P…
Browse files Browse the repository at this point in the history
…ython into acoustics_module
  • Loading branch information
ssolson committed Sep 20, 2024
2 parents b4b9ac2 + d560041 commit a186779
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 143 deletions.
61 changes: 31 additions & 30 deletions mhkit/acoustics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@


def _fmax_warning(
fn: Union[int, float, np.ndarray], fmax: Union[int, float]
) -> Union[float, np.ndarray]:
fn: Union[int, float, np.ndarray], fmax: Union[int, float, np.ndarray]
) -> Union[int, float, np.ndarray]:
"""
Checks that the maximum frequency limit isn't greater than the Nyquist frequency.
Expand All @@ -35,7 +35,7 @@ def _fmax_warning(

if not isinstance(fn, (int, float, np.ndarray)):
raise TypeError("'fn' must be a numeric type (int or float).")
if not isinstance(fmax, (int, float)):
if not isinstance(fmax, (int, float, np.ndarray)):
raise TypeError("'fmax' must be a numeric type (int or float).")

if fmax > fn:
Expand Down Expand Up @@ -97,13 +97,13 @@ def minimum_frequency(
if c_seabed <= c:
raise ValueError("'c_seabed' must be greater than 'c'.")

f_min = c / (4 * water_depth * np.sqrt(1 - (c / c_seabed) ** 2))
fmin = c / (4 * water_depth * np.sqrt(1 - (c / c_seabed) ** 2))

return f_min
return fmin


def sound_pressure_spectral_density(
pressure: xr.DataArray, fs: int, window: Union[int, float] = 1
pressure: xr.DataArray, fs: Union[int, float], window: Union[int, float] = 1
) -> xr.DataArray:
"""
Calculates the mean square sound pressure spectral density from audio
Expand All @@ -114,7 +114,7 @@ def sound_pressure_spectral_density(
Parameters
----------
pressure: xarray.DataArray (time)
Sound pressure in [Pa] or volts [V]
Sound pressure in [Pa] or voltage [V]
fs: int
Data collection sampling rate [Hz]
window: string (optional)
Expand All @@ -127,8 +127,8 @@ def sound_pressure_spectral_density(
"""
if not isinstance(pressure, xr.DataArray):
raise TypeError("'pressure' must be an xarray.DataArray.")
if not isinstance(fs, int):
raise TypeError("'fs' must be an integer.")
if not isinstance(fs, (int, float)):
raise TypeError("'fs' must be a numeric type (int or float).")
if not isinstance(window, (int, float)):
raise TypeError("'window' must be a numeric type (int or float).")

Expand Down Expand Up @@ -169,7 +169,9 @@ def sound_pressure_spectral_density(


def apply_calibration(
spsd: xr.DataArray, sensitivity_curve: xr.DataArray, fill_value: Union[float, int]
spsd: xr.DataArray,
sensitivity_curve: xr.DataArray,
fill_value: Union[float, int, np.ndarray],
) -> xr.DataArray:
"""
Applies custom calibration to spectral density values.
Expand All @@ -180,6 +182,7 @@ def apply_calibration(
Mean square sound pressure spectral density in V^2/Hz.
sensitivity_curve: xarray.DataArray (freq)
Calibrated sensitivity curve in units of dB rel 1 V^2/uPa^2.
First column should be frequency, second column should be calibration values.
fill_value: float or int
Value with which to fill missing values from the calibration curve,
in units of dB rel 1 V^2/uPa^2.
Expand All @@ -194,7 +197,7 @@ def apply_calibration(
raise TypeError("'spsd' must be an xarray.DataArray.")
if not isinstance(sensitivity_curve, xr.DataArray):
raise TypeError("'sensitivity_curve' must be an xarray.DataArray.")
if not isinstance(fill_value, (int, float)):
if not isinstance(fill_value, (int, float, np.ndarray)):
raise TypeError("'fill_value' must be a numeric type (int or float).")

# Ensure 'freq' dimension exists in 'spsd'
Expand Down Expand Up @@ -229,7 +232,7 @@ def apply_calibration(
# Interpolate calibration curve to desired value
calibration = sensitivity_curve.interp(
{freq: spsd_calibrated["freq"]}, method="linear"
).drop(freq)
).drop_vars(freq)
# Fill missing with provided value
calibration = calibration.fillna(fill_value)

Expand Down Expand Up @@ -395,20 +398,18 @@ def band_average(
bandwidth = 2 ** (1 / octave)
half_bandwidth = 2 ** (1 / (octave * 2))

frequencies = {}
frequencies["center_freq"] = 10 ** np.arange(
band = {}
band["center_freq"] = 10 ** np.arange(
np.log10(fmin),
np.log10(fmax * bandwidth),
step=np.log10(bandwidth),
)
frequencies["lower_limit"] = frequencies["center_freq"] / half_bandwidth
frequencies["upper_limit"] = frequencies["center_freq"] * half_bandwidth
octave_bins = np.append(frequencies["lower_limit"], frequencies["upper_limit"][-1])
band["lower_limit"] = band["center_freq"] / half_bandwidth
band["upper_limit"] = band["center_freq"] * half_bandwidth
octave_bins = np.append(band["lower_limit"], band["upper_limit"][-1])

# Use xarray binning methods
spsdl_group = spsdl.groupby_bins(
"freq", octave_bins, labels=frequencies["center_freq"]
)
spsdl_group = spsdl.groupby_bins("freq", octave_bins, labels=band["center_freq"])

# Handle method being a string or a dict
if isinstance(method, str):
Expand All @@ -420,8 +421,8 @@ def band_average(
out = func(method_arg)
else:
raise ValueError(
f"Unsupported method type: {type(method)}. \
Must be a string or dictionary."
f"Unsupported method type: {type(method)}. "
"Must be a string or dictionary."
)

out.attrs.update(
Expand All @@ -445,7 +446,7 @@ def time_average(
Parameters
----------
spsdl: xarray.DataArray (time)
spsdl: xarray.DataArray (time, freq)
Mean square sound pressure spectral density level in dB rel 1 uPa^2/Hz
window: int
Time in seconds to subdivide spectral density level into. Default: 60 s.
Expand Down Expand Up @@ -652,22 +653,22 @@ def _band_sound_pressure_level(
# Reference value of sound pressure
reference = 1e-12 # Pa^2, = 1 uPa^2

frequencies = {}
frequencies["center_freq"] = 10 ** np.arange(
band = {}
band["center_freq"] = 10 ** np.arange(
np.log10(fmin),
np.log10(fmax * bandwidth),
step=np.log10(bandwidth),
)
frequencies["lower_limit"] = frequencies["center_freq"] / half_bandwidth
frequencies["upper_limit"] = frequencies["center_freq"] * half_bandwidth
octave_bins = np.append(frequencies["lower_limit"], frequencies["upper_limit"][-1])
band["lower_limit"] = band["center_freq"] / half_bandwidth
band["upper_limit"] = band["center_freq"] * half_bandwidth
octave_bins = np.append(band["lower_limit"], band["upper_limit"][-1])

# Manual trapezoidal rule to get Pa^2
pressure_squared = xr.DataArray(
coords={"time": spsd["time"], "freq_bins": frequencies["center_freq"]},
coords={"time": spsd["time"], "freq_bins": band["center_freq"]},
dims=["time", "freq_bins"],
)
for i, key in enumerate(frequencies["center_freq"]):
for i, key in enumerate(band["center_freq"]):
band_min = octave_bins[i]
band_max = octave_bins[i + 1]
pressure_squared.loc[{"freq_bins": key}] = np.trapz(
Expand Down
Loading

0 comments on commit a186779

Please sign in to comment.