Source code for dolfyn.tools.psd

import numpy as np
from .misc import detrend
fft = np.fft.fft


[docs] def psd_freq(nfft, fs, full=False): """Compute the frequency for vector for a `nfft` and `fs`. Parameters ---------- fs : float The sampling frequency (e.g. samples/sec) nfft : int The number of samples in a window. full : bool (default: False) Whether to return half frequencies (positive), or the full frequencies. Returns ------- freq : |np.ndarray| The frequency vector, in same units as 'fs' """ fs = np.float64(fs) f = np.fft.fftfreq(int(nfft), 1 / fs) if full: return f else: return np.abs(f[1:int(nfft / 2. + 1)])
def _getwindow(window, nfft): if window == 'hann': window = np.hanning(nfft) elif window == 'hamm': window = np.hamming(nfft) elif window is None or window == 1: window = np.ones(nfft) return window
[docs] def stepsize(l, nfft, nens=None, step=None): """Calculates the fft-step size for a length *l* array. If nens is None, the step size is chosen to maximize data use, minimize nens and have a minimum of 50% overlap. If nens is specified, the step-size is computed directly. Parameters ---------- l : The length of the array. nfft : The number of points in the fft. nens : The number of nens to perform (default compute this). Returns ------- step : The step size. nens : The number of ensemble ffts to average together. nfft : The number of points in the fft (set to l if nfft>l). """ if l < nfft: nfft = l if nens is None and step is None: if l == nfft: return 0, 1, int(nfft) nens = int(2. * l / nfft) return int((l - nfft) / (nens - 1)), nens, int(nfft) elif nens is None: return int(step), int((l - nfft) / step + 1), int(nfft) else: if nens == 1: return 0, 1, int(nfft) return int((l - nfft) / (nens - 1)), int(nens), int(nfft)
[docs] def coherence(a, b, nfft, window='hann', debias=True, noise=(0, 0)): """Computes the magnitude-squared coherence of `a` and `b`. Parameters ---------- a : |np.ndarray| The first array over which to compute coherence. b : |np.ndarray| The second array over which to compute coherence. nfft : int The number of points to use in the fft. window : string, np.ndarray (default 'hann') The window to use for ffts. debias : bool (default: True) Specify whether to debias the signal according to Benignus1969. noise : tuple(2), or float The `noise` keyword may be used to specify the signals' noise levels (std of noise in a,b). If `noise` is a two element tuple or list, the first and second elements specify the noise levels of a and b, respectively. default: noise=(0,0) Returns ------- out : |np.ndarray| Coherence between `a` and `b` Notes ----- Coherence is defined as: .. math:: C_{ab} = \\frac{|S_{ab}|^2}{S_{aa} * S_{bb}} Here :math:`S_{ab}`, :math:`S_{aa}` and :math:`S_{bb}` are the cross, and auto spectral densities of the signal `a` and `b`. """ l = [len(a), len(b)] cross = cpsd_quasisync if l[0] == l[1]: cross = cpsd elif l[0] > l[1]: a, b = b, a l = l[::-1] step1, nens, nfft = stepsize(l[0], nfft) step2, nens, nfft = stepsize(l[1], nfft, nens=nens) if noise.__class__ not in [list, tuple, np.ndarray]: noise = [noise, noise] elif len(noise) == 1: noise = [noise[0], noise[0]] if nens <= 2: raise Exception("Coherence must be computed from a set of ensembles.") # fs=1 is ok because it comes out in the normalization. (noise # normalization depends on this) out = ((np.abs(cross(a, b, nfft, 1, window=window)) ** 2) / ((psd(a, nfft, 1, window=window, step=step1) - noise[0] ** 2 / np.pi) * (psd(b, nfft, 1, window=window, step=step2) - noise[1] ** 2 / np.pi)) ) if debias: # This is from Benignus1969, it seems to work (make data with different # nens (nfft) agree). return out * (1 + 1. / nens) - 1. / nens return out
[docs] def cpsd_quasisync(a, b, nfft, fs, window='hann'): """Compute the cross power spectral density (CPSD) of the signals `a` and `b`. Parameters ---------- a : |np.ndarray| The first signal. b : |np.ndarray| The second signal. nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', |np.ndarray|} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. Returns ------- cpsd : |np.ndarray| The cross-spectral density of `a` and `b`. See Also --------- :func:`psd`, :func:`coherence`, :func:`cpsd`, numpy.fft Notes ----- `a` and `b` do not need to be 'tightly' synchronized, and can even be different lengths, but the first- and last-index of both series should be synchronized (to whatever degree you want unbiased phases). This performs: .. math:: fft(a)*conj(fft(b)) Note that this is consistent with :func:`numpy.correlate`. It detrends the data and uses a minimum of 50% overlap for the shorter of `a` and `b`. For the longer, the overlap depends on the difference in size. 1-(l_short/l_long) data will be underutilized (where l_short and l_long are the length of the shorter and longer series, respectively). The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. """ if np.iscomplexobj(a) or np.iscomplexobj(b): raise Exception("Velocity cannot be complex") l = [len(a), len(b)] if l[0] == l[1]: return cpsd(a, b, nfft, fs, window=window) elif l[0] > l[1]: a, b = b, a l = l[::-1] step = [0, 0] step[0], nens, nfft = stepsize(l[0], nfft) step[1], nens, nfft = stepsize(l[1], nfft, nens=nens) fs = np.float64(fs) window = _getwindow(window, nfft) fft_inds = slice(1, int(nfft / 2. + 1)) wght = 2. / (window ** 2).sum() pwr = fft(detrend(a[0:nfft]) * window)[fft_inds] * \ np.conj(fft(detrend(b[0:nfft]) * window)[fft_inds]) if nens - 1: for i1, i2 in zip(range(step[0], l[0] - nfft + 1, step[0]), range(step[1], l[1] - nfft + 1, step[1])): pwr += fft(detrend(a[i1:(i1 + nfft)]) * window)[fft_inds] * \ np.conj(fft(detrend(b[i2:(i2 + nfft)]) * window)[fft_inds]) pwr *= wght / nens / fs return pwr
[docs] def cpsd(a, b, nfft, fs, window='hann', step=None): """Compute the cross power spectral density (CPSD) of the signals `a` and `b`. Parameters ---------- a : |np.ndarray| The first signal. b : |np.ndarray| The second signal. nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', |np.ndarray|} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. step : int Use this to specify the overlap. For example: - step : nfft/2 specifies a 50% overlap. - step : nfft specifies no overlap. - step=2*nfft means that half the data will be skipped. By default, `step` is calculated to maximize data use, have at least 50% overlap and minimize the number of ensembles. Returns ------- cpsd : |np.ndarray| The cross-spectral density of `a` and `b`. See also -------- :func:`psd` :func:`coherence` `numpy.fft` Notes ----- cpsd removes a linear trend from the signals. The two signals should be the same length, and should both be real. This performs: .. math:: fft(a)*conj(fft(b)) This implementation is consistent with the numpy.correlate definition of correlation. (The conjugate of D.B. Chelton's definition of correlation.) The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. """ if np.iscomplexobj(a) or np.iscomplexobj(b): raise Exception("Velocity cannot be complex") auto_psd = False if a is b: auto_psd = True l = len(a) step, nens, nfft = stepsize(l, nfft, step=step) fs = np.float64(fs) window = _getwindow(window, nfft) fft_inds = slice(1, int(nfft / 2. + 1)) wght = 2. / (window ** 2).sum() s1 = fft(detrend(a[0:nfft]) * window)[fft_inds] if auto_psd: pwr = np.abs(s1) ** 2 else: pwr = s1 * np.conj(fft(detrend(b[0:nfft]) * window)[fft_inds]) if nens - 1: for i in range(step, l - nfft + 1, step): s1 = fft(detrend(a[i:(i + nfft)]) * window)[fft_inds] if auto_psd: pwr += np.abs(s1) ** 2 else: pwr += s1 * \ np.conj(fft(detrend(b[i:(i + nfft)]) * window)[fft_inds]) pwr *= wght / nens / fs return pwr
[docs] def psd(a, nfft, fs, window='hann', step=None): """Compute the power spectral density (PSD). This function computes the one-dimensional `n`-point PSD. The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. Parameters ---------- a : |np.ndarray| The first signal, as a 1D vector nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', |np.ndarray|} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. step : int Use this to specify the overlap. For example: - step : nfft/2 specifies a 50% overlap. - step : nfft specifies no overlap. - step=2*nfft means that half the data will be skipped. By default, `step` is calculated to maximize data use, have at least 50% overlap and minimize the number of ensembles. Returns ------- psd : |np.ndarray| The power spectral density of `a` and `b`. Notes ----- Credit: This function's line of code was copied from JN's fast_psd.m routine. See Also -------- :func:`cpsd` :func:`coherence` `numpy.fft` """ return np.abs(cpsd(a, a, nfft, fs, window=window, step=step))
[docs] def phase_angle(a, b, nfft, window='hann', step=None): """Compute the phase difference between signals `a` and `b`. This is the complimentary function to coherence and cpsd. Positive angles means that `b` leads `a`, i.e. this does, essentially: angle(b) - angle(a) This function computes one-dimensional `n`-point PSD. The angles are output as magnitude = 1 complex numbers (to simplify averaging). Therefore, use `numpy.angle` to actually output the angle. Parameters ---------- a : 1d-array_like, the signal. Currently only supports vectors. nfft : The number of points in the fft. window : The window to use (default: 'hann'). Valid entries are: None,1 : uses a 'boxcar' or ones window. 'hann' : hanning window. a length(nfft) array : use this as the window directly. step : Use this to specify the overlap. For example: - step=nfft/2 specifies a 50% overlap. - step=nfft specifies no overlap. - step=2*nfft means that half the data will be skipped. By default, `step` is calculated to maximize data use, have at least 50% overlap and minimize the number of ensembles. Returns ------- ang : complex |np.ndarray| (unit magnitude values) See Also -------- `numpy.fft` :func:`coherence` :func:`cpsd` """ window = _getwindow(window, nfft) fft_inds = slice(1, int(nfft / 2. + 1)) s1 = fft(detrend(a[0:nfft]) * window)[fft_inds] s2 = fft(detrend(b[0:nfft]) * window)[fft_inds] s1 /= np.abs(s1) s2 /= np.abs(s2) ang = s2 / s1 l = len(a) step, nens, nfft = stepsize(l, nfft, step=step) if nens - 1: for i in range(step, l - nfft + 1, step): s1 = fft(detrend(a[i:(i + nfft)]) * window)[fft_inds] s1 /= np.abs(s1) s2 = fft(detrend(a[i:(i + nfft)]) * window)[fft_inds] s2 /= np.abs(s2) ang += s2 / s1 ang /= nens return ang