Source code for pylablib.core.dataproc.fourier
"""
Routines for Fourier transform.
"""
from .table_wrap import wrap
from . import utils, specfunc
import numpy as np
import numpy.fft as fft
_prev_len_cache={}
_default_maxprime=7
[docs]
def get_prev_len(l, maxprime=_default_maxprime):
"""
Get the largest number less or equal to `l`, which is composed of prime factors up to `maxprime`.
So far, only `maxprime` of 2, 3, 5, 7 and 11 are supported.
`maxprime` of 5 gives less than 15% length reduction (and less than 6% for lengths above 400).
`maxprime` of 11 gives less than 8% length reduction (and less than 4% for lengths above 300).
"""
if maxprime not in {2,3,5,7,11}:
raise ValueError("only factors of 2, 3, 5, 7 and 11 are supported")
if l<=maxprime:
return l
if (maxprime,l) in _prev_len_cache:
return _prev_len_cache[maxprime,l]
if maxprime==2:
res=2**(l.bit_length()-1)
elif maxprime==3:
res=1
p3=1
while p3<=l:
r=l//p3
p23=2**(r.bit_length()-1)*p3
res=max(res,p23)
p3*=3
elif maxprime==5:
res=1
p5=1
while p5<=l:
p35=p5
while p35<=l:
r=l//p35
p235=2**(r.bit_length()-1)*p35
res=max(res,p235)
p35*=3
p5*=5
else:
prevprime=5 if maxprime==7 else 7
res=1
pm=1
while pm<=l:
res=max(res,get_prev_len(l//pm,prevprime)*pm)
pm*=maxprime
if l<1E5 or len(_prev_len_cache)<1E5:
_prev_len_cache[maxprime,l]=res
return res
[docs]
def truncate_trace(trace, maxprime=_default_maxprime):
"""
Truncate trace length to the nearest smaller length which is composed of prime factors up to `maxprime`.
So far, only `maxprime` of 2, 3, 5, 7 and 11 are supported.
`maxprime` of 5 gives less than 15% length reduction (and less than 6% for lengths above 400).
`maxprime` of 11 gives less than 8% length reduction (and less than 4% for lengths above 300).
"""
if not maxprime:
return trace
if maxprime==True:
maxprime=_default_maxprime
l=get_prev_len(len(trace),maxprime=maxprime)
return wrap(trace).t[:l]
[docs]
def normalize_fourier_transform(ft, normalization="none", df=None, copy=False):
"""
Normalize the Fourier transform data.
`ft` is a 1D trace or a 2D array with 2 columns: frequency and complex amplitude.
`normalization` can be ``'none'`` (standard numpy normalization), ``'sum'`` (the power sum is preserved: ``sum(abs(ft)**2)==sum(abs(trace)**2)``),
``'rms'`` (the power sum is equal to the trace RMS power: ``sum(abs(ft)**2)==mean(abs(trace)**2)``),
``'density'`` (power spectral density normalization, ``sum(abs(ft[:,1])**2)*df==mean(abs(trace[:,1])**2)``),
or ``'dBc'`` (same as ``'density'``, but normalized by the mean of the trace)
If ``normalization=='density'``, then `df` can specify the frequency step between two consecutive bins;
if `df` is ``None``, it is extracted from the first two points of the frequency axis (or set to 1, if `ft` is a 1D trace)
"""
if normalization=="none":
return ft
l=len(ft)
ft=wrap(ft)
if copy:
ft=ft.copy(wrapped=True)
if df is None:
df=ft[1,0]-ft[0,0] if ft.ndim()==2 else 1
if normalization=="sum":
norm=1/np.sqrt(l)
elif normalization in {"mean","rms"}:
norm=1/l
elif normalization=="density" or normalization=="dBc":
norm=1/(l*np.sqrt(abs(df)))
if normalization=="dBc":
dc_value=ft[len(ft)//2,1] if ft.ndim()==2 else ft[len(ft)//2]
norm/=dc_value/l
else:
raise ValueError("unrecognized normalization mode: {0}".format(normalization))
if ft.ndim()==2:
ft[:,1]=ft[:,1]*norm
else:
ft[:]=ft[:]*norm
return ft.cont
[docs]
def apply_window(trace_values, window="rectangle", window_power_compensate=True):
"""
Apply FT window to the trace.
If ``window_power_compensate==True``, multiply the data is multiplied by a compensating factor to preserve power in the spectrum.
"""
if window=="rectangle":
return trace_values
window=specfunc.get_window_func(window)
window_trace=window(np.arange(len(trace_values)),len(trace_values),ft_compensated=window_power_compensate)
return trace_values*window_trace
def _get_trace_values(wrapped_trace, index_column):
"""Get trace values from a 1D or 2D trace depending on its shape and on whether the index (time or frequency) column is present"""
if wrapped_trace.ndim()==1:
return wrapped_trace[:]
if wrapped_trace.shape()[1]==1:
return wrapped_trace[:,0]
if wrapped_trace.shape()[1]==(2 if index_column else 1):
return wrapped_trace[:,-1]
if wrapped_trace.shape()[1]==(3 if index_column else 2):
return wrapped_trace[:,-2]+1j*wrapped_trace[:,-1]
raise ValueError("function does not work for an array with shape {0}".format(wrapped_trace.shape()))
[docs]
def fourier_transform(trace, dt=None, truncate=False, normalization="none", single_sided=False, window="rectangle", window_power_compensate=True, raw=False):
"""
Calculate a fourier transform of the trace.
Args:
trace: Time trace to be transformed. It can be a 1D trace of values, a 2-column trace, or a 3-column trace.
If `dt` is ``None``, then the first column is assumed to be time (only support uniform time step),
and the other columns are either the trace values (for a single data column) or real and imaginary parts of the trace (for two data columns).
If `dt` is not ``None``, then the time column is assumed to be missing, so the two columns are assumed to be the real and the imaginary parts.
dt: if not ``None``, can specify the time step between the consecutive samples, in which case it is assumed that the time column is missing from the trace;
otherwise, try to get it from the time column of the trace if it exists, or set to 1 otherwise.
truncate (bool or int): Determines whether to truncate the trace to the nearest product of small primes (speeds up FFT algorithm);
can be ``False`` (no truncation), an integer 2, 3, 5, 7, or 11 (truncate to a product of primes up to and including this number),
or ``True`` (default prime factorization, currently set to 7)
normalization (str): Fourier transform normalization:
- ``'none'``: no (i.e., default numpy) normalization;
- ``'sum'``: the norm of the data is conserved (``sum(abs(ft[:,1])**2)==sum(abs(trace[:,1])**2)``);
- ``'rms'``: sum of the PSD is equal to the RMS trace amplitude squared (``sum(abs(ft[:,1])**2)==mean(abs(trace[:,1])**2)``);
- ``'density'``: power spectral density normalization, in ``x/rtHz`` (``sum(abs(ft[:,1])**2)*df==mean(abs(trace[:,1])**2)``);
- ``'dBc'``: like ``'density'``, but normalized to the mean trace value.
single_sided (bool): If ``True``, only leave positive frequency side of the transform.
window (str): FT window. Can be ``'rectangle'`` (essentially, no window), ``'hann'`` or ``'hamming'``.
window_power_compensate (bool): If ``True``, the data is multiplied by a compensating factor to preserve power in the spectrum.
raw (bool): if ``True``, return a simple 1D trace with the result.
Returns:
a two-column array of the same kind as the input, where the first column is frequency, and the second is complex FT data.
"""
wrapped=wrap(trace)
column_names=["frequency","ft_data"]
trace_values=_get_trace_values(wrapped,index_column=dt is None)
if len(trace_values)==0:
return np.array([]) if raw else wrapped.from_array(np.zeros((0,2)),column_names)
if len(trace_values)==1:
return np.array([]) if raw else wrapped.from_array(np.array([[0,trace_values[0]]]),column_names)
trace_values=truncate_trace(trace_values,maxprime=truncate)
trace_values=apply_window(trace_values,window,window_power_compensate=window_power_compensate)
ft=fft.fftshift(fft.fft(trace_values))
if dt is None:
dt=1. if wrapped.ndim()==1 or wrapped.shape()[1]==1 else wrapped[1,0]-wrapped[0,0]
df=1./(abs(np.real(dt))*len(ft))
if not raw:
frequencies=(np.arange(len(ft))-len(ft)//2)*df
ft=wrapped.from_columns([frequencies,ft],column_names) if wrapped.ndim()>1 else np.column_stack((frequencies,ft))
ft=normalize_fourier_transform(ft,normalization,df=df)
if single_sided:
if raw:
ft=ft[len(ft)//2:]
else:
ft=wrap(ft).t[len(ft)//2:,:]
ft[0,0]=0 # numerical error compensation
return ft
[docs]
def flip_fourier_transform(ft):
"""
Flip the fourier transform (analogous to making frequencies negative and flipping the order).
"""
ft=wrap(ft).copy(wrapped=True)
if len(ft)%2==1:
ft[:,1]=ft[::-1,1]
else:
ft[1::,1]=ft[:0:-1,1]
return ft.cont
[docs]
def inverse_fourier_transform(ft, df=None, truncate=False, zero_loc=None, symmetric_time=False, raw=False):
"""
Calculate an inverse fourier transform of the trace.
Args:
ft: Fourier transform data to be inverted. It can be a 1D trace of values, a 2-column trace, or a 3-column trace.
If `df` is ``None``, then the first column is assumed to be frequency (only support uniform frequency step),
and the other columns are either the trace values (for a single data column) or real and imaginary parts of the trace (for two data columns).
If `df` is not ``None``, then the frequency column is assumed to be missing, so the two columns are assumed to be the real and the imaginary parts.
df: if not ``None``, can specify the frequency step between the consecutive samples; otherwise, try to get it from the frequency column of the trace
if it exists, or set to 1 otherwise.
truncate (bool or int): Determines whether to truncate the trace to the nearest product of small primes (speeds up FFT algorithm);
can be ``False`` (no truncation), an integer 2, 3, 5, 7, or 11 (truncate to a product of primes up to and including this number),
or ``True`` (default prime factorization, currently set to 7)
zero_loc (bool): Location of the zero frequency point.
Can be ``None`` (the one with the value of f-axis closest to zero, or the first point if the frequency column is missing),
``'center'`` (mid-point), or an integer index.
symmetric_time (bool): If ``True``, make time axis go from ``(-0.5/df, 0.5/df)`` rather than ``(0, 1./df)``.
raw (bool): if ``True``, return a simple 1D trace with the result.
Returns:
a two-column array, where the first column is frequency, and the second is the complex-valued trace data.
"""
wrapped=wrap(ft)
column_names=["time","data"]
ft_values=_get_trace_values(wrapped,index_column=df is None)
if len(ft)==0:
return np.array([]) if raw else wrapped.from_array(np.zeros((0,2)),column_names)
if len(ft)==1:
return np.array([ft_values[0]]) if raw else wrapped.from_array(np.array([[0,ft_values[0]]]),column_names)
if zero_loc is None:
if df is not None or wrapped.ndim()==1 or wrapped.shape()[1]==1:
zero_freq_point=0
else:
zero_freq_point=utils.find_closest_arg(wrapped.c[0],0,ordered=True)
if zero_freq_point is None:
raise ValueError("can't find zero frequency point; closest is {0}".format(wrapped[zero_freq_point,0]))
elif zero_loc=="center":
zero_freq_point=len(ft)//2
else:
zero_freq_point=zero_loc
if zero_freq_point!=0:
ft_values=np.concatenate(( ft_values[zero_freq_point:], ft_values[:zero_freq_point] ))
ft_values=truncate_trace(ft_values,maxprime=truncate)
trace=fft.ifft(ft_values)
l=len(trace)
if symmetric_time:
trace=np.concatenate((trace[l//2:],trace[:l//2]))
if raw:
return trace
if df is None:
df=1. if wrapped.ndim()==1 or wrapped.shape()[1]==1 else wrapped[1,0]-wrapped[0,0]
dt=1./(abs(np.real(df))*l)
times=np.arange(len(ft))*dt
if symmetric_time:
times-=times[l//2]
if wrapped.ndim()==1:
return np.column_stack((times,trace))
else:
return wrapped.from_columns([times,trace],column_names)
[docs]
def power_spectral_density(trace, dt=None, truncate=False, normalization="density", single_sided=False, window="rectangle", window_power_compensate=True, raw=False):
"""
Calculate a power spectral density of the trace.
Args:
trace: Time trace to be transformed. It can be a 1D trace of values, a 2-column trace, or a 3-column trace.
If `dt` is ``None``, then the first column is assumed to be time (only support uniform time step),
and the other columns are either the trace values (for a single data column) or real and imaginary parts of the trace (for two data columns).
If `dt` is not ``None``, then the time column is assumed to be missing, so the two columns are assumed to be the real and the imaginary parts.
dt: if not ``None``, can specify the time step between the consecutive samples; otherwise, try to get it from the time column of the trace
if it exists, or set to 1 otherwise.
truncate (bool or int): Determines whether to truncate the trace to the nearest product of small primes (speeds up FFT algorithm);
can be ``False`` (no truncation), an integer 2, 3, 5, 7, or 11 (truncate to a product of primes up to and including this number),
or ``True`` (default prime factorization, currently set to 7)
normalization (str): Fourier transform normalization:
- ``'none'``: no (i.e., default numpy) normalization;
- ``'sum'``: the norm of the data is conserved (``sum(PSD[:,1])==sum(abs(trace[:,1])**2)``);
- ``'rms'``: sum of the PSD is equal to the RMS trace amplitude squared (``sum(PSD[:,1])==mean(abs(trace[:,1])**2)``);
- ``'density'``: power spectral density normalization, in ``x/rtHz`` (``sum(PSD[:,1])*df==mean(abs(trace[:,1])**2)``);
- ``'dBc'``: like ``'density'``, but normalized to the mean trace value.
single_sided (bool): If ``True``, only leave positive frequency side of the PSD.
window (str): FT window. Can be ``'rectangle'`` (essentially, no window), ``'hann'`` or ``'hamming'``.
window_power_compensate (bool): If ``True``, the data is multiplied by a compensating factor to preserve power in the spectrum.
raw (bool): if ``True``, return a simple 1D trace with the result.
Returns:
a two-column array, where the first column is frequency, and the second is positive PSD.
"""
column_names=["frequency","PSD"]
ft=fourier_transform(trace,dt=dt,truncate=truncate,normalization=normalization,
single_sided=single_sided,window=window,window_power_compensate=window_power_compensate,raw=True)
if raw:
return abs(ft)**2
wrapped=wrap(trace)
if dt is None:
dt=1. if wrapped.ndim()==1 or wrapped.shape()[1]==1 else wrapped[1,0]-wrapped[0,0]
df=1./(dt*wrapped.shape()[0])
if single_sided:
frequencies=np.arange(len(ft))*df
else:
frequencies=(np.arange(len(ft))-len(ft)//2)*df
if wrapped.ndim()==1:
return np.column_stack((frequencies,abs(ft)**2))
else:
return wrapped.from_columns((frequencies,abs(ft)**2),column_names)
[docs]
def get_real_part_ft(ft):
"""
Get the fourier transform of the real part only from the fourier transform of a complex variable.
"""
re_ft=wrap(ft).copy(wrapped=True)
re_ft[1:,1]=(ft[1:,1]+ft[:0:-1,1].conjugate())*0.5
re_ft[0,1]=np.real(ft[0,1])
return re_ft.cont
[docs]
def get_imag_part_ft(ft):
"""
Get the fourier transform of the imaginary part only from the fourier transform of a complex variable.
"""
im_ft=wrap(ft).copy(wrapped=True)
im_ft[1:,1]=(im_ft[1:,1]-im_ft[:0:-1,1].conjugate())/2.j
im_ft[0,1]=im_ft[0,1].imag
return im_ft.cont
[docs]
def get_correlations_ft(ft_a, ft_b, zero_mean=True, normalization="none"):
"""
Calculate the correlation function of the two variables given their fourier transforms.
Args:
ft_a: first variable fourier transform
ft_b: second variable fourier transform
zero_mean (bool): If ``True``, the value corresponding to the zero frequency is set to zero (only fluctuations around means of ``a`` and ``b`` are calculated).
normalization (str): Can be ``'whole'`` (correlations are normalized by product of PSDs derived from `ft_a` and `ft_b`)
or ``'individual'`` (normalization is done for each frequency individually, so that the absolute value is always 1).
"""
if len(ft_a)!=len(ft_b):
raise ValueError("transforms should be of the same length")
corr=ft_a.copy()
corr[:,1]=corr[:,1]*ft_b[:,1].conjugate()
if (zero_mean):
corr[len(corr)/2,1]=0.
if normalization=="whole":
norm_a=(abs(ft_a[:,1])**2).sum()-abs(ft_a[len(ft_a)/2,1])**2
norm_b=(abs(ft_b[:,1])**2).sum()-abs(ft_b[len(ft_b)/2,1])**2
corr[:,1]=corr[:,1]/(norm_a*norm_b)**.5
elif normalization=="individual":
norm_factors=abs(ft_a[:,1]*ft_b[:,1])
corr[:,1]=corr[:,1]/norm_factors
elif normalization!="none":
raise ValueError("unrecognized normalization method: {0}".format(normalization))
return corr