"""
Generic utilities for dealing with numerical arrays.
"""
from .table_wrap import wrap
from ..utils import general as general_utils
from ..utils import numerical
import numpy as np
import pandas as pd
##### Trace properties #####
[docs]
def is_ascending(trace):
"""
Check the if the trace is ascending.
If it has more than 1 dimension, check all lines along 0'th axis.
"""
if len(trace)<2:
return True
wrapped=wrap(trace)
return np.all(wrapped[1:]-wrapped[:-1]>=0)
[docs]
def is_descending(trace):
"""
Check if the trace is descending.
If it has more than 1 dimension, check all lines along 0'th axis.
"""
if len(trace)<2:
return True
wrapped=wrap(trace)
return np.all(wrapped[1:]-wrapped[:-1]<=0)
[docs]
def is_ordered(trace):
"""
Check if the trace is ordered (ascending or descending).
If it has more than 1 dimension, check all lines along 0'th axis.
"""
return is_ascending(trace) or is_descending(trace)
[docs]
def is_linear(trace):
"""
Check if the trace is linear (values go with a constant step).
If it has more than 1 dimension, check all lines along 0'th axis (with the same step for all).
"""
if len(trace)<2:
return True
wrapped=wrap(trace)
diff=wrapped[1:]-wrapped[:-1]
return np.all(diff==diff[(0,)*np.ndim(diff)])
##### Columns access #####
[docs]
def get_x_column(t, x_column=None, idx_default=False):
"""
Get x column of the table.
`x_column` can be
- an array: return as is;
- ``'#'``: return index array;
- ``None``: equivalent to '#' for 1D data if ``idx_default==False``, or to ``0`` otherwise;
- integer: return the column with this index.
"""
if np.ndim(x_column)>0:
return x_column
if x_column=="#":
return np.asarray(t.index) if isinstance(t,pd.DataFrame) or isinstance(t,pd.Series) else np.arange(len(t))
elif np.ndim(t)==1:
if x_column is None and idx_default:
return np.arange(len(t))
return t
else:
if x_column is None:
x_column=0
try:
return wrap(t)[:,x_column]
except ValueError:
return np.asarray(t[x_column])
[docs]
def get_y_column(t, y_column=None):
"""
Get y column of the table.
`y_column` can be
- an array: return as is;
- ``'#'``: return index array;
- ``None``: return `t` for 1D data, or the column ``1`` otherwise;
- integer: return the column with this index.
"""
if np.ndim(y_column)>0:
return y_column
if y_column=="#":
return t.index if isinstance(t,pd.DataFrame) or isinstance(t,pd.Series) else np.arange(len(t))
elif np.ndim(t)==1:
return t
else:
if y_column is None:
y_column=1
try:
return wrap(t)[:,y_column]
except ValueError:
return np.asarray(t[y_column])
##### Sorting #####
try:
np.argsort([],kind="stable")
_stable_sort="stable"
except ValueError:
_stable_sort="mergesort"
[docs]
def sort_by(t, x_column=None, reverse=False, stable=False):
"""
Sort a table using selected column as a key and preserving rows.
If ``reverse==True``, sort in descending order. `x_column` values are described in :func:`.get_x_column`.
If ``stable==True``, use stable sort (could be slower and uses more memory, but preserves the order of elements for the same key)
"""
x_column=get_x_column(t,x_column)
if reverse:
return wrap(t).t[x_column.argsort(kind=_stable_sort if stable else "quicksort"),::-1]
else:
return wrap(t).t[x_column.argsort(kind=_stable_sort if stable else "quicksort")]
##### Filtering #####
[docs]
def filter_by(t, columns=None, pred=None, exclude=False):
"""
Filter 1D or 2D array using a predicate.
If the data is 2D, `columns` contains indices of columns to be passed to the `pred` function.
If ``exclude==False``, drop all of the rows satisfying `pred` rather than keep them.
"""
if pred is None:
return t
pred=general_utils.to_predicate(pred)
wrapped=wrap(t)
if wrapped.ndim()==1:
sat=np.array([pred(r) for r in wrapped]).astype(bool)
else:
if columns is None:
columns=slice(None)
wrapped_subtable=wrapped.subtable((slice(None),columns),wrapped=True)
sat=np.array([pred(*r) for r in wrapped_subtable.r]).astype(bool)
return wrapped.t[~sat if exclude else sat]
[docs]
def unique_slices(t, u_column):
"""
Split a table into subtables with different values in a given column.
Return a list of `t` subtables, each of which has a different (and equal among all rows in the subtable) value in `u_column`.
"""
wrapped=wrap(t)
u_column=get_y_column(t,u_column)
vals=np.unique(u_column)
return [(v,wrapped.t[u_column==v]) for v in vals]
##### Merging #####
def _get_common_type(types):
counts={"2d.pandas":0,"2d.array":0,"1d.series":0,"1d.array":0}
for t in types:
counts[t]=counts[t]+1
if counts["2d.array"]>0 or counts["1d.series"]>0 or counts["1d.array"]>0:
return "array"
else:
return "pandas"
[docs]
def merge(ts, idx=None, as_array=True):
"""
Merge several tables column-wise.
If `idx` is not ``None``, then it is a list of index columns (one column per table) used for merging.
The rows that have the same value in the index columns are merged; if some values aren't contained in all the `ts`, the corresponding rows are omitted.
If `idx` is ``None``, just join the tables together (they must have the same number of rows).
If ``as_array==True``, return a simple numpy array as a result; otherwise, return a pandas DataFrame if applicable
(note that in this case all column names in all tables must be different to avoid conflicts)
"""
if idx is not None:
if isinstance(idx,list) or isinstance(idx,tuple):
if len(idx)!=len(ts):
raise ValueError("idx length is different from tables length")
else:
idx=[idx]*len(ts)
wrapped=[wrap(t) for t in ts]
result_type="array" if as_array else _get_common_type([w.get_type() for w in wrapped])
if idx is not None: # select common indices
idx_cols=[w.c[c] if w.ndim()==2 else w.cont for c,w in zip(idx,wrapped)]
common_idx=set.intersection(*[set(i) for i in idx_cols])
common_idx=np.sort(np.array(list(common_idx)))
cut=[wrap(common_idx)]
for c,w in zip(idx,wrapped):
if w.ndim()==2:
t=sort_by(filter_by(w.cont,c,common_idx),c)
t=wrap(t).c.get_deleted(c,wrapped,wrapped=True)
cut.append(t)
wrapped=cut
if result_type=="array":
ts=[np.column_stack((w[:])) if w.ndim()==1 else w[:,:] for w in wrapped] # pylint: disable=not-callable
return np.concatenate(ts,axis=1)
else:
columns,names=zip(*[(n,v) for w in wrapped for n,v in zip(w.c.get_names(),w.c)])
return pd.DataFrame(dict(zip(names,columns)),columns=names)
##### Limits and ranges #####
[docs]
class Range:
"""
Single data range.
If `start` or `stop` are ``None``, it's implied that they're at infinity (i.e., Range(None,None) is infinite).
If the range object is ``None``, it's implied that the range is empty
"""
def __init__(self, start=None, stop=None):
if isinstance(start,(list,tuple)):
start,stop=start
self._rng=[start,stop]
self._reorder()
def _reorder(self):
if (self._rng[0] is not None) and (self._rng[1] is not None):
self._rng=[min(*self._rng),max(*self._rng)]
@property
def start(self): return self._rng[0]
@start.setter
def start(self, val):
self._rng[0]=val
self._reorder()
@property
def stop(self): return self._rng[1]
@stop.setter
def stop(self, val):
self._rng[1]=val
self._reorder()
def __getitem__(self, i):
return self._rng[i]
def __setitem__(self, i, val):
self._rng[i]=val
self._reorder()
def __len__(self): return 2
def __str__(self): return "({0}, {1})".format(*self._rng)
def __repr__(self): return "Range({0}, {1})".format(*self._rng)
[docs]
def contains(self, x):
"""Check if `x` is in the range"""
if self is None:
return (x!=x) # all False
if (self._rng[0] is not None) and (self._rng[1] is not None):
return (x>=self._rng[0])&(x<=self._rng[1])
elif self._rng[0] is not None:
return (x>=self._rng[0])
elif self._rng[1] is not None:
return (x<=self._rng[1])
else:
return (x==x) # all True
@staticmethod
def _min(a,b):
if a is None:
return b
elif b is None:
return a
return min(a,b)
@staticmethod
def _max(a,b):
if a is None:
return b
elif b is None:
return a
return max(a,b)
def _intersect_two(self, rng):
if rng is None:
return None
left=self._max(self._rng[0],rng._rng[0])
right=self._min(self._rng[1],rng._rng[1])
if (left is not None) and (right is not None) and left>right:
return None
return Range(left,right)
[docs]
def intersect(self, *rngs):
"""
Find an intersection of multiple ranges.
If the intersection is empty, return ``None``.
"""
rng=self
for r in rngs:
if rng is None:
return None
rng=rng._intersect_two(r)
return rng
[docs]
def rescale(self, mult=1., shift=0.):
for i in [0,1]:
self._rng[i]=None if self._rng[i] is None else self._rng[i]*mult+shift
if mult<0:
self._rng=self._rng[::-1]
self._reorder()
return self
[docs]
def tup(self):
return tuple(self._rng)
[docs]
def find_closest_arg(xs, x, approach="both", ordered=False):
"""
Find the index of a value in `xs` that is closest to `x`.
`approach` can take values ``'top'``, ``'bottom'`` or ``'both'`` and denotes from which side should array elements approach `x`
(meaning that the found array element should be ``>x``, ``<x`` or just the closest one).
If there are no elements lying on the desired side of `x` (e.g. ``approach=='top'`` and all elements of `xs` are less than `x`), the function returns ``None``.
if ``ordered==True``, then `xs` is assumed to be in ascending or descending order, and binary search is implemented (works only for 1D arrays).
if there are recurring elements, return any of them.
"""
if not (approach in ["top","bottom","both"]):
raise ValueError("unrecognized approaching mode: {0}".format(approach))
try:
return xs.find_closest_arg(x,approach=approach,ordered=ordered)
except AttributeError:
pass
if not ordered:
diff_array=np.asarray(xs)-x
if approach=="top":
threshold=diff_array>=0
if threshold.any():
diff_array=diff_array*threshold+(diff_array.max()+1.)*(~threshold)
return diff_array.argmin()
else:
return None
elif approach=="bottom":
threshold=diff_array<=0
if threshold.any():
diff_array=diff_array*threshold+(diff_array.min()-1.)*(~threshold)
return diff_array.argmax()
else:
return None
else:
return abs(diff_array).argmin()
else:
wxs=wrap(xs)
if wxs.ndim()!=1:
raise ValueError("ordered method is only applicable to 1D arrays")
if len(xs)==0:
return None
lb,hb=0,len(xs)-1
if wxs[0]>wxs[-1]: # must be reverse ordered
arg_rev=find_closest_arg(xs[::-1],x,approach=approach,ordered=True)
return len(xs)-1-arg_rev if arg_rev is not None else arg_rev
if wxs[lb]>x:
if approach=="bottom":
return None
else:
return lb
if wxs[hb]<x:
if approach=="top":
return None
else:
return hb
while hb-lb>1:
i=(lb+hb)//2
el=wxs[i]
if el<x:
lb=i
elif el>x:
hb=i
else:
return i
if approach=="top":
return hb
elif approach=="bottom":
return lb
else:
if abs(wxs[lb]-x)<abs(wxs[hb]-x):
return lb
else:
return hb
[docs]
def find_closest_value(xs, x, approach="both", ordered=False):
return wrap(xs)[find_closest_arg(xs,x,approach=approach,ordered=ordered)]
[docs]
def get_range_indices(xs, xs_range, ordered=False):
"""
Find trace indices corresponding to the given range.
The range is defined as ``xs_range[0]:xs_range[1]``, or infinite if ``xs_range=None`` (so the data is returned unchanged in that case).
If ``ordered==True``, then the function assumes that `xs` in ascending or descending order.
"""
if xs_range is not None:
xs_range=Range(*xs_range)
wrapped=wrap(xs)
if ordered and len(wrapped)>10:
if wrapped[1]>wrapped[0]:
if xs_range.start is None:
min_i=0
else:
min_i=find_closest_arg(xs,xs_range.start,approach="top",ordered=True)
if xs_range.stop is None:
max_i=len(xs)
else:
max_i=find_closest_arg(xs,xs_range.stop,approach="bottom",ordered=True)
else:
if xs_range.start is None:
max_i=len(xs)
else:
max_i=find_closest_arg(xs,xs_range.start,approach="top",ordered=True)
if xs_range.stop is None:
min_i=0
else:
min_i=find_closest_arg(xs,xs_range.stop,approach="bottom",ordered=True)
if min_i is None or max_i is None:
return slice(0)
else:
return slice(min_i,max_i+1)
else:
return xs_range.contains(np.asarray(xs))
else:
return slice(0)
[docs]
def cut_to_range(t, xs_range, x_column=None, ordered=False):
"""
Cut the table to the given range based on `x_column`.
The range is defined as ``xs_range[0]:xs_range[1]``, or infinite if ``xs_range=None``.
`x_column` is used to determine which column's values to use to check if the point is in range (see :func:`.get_x_column`).
If ``ordered_x==True``, then the function assumes that `x_column` in ascending order.
"""
x_column=get_x_column(t,x_column)
indices=get_range_indices(x_column,xs_range,ordered=ordered)
return wrap(t).t[indices]
[docs]
def cut_out_regions(t, regions, x_column=None, ordered=False, multi_pass=True):
"""
Cut the regions out of the `t` based on `x_column`.
`x_column` is used to determine which column's values to use to check if the point is in range (see :func:`.get_x_column`).
If ``ordered_x==True``, then the function assumes that `x_column` in ascending order.
If ``multi_pass==False``, combine all indices before deleting the data in a single operation (works faster, but only for non-intersecting regions).
"""
if not regions:
return t
wrapped=wrap(t).copy(wrapped=True)
x_column=np.asarray(get_x_column(t,x_column,idx_default=True))
if multi_pass:
for r in regions:
idx=get_range_indices(x_column,r,ordered=ordered)
x_column=np.delete(x_column,idx)
del wrapped.r[idx]
else:
all_idx=[]
for r in regions:
idx=get_range_indices(x_column,r,ordered=ordered)
if isinstance(idx,slice):
idx=range(*idx.indices(len(x_column)))
all_idx=all_idx+list(idx)
del wrapped.r[all_idx]
return wrapped.cont
##### Unwrapping #####
[docs]
def find_discrete_step(trace, min_fraction=1E-8, tolerance=1E-5):
"""
Try to find a minimal divisor of all steps in a 1D trace.
`min_fraction` is the minimal possible size of the divisor (relative to the minimal non-zero step size).
`tolerance` is the tolerance of the division.
Raise an :exc:`ArithmeticError` if no such value was found.
"""
trace=np.asarray(trace)
if len(trace)<2:
raise ValueError('trace length should be at least 2')
diffs=trace[1:]-trace[:-1]
diffs=diffs[np.abs(diffs)!=0]
if len(diffs)==0:
return 0
q=abs(diffs[0])
for d in diffs[1:]:
if d!=0:
q=numerical.gcd_approx(q,abs(d),min_fraction,tolerance)
return q
[docs]
def unwrap_mod_data(trace, wrap_range):
"""
Unwrap data given `wrap_range`.
Assume that every jump greater than ``0.5*wrap_range`` is not real and is due to value being restricted.
Can be used to, e.g., unwrap the phase data.
"""
trace=np.asarray(trace)
if len(trace)<2:
return trace
res=trace.copy()
wraps=0
prev=trace[0]
for (i,v) in enumerate(trace[1:]):
if abs(v-prev)>wrap_range*.5:
wraps=wraps-round((v-prev)/wrap_range)*wrap_range
res[i+1]=v+wraps
prev=v
return res
##### Trace expansion on the edges #####
[docs]
def pad_trace(trace, pad, mode="constant", cval=0.):
"""
Expand 1D trace or a multi-column table for different convolution techniques.
Wrapper around :func:`numpy.pad`, but can handle pandas dataframes or multi-column arrays.
Note that the index data is not preserved.
Args:
trace: 1D array-like object.
pad (int or tuple): Expansion size. Can be an integer, if pad on both sides is equal, or a 2-tuple ``(left, right)`` for pads on opposite sides.
mode (str): Expansion mode. Takes the same values as :func:`numpy.pad`.
Common values are ``'constant'`` (added values are determined by `cval`), ``'edge'`` (added values are end values of the trace),
``'reflect'`` (reflect trace with respect to its endpoint) or ``'wrap'`` (wrap the values from the other size).
cval (float): If ``mode=='constant'``, determines the expanded values.
"""
wrapped=wrap(trace)
if wrapped.ndim()==2:
return wrapped.columns_replaced([pad_trace(wrapped.c[i],pad=pad,mode=mode,cval=cval)
for i in range(wrapped.shape()[1])])
elif wrapped.ndim()!=1:
raise ValueError("this function accepts only 1D or 2D arrays")
if len(trace)==0 or pad==0:
return trace
kwargs={"constant_values":cval} if mode=="constant" else {}
res=np.pad(trace,pad,mode=mode,**kwargs)
return wrapped.array_replaced(res)
##### Representation transformations #####
[docs]
def xy2c(t):
"""
Convert a trace or a table from xy representation to a single complex data.
`t` is a 2D array with either 2 columns (x and y) or 3 columns (index, x and y).
Return 2D array with either 1 column (c) or 2 columns (index and c).
"""
wrapped=wrap(t)
if wrapped.ndim()!=2:
raise ValueError("xy representation should be 2D")
if wrapped.shape()[1] not in [2,3]:
raise ValueError("xy representation can only have 2 or 3 columns")
if wrapped.shape()[1]==2:
if wrapped.get_type=="1d.array":
return wrapped[:,0]+1j*wrapped[:,1]
else:
return pd.DataFrame({"c":wrapped[:,0]+1j*wrapped[:,1]},index=wrapped.get_index())
return
else:
c=wrapped[:,1]+1j*wrapped[:,2]
return wrapped.from_columns([t[:,0],c],column_names=[wrapped.c.get_names()[0],"c"],index=wrapped.get_index())
[docs]
def c2xy(t):
"""
Convert the a trace or a table from complex representation to a split x and y data.
`t` is either 1D array (c data) or a 2D array with either 1 column (c) or 2 columns (index and c).
Return 2D array with either 2 column (x and y) or 3 columns (index, x and y).
"""
wrapped=wrap(t)
t=np.asarray(t)
if wrapped.ndim()==1:
if wrapped.get_type=="1d.array":
return np.column_stack((t.real,t.imag))
else:
return pd.DataFrame({"x":t.real,"t":t.imag},index=wrapped.get_index())
if wrapped.shape()[1]==1:
columns=[t[:,0].real,t[:,0].imag]
return wrapped.from_columns(columns,column_names=["x","y"],index=wrapped.get_index())
if wrapped.shape()[1]==2:
columns=[t[:,0].real,t[:,1].real,t[:,1].imag]
return wrapped.from_columns(columns,column_names=[wrapped.c.get_names()[0],"x","y"],index=wrapped.get_index())
else:
raise ValueError("2D c representation can only have 2 columns")