import numpy as np
from ..velocity import VelBinner
import warnings
from ..tools.misc import slice1d_along_axis, _nans_like
from scipy.special import cbrt
import xarray as xr
[docs]
class ADVBinner(VelBinner):
"""A class that builds upon `VelBinner` for calculating turbulence
statistics and velocity spectra from ADV data
Parameters
----------
n_bin : int
The length of each `bin`, in number of points, for this averaging
operator.
fs : int
Instrument sampling frequency in Hz
n_fft : int (optional, default: n_fft = n_bin)
The length of the FFT for computing spectra (must be <= n_bin)
n_fft_coh : int (optional, default: `n_fft_coh`=`n_fft`)
Number of data points to use for coherence and cross-spectra ffts
noise : float, list or numpy.ndarray
Instrument's doppler noise in same units as velocity
"""
[docs]
def __call__(self, ds, freq_units='rad/s', window='hann'):
out = type(ds)()
out = self.do_avg(ds, out)
noise = ds.get('doppler_noise', [0, 0, 0])
out['tke_vec'] = self.calc_tke(ds['vel'], noise=noise)
out['stress_vec'] = self.calc_stress(ds['vel'])
out['psd'] = self.calc_psd(ds['vel'],
window=window,
freq_units=freq_units,
noise=noise)
for key in list(ds.attrs.keys()):
if 'config' in key:
ds.attrs.pop(key)
out.attrs = ds.attrs
out.attrs['n_bin'] = self.n_bin
out.attrs['n_fft'] = self.n_fft
out.attrs['n_fft_coh'] = self.n_fft_coh
return out
[docs]
def calc_stress(self, veldat, detrend=True):
"""
Calculate the stresses (covariances of u,v,w)
Parameters
----------
veldat : xr.DataArray
Velocity data array from ADV data. The last dimension is
assumed to be time.
detrend : bool (default: True)
detrend the velocity data (True), or simply de-mean it
(False), prior to computing stress. Note: the psd routines
use detrend, so if you want to have the same amount of
variance here as there use ``detrend=True``.
Returns
-------
out : xarray.DataArray
"""
time = self.mean(veldat.time.values)
vel = veldat.values
out = np.empty(self._outshape(vel.shape)[:-1],
dtype=np.float32)
if detrend:
vel = self.detrend(vel)
else:
vel = self.demean(vel)
for idx, p in enumerate(self._cross_pairs):
out[idx] = np.nanmean(vel[p[0]] * vel[p[1]],
-1, dtype=np.float64
).astype(np.float32)
da = xr.DataArray(out.astype('float32'),
dims=veldat.dims,
attrs={'units': 'm2 s-2',
'long_name': 'Reynolds Stress Vector',
'standard_name': 'specific_reynolds_stress_of_sea_water'})
da = da.rename({'dir': 'tau'})
da = da.assign_coords({'tau': self.tau, 'time': time})
return da
[docs]
def calc_csd(self, veldat,
freq_units='rad/s',
fs=None,
window='hann',
n_bin=None,
n_fft_coh=None):
"""Calculate the cross-spectral density of velocity components.
Parameters
----------
veldat : xarray.DataArray
The raw 3D velocity data.
freq_units : string
Frequency units of the returned spectra in either Hz or rad/s
(`f` or :math:`\\omega`)
fs : float (optional)
The sample rate (default: from the binner).
window : string or array
Specify the window function.
Options: 1, None, 'hann', 'hamm'
n_bin : int (optional)
The bin-size (default: from the binner).
n_fft_coh : int (optional)
The fft size (default: n_fft_coh from the binner).
Returns
-------
csd : xarray.DataArray (3, M, N_FFT)
The first-dimension of the cross-spectrum is the three
different cross-spectra: 'uv', 'uw', 'vw'.
"""
fs_in = self._parse_fs(fs)
n_fft = self._parse_nfft_coh(n_fft_coh)
time = self.mean(veldat.time.values)
veldat = veldat.values
if len(np.shape(veldat)) != 2:
raise Exception("This function is only valid for calculating TKE using "
"the 3D velocity vector from an ADV.")
out = np.empty(self._outshape_fft(veldat[:3].shape, n_fft=n_fft, n_bin=n_bin),
dtype='complex')
# Create frequency vector, also checks whether using f or omega
if 'rad' in freq_units:
fs = 2*np.pi*fs_in
freq_units = 'rad s-1'
units = 'm2 s-1 rad-1'
else:
fs = fs_in
freq_units = 'Hz'
units = 'm2 s-2 Hz-1'
coh_freq = xr.DataArray(self.calc_freq(fs=fs_in, units=freq_units, n_fft=n_fft, coh=True),
dims=['coh_freq'],
name='coh_freq',
attrs={'units': freq_units,
'long_name': 'FFT Frequency Vector',
'coverage_content_type': 'coordinate'}
).astype('float32')
for ip, ipair in enumerate(self._cross_pairs):
out[ip] = self.calc_csd_base(veldat[ipair[0]],
veldat[ipair[1]],
fs=fs,
window=window,
n_bin=n_bin,
n_fft=n_fft)
csd = xr.DataArray(out.astype('complex64'),
coords={'C': self.C,
'time': time,
'coh_freq': coh_freq},
dims=['C', 'time', 'coh_freq'],
attrs={'units': units,
'n_fft_coh': n_fft,
'long_name': 'Cross Spectral Density',
'standard_name': 'cross_spectral_density_of_sea_water_velocity'})
csd['coh_freq'].attrs['units'] = freq_units
return csd
[docs]
def calc_doppler_noise(self, psd, pct_fN=0.8):
"""Calculate bias due to Doppler noise using the noise floor
of the velocity spectra.
Parameters
----------
psd : xarray.DataArray (dir, time, f)
The ADV velocity spectra
pct_fN : float
Percent of Nyquist frequency to calculate characeristic frequency
Returns
-------
doppler_noise (xarray.DataArray):
Doppler noise level in units of m/s
Notes
-----
Approximates bias from
.. :math: \\sigma^{2}_{noise} = N x f_{c}
where :math: `\\sigma_{noise}` is the bias due to Doppler noise,
`N` is the constant variance or spectral density, and `f_{c}`
is the characteristic frequency.
The characteristic frequency is then found as
.. :math: f_{c} = pct_fN * (f_{s}/2)
where `f_{s}/2` is the Nyquist frequency.
Richard, Jean-Baptiste, et al. "Method for identification of Doppler noise
levels in turbulent flow measurements dedicated to tidal energy." International
Journal of Marine Energy 3 (2013): 52-64.
ThiƩbaut, Maxime, et al. "Investigating the flow dynamics and turbulence at a
tidal-stream energy site in a highly energetic estuary." Renewable Energy 195
(2022): 252-262.
"""
# Characteristic frequency set to 80% of Nyquist frequency
fN = self.fs/2
fc = pct_fN * fN
# Get units right
if psd.freq.units == "Hz":
f_range = slice(fc, fN)
else:
f_range = slice(2*np.pi*fc, 2*np.pi*fN)
# Noise floor
N2 = psd.sel(freq=f_range) * psd.freq.sel(freq=f_range)
noise_level = np.sqrt(N2.mean(dim='freq'))
return xr.DataArray(
noise_level.values.astype('float32'),
dims=['dir', 'time'],
attrs={'units': 'm/s',
'long_name': 'Doppler Noise Level',
'description': 'Doppler noise level calculated '
'from PSD white noise'})
[docs]
def check_turbulence_cascade_slope(self, psd, freq_range=[6.28, 12.57]):
"""This function calculates the slope of the PSD, the power spectra
of velocity, within the given frequency range. The purpose of this
function is to check that the region of the PSD containing the
isotropic turbulence cascade decreases at a rate of :math:`f^{-5/3}`.
Parameters
----------
psd : xarray.DataArray ([time,] freq)
The power spectral density (1D or 2D)
freq_range : iterable(2) (default: [6.28, 12.57])
The range over which the isotropic turbulence cascade occurs, in
units of the psd frequency vector (Hz or rad/s)
Returns
-------
(m, b): tuple (slope, y-intercept)
A tuple containing the coefficients of the log-adjusted linear
regression between PSD and frequency
Notes
-----
Calculates slope based on the `standard` formula for dissipation:
.. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N
The slope of the isotropic turbulence cascade, which should be
equal to :math:`k^{-5/3}` or :math:`f^{-5/3}`, where k and f are
the wavenumber and frequency vectors, is estimated using linear
regression with a log transformation:
.. math:: log10(y) = m*log10(x) + b
Which is equivalent to
.. math:: y = 10^{b} x^{m}
Where :math:`y` is S(k) or S(f), :math:`x` is k or f, :math:`m`
is the slope (ideally -5/3), and :math:`10^{b}` is the intercept of
y at x^m=1.
"""
idx = np.where((freq_range[0] < psd.freq) & (psd.freq < freq_range[1]))
idx = idx[0]
x = np.log10(psd['freq'].isel(freq=idx))
y = np.log10(psd.isel(freq=idx))
y_bar = y.mean('freq')
x_bar = x.mean('freq')
# using the formula to calculate the slope and intercept
n = np.sum((x - x_bar) * (y - y_bar), axis=0)
d = np.sum((x - x_bar)**2, axis=0)
m = n/d
b = y_bar - m*x_bar
return m, b
[docs]
def calc_epsilon_LT83(self, psd, U_mag, freq_range=[6.28, 12.57]):
"""Calculate the dissipation rate from the PSD
Parameters
----------
psd : xarray.DataArray (...,time,freq)
The power spectral density
U_mag : xarray.DataArray (...,time)
The bin-averaged horizontal velocity [m s-1] (from dataset shortcut)
freq_range : iterable(2) (default: [6.28, 12.57])
The range over which to integrate/average the spectrum, in units
of the psd frequency vector (Hz or rad/s)
Returns
-------
epsilon : xarray.DataArray (...,n_time)
dataArray of the dissipation rate
Notes
-----
This uses the `standard` formula for dissipation:
.. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N
where :math:`\\alpha = 0.5` (1.5 for all three velocity
components), `k` is wavenumber, `S(k)` is the turbulent
kinetic energy spectrum, and `N' is the doppler noise level
associated with the TKE spectrum.
With :math:`k \\rightarrow \\omega / U`, then -- to preserve variance --
:math:`S(k) = U S(\\omega)`, and so this becomes:
.. math:: S(\\omega) = \\alpha \\epsilon^{2/3} \\omega^{-5/3} U^{2/3} + N
With :math:`k \\rightarrow (2\\pi f) / U`, then
.. math:: S(\\omega) = \\alpha \\epsilon^{2/3} f^{-5/3} (U/(2\\pi))^{2/3} + N
LT83 : Lumley and Terray, "Kinematics of turbulence convected
by a random wave field". JPO, 1983, vol13, pp2000-2007.
"""
# Ensure time has been averaged
if len(psd.time)!=len(U_mag.time):
raise Exception("`U_mag` should be from ensembled-averaged dataset")
freq = psd.freq
idx = np.where((freq_range[0] < freq) & (freq < freq_range[1]))
idx = idx[0]
if freq.units == 'Hz':
U = U_mag/(2*np.pi)
else:
U = U_mag
a = 0.5
out = (psd.isel(freq=idx) *
freq.isel(freq=idx)**(5/3) / a).mean(axis=-1)**(3/2) / U
return xr.DataArray(
out.astype('float32'),
attrs={'units': 'm2 s-3',
'long_name': 'Dissipation Rate',
'standard_name': 'specific_turbulent_kinetic_energy_dissipation_in_sea_water',
'description': 'TKE dissipation rate calculated using the method from Lumley and Terray, 1983',
})
[docs]
def calc_epsilon_SF(self, vel_raw, U_mag, fs=None, freq_range=[2., 4.]):
"""Calculate dissipation rate using the "structure function" (SF) method
Parameters
----------
vel_raw : xarray.DataArray
The raw velocity data (1D dimension time) upon
which to perform the SF technique.
U_mag : xarray.DataArray
The bin-averaged horizontal velocity (from dataset shortcut)
fs : float
The sample rate of `vel_raw` [Hz]
freq_range : iterable(2) (default: [2., 4.])
The frequency range over which to compute the SF [Hz]
(i.e. the frequency range within which the isotropic
turbulence cascade falls)
Returns
-------
epsilon : xarray.DataArray
dataArray of the dissipation rate
"""
veldat = vel_raw.values
if len(veldat.shape) > 1:
raise Exception("Function input should be a 1D velocity vector")
fs = self._parse_fs(fs)
if freq_range[1] > fs:
warnings.warn('Max freq_range cannot be greater than fs')
dt = self.reshape(veldat)
out = np.empty(dt.shape[:-1], dtype=dt.dtype)
for slc in slice1d_along_axis(dt.shape, -1):
up = dt[slc]
lag = U_mag.values[slc[:-1]] / fs * np.arange(up.shape[0])
DAA = _nans_like(lag)
for L in range(int(fs / freq_range[1]), int(fs / freq_range[0])):
DAA[L] = np.nanmean((up[L:] - up[:-L]) ** 2, dtype=np.float64)
cv2 = DAA / (lag ** (2 / 3))
cv2m = np.median(cv2[np.logical_not(np.isnan(cv2))])
out[slc[:-1]] = (cv2m / 2.1) ** (3 / 2)
return xr.DataArray(
out.astype('float32'),
coords=U_mag.coords,
dims=U_mag.dims,
attrs={'units': 'm2 s-3',
'long_name': 'Dissipation Rate',
'standard_name': 'specific_turbulent_kinetic_energy_dissipation_in_sea_water',
'description': 'TKE dissipation rate calculated using the "structure function" method',
})
def _up_angle(self, U_complex):
"""Calculate the angle of the turbulence fluctuations.
Parameters
----------
U_complex : |np.ndarray| (..., n_time * n_bin)
The complex, raw horizontal velocity (non-binned)
Returns
-------
theta : |np.ndarray| (..., n_time)
The angle of the turbulence [rad]
"""
dt = self.demean(U_complex)
fx = dt.imag <= 0
dt[fx] = dt[fx] * np.exp(1j * np.pi)
return np.angle(np.mean(dt, -1, dtype=np.complex128))
def _calc_epsTE01_int(self, I_tke, theta):
"""The integral, equation A13, in [TE01].
Parameters
----------
I_tke : |np.ndarray|
(beta in TE01) is the turbulence intensity ratio:
\\sigma_u / V
theta : |np.ndarray|
is the angle between the mean flow and the primary axis of
velocity fluctuations
"""
x = np.arange(-20, 20, 1e-2) # I think this is a long enough range.
out = np.empty_like(I_tke.flatten())
for i, (b, t) in enumerate(zip(I_tke.flatten(), theta.flatten())):
out[i] = np.trapz(
cbrt(x**2 - 2/b*np.cos(t)*x + b**(-2)) *
np.exp(-0.5 * x ** 2), x)
return out.reshape(I_tke.shape) * \
(2 * np.pi) ** (-0.5) * I_tke ** (2 / 3)
[docs]
def calc_epsilon_TE01(self, dat_raw, dat_avg, freq_range=[6.28, 12.57]):
"""Calculate the dissipation rate according to TE01.
Parameters
----------
dat_raw : xarray.Dataset
The raw (off the instrument) adv dataset
dat_avg : xarray.Dataset
The bin-averaged adv dataset (calc'd from 'calc_turbulence' or
'do_avg'). The spectra (psd) and basic turbulence statistics
('tke_vec' and 'stress_vec') must already be computed.
freq_range : iterable(2) (default: [6.28, 12.57])
The range over which to integrate/average the spectrum, in units
of the psd frequency vector (Hz or rad/s)
Notes
-----
TE01 : Trowbridge, J and Elgar, S, "Turbulence measurements in
the Surf Zone". JPO, 2001, vol31, pp2403-2417.
"""
# Assign local names
U_mag = dat_avg.velds.U_mag.values
I_tke = dat_avg.velds.I_tke.values
theta = np.angle(dat_avg.velds.U.values) - \
self._up_angle(dat_raw.velds.U.values)
freq = dat_avg['psd'].freq.values
# Calculate constants
alpha = 1.5
intgrl = self._calc_epsTE01_int(I_tke, theta)
# Index data to be used
inds = (freq_range[0] < freq) & (freq < freq_range[1])
psd = dat_avg.psd[..., inds].values
freq = freq[inds].reshape([1] * (dat_avg.psd.ndim - 2) + [sum(inds)])
# Estimate values
# u & v components (equation 6)
out = (np.nanmean((psd[0] + psd[1]) * freq**(5/3), -1) /
(21/55 * alpha * intgrl))**(3/2) / U_mag
# Add w component
out += (np.nanmean(psd[2] * freq**(5/3), -1) /
(12/55 * alpha * intgrl))**(3/2) / U_mag
# Average the two estimates
out *= 0.5
return xr.DataArray(
out.astype('float32'),
coords={'time': dat_avg.psd.time},
dims='time',
attrs={'units': 'm2 s-3',
'long_name': 'Dissipation Rate',
'standard_name': 'specific_turbulent_kinetic_energy_dissipation_in_sea_water',
'description': 'TKE dissipation rate calculated using the method from Trowbridge and Elgar, 2001'
})
[docs]
def calc_L_int(self, a_cov, U_mag, fs=None):
"""Calculate integral length scales.
Parameters
----------
a_cov : xarray.DataArray
The auto-covariance array (i.e. computed using `calc_acov`).
U_mag : xarray.DataArray
The bin-averaged horizontal velocity (from dataset shortcut)
fs : float
The raw sample rate
Returns
-------
L_int : |np.ndarray| (..., n_time)
The integral length scale (T_int*U_mag).
Notes
----
The integral time scale (T_int) is the lag-time at which the
auto-covariance falls to 1/e.
If T_int is not reached, L_int will default to '0'.
"""
acov = a_cov.values
fs = self._parse_fs(fs)
scale = np.argmin((acov/acov[..., :1]) > (1/np.e), axis=-1)
L_int = U_mag.values / fs * scale
return xr.DataArray(
L_int.astype('float32'),
coords={'dir': a_cov.dir, 'time': a_cov.time},
attrs={'units': 'm',
'long_name': 'Integral Length Scale',
'standard_name': 'turbulent_mixing_length_of_sea_water'})
[docs]
def calc_turbulence(ds_raw, n_bin, fs, n_fft=None, freq_units='rad/s', window='hann'):
"""Functional version of `ADVBinner` that computes a suite of turbulence
statistics for the input dataset, and returns a `binned` data object.
Parameters
----------
ds_raw : xarray.Dataset
The raw adv datset to `bin`, average and compute
turbulence statistics of.
freq_units : string (default: rad/s)
Frequency units of the returned spectra in either Hz or rad/s
(`f` or :math:`\\omega`)
window : 1, None, 'hann', 'hamm'
The window to use for calculating power spectral densities
Returns
-------
ds : xarray.Dataset
Returns an 'binned' (i.e. 'averaged') data object. All
fields (variables) of the input data object are averaged in n_bin
chunks. This object also computes the following items over
those chunks:
- tke_vec : The energy in each component, each components is
alternatively accessible as:
:attr:`upup_ <dolfyn.velocity.Velocity.upup_>`,
:attr:`vpvp_ <dolfyn.velocity.Velocity.vpvp_>`,
:attr:`wpwp_ <dolfyn.velocity.Velocity.wpwp_>`)
- stress_vec : The Reynolds stresses, each component is
alternatively accessible as:
:attr:`upwp_ <dolfyn.data.velocity.Velocity.upwp_>`,
:attr:`vpwp_ <dolfyn.data.velocity.Velocity.vpwp_>`,
:attr:`upvp_ <dolfyn.data.velocity.Velocity.upvp_>`)
- U_std : The standard deviation of the horizontal
velocity `U_mag`.
- psd : DataArray containing the spectra of the velocity
in radial frequency units. The data-array contains:
- vel : the velocity spectra array (m^2/s/rad))
- omega : the radial frequncy (rad/s)
"""
calculator = ADVBinner(n_bin, fs, n_fft=n_fft)
return calculator(ds_raw, freq_units=freq_units, window=window)