Source code for pylablib.devices.AWG.generic

from ...core.devio import SCPI, interface, comm_backend
from ...core.utils import units

import numpy as np

class GenericAWGError(comm_backend.DeviceError):
    """Generic AWG error (parent class of the other AWG errors defined in this module)"""
class GenericAWGBackendError(GenericAWGError, comm_backend.DeviceBackendError):
    """AWG backend communication error (both a generic AWG error and a device backend error)"""
class GenericAWG(SCPI.SCPIDevice):
    """
    Generic arbitrary wave generator, based on Agilent 33500.

    With slight modifications works for many other AWGs using largely the same syntax.

    Most methods take an optional ``channel`` argument (1-based); if it is ``None``,
    the current default channel is used (see :meth:`select_current_channel`).

    Args:
        addr: device address; passed unchanged to the parent SCPI device constructor
    """
    # names of options which are never registered; requesting them raises an error
    _exclude_commands=set()
    # names of options which are only available on channel 1
    _single_channel_commands=set()
    # names of options which are shared by all channels (no channel-specific name or command prefix)
    _all_channel_commands=set()
    _set_angle_unit=True  # if True, the phase unit is set to "DEG" before every phase query / update
    _default_angle_unit="deg"  # if "rad", device phase values are scaled by 360/(2*pi) on the way out (and back on the way in)
    _force_channel_source_pfx=False  # if True, single-channel devices still get the "SOURCE:" command prefix
    _default_operation_cooldown={"write":1E-2}
    _channels_number=1  # number of channels; "auto" means it is detected in the constructor
    _default_load=50  # value returned by get_load when the device reports "DEF"
    _inf_load=1E6  # value returned by get_load when the device reports "INF"; also caps any numerical reply
    _range_mode="high_low" # range setting mode; can be "high_low", "amp_off", or "both" (set both, return "amp_off")
    _function_aliases={"sine":"SIN","square":"SQU","ramp":"RAMP","pulse":"PULS","noise":"NOIS","prbs":"PRBS","dc":"DC","user":"USER","arb":"ARB"}
    _supported_functions=list(_function_aliases)
    Error=GenericAWGError
    ReraiseError=GenericAWGBackendError
    _bool_selector=("OFF","ON")

    def __init__(self, addr):
        super().__init__(addr)
        self._channels_number=self._detect_channels_number()
        self._add_info_variable("channels_number",self.get_channels_number)
        functions={k:v for (k,v) in self._function_aliases.items() if k in self._supported_functions}
        if "*" in self._supported_functions:
            self._add_parameter_class(interface.EnumParameterClass("function",functions,allowed_alias="all",allowed_value="all",match_prefix=True))
        else:
            self._add_parameter_class(interface.EnumParameterClass("function",functions,match_prefix=True))
        for ch in range(1,self._channels_number+1):
            self._add_scpi_parameter("output_on","",kind="bool",channel=ch,comm_kind="output",add_variable=True)
            self._add_scpi_parameter("output_polarity","POLARITY",kind="param",parameter="polarity",channel=ch,comm_kind="output",add_variable=True)
            self._add_scpi_parameter("output_sync","SYNC",kind="bool",channel=ch,comm_kind="output",add_variable=True)
            # the load is exposed through get_load/set_load, which translate "DEF"/"INF" replies into numbers
            self._add_scpi_parameter("output_load","LOAD",kind="string",channel=ch,comm_kind="output",add_variable=False)
            self._add_settings_variable("output_load",self.get_load,self.set_load,channel=ch)
            self._add_settings_variable("output_range",self.get_output_range,self.set_output_range,multiarg=False,channel=ch)
            self._add_scpi_parameter("amplitude","VOLTAGE",kind="float",channel=ch)
            self._add_scpi_parameter("offset","VOLTAGE:OFFSET",kind="float",channel=ch)
            self._add_settings_variable("frequency",self.get_frequency,self.set_frequency,channel=ch)
            self._add_settings_variable("phase",self.get_phase,self.set_phase,channel=ch)
            self._add_scpi_parameter("phase","PHASE",kind="float",channel=ch)
            self._add_scpi_parameter("function","FUNCTION",kind="param",parameter="function",channel=ch,add_variable=True)
            self._add_scpi_parameter("duty_cycle","FUNCTION:SQUARE:DCYCLE",channel=ch,add_variable=True)
            self._add_scpi_parameter("ramp_symmetry","FUNCTION:RAMP:SYMMETRY",channel=ch,add_variable=True)
            self._add_scpi_parameter("pulse_width","FUNCTION:PULSE:WIDTH",channel=ch,add_variable=True)
            self._add_scpi_parameter("burst_enabled","BURST:STATE",kind="bool",channel=ch,add_variable=True)
            self._add_scpi_parameter("burst_mode","BURST:MODE",kind="param",parameter="burst_mode",channel=ch,add_variable=True)
            self._add_settings_variable("burst_ncycles",self.get_burst_ncycles,self.set_burst_ncycles,channel=ch)
            self._add_scpi_parameter("gate_polarity","BURST:GATE:POL",kind="param",parameter="polarity",channel=ch,set_delay=0.1,add_variable=True)
            self._add_scpi_parameter("trigger_source","TRIG:SOURCE",kind="param",parameter="trigger_source",channel=ch,add_variable=True)
            self._add_scpi_parameter("trigger_slope","TRIG:SLOPE",kind="param",parameter="slope",channel=ch,add_variable=True)
            self._add_scpi_parameter("trigger_output","OUTPUT:TRIG",kind="bool",channel=ch,add_variable=True)
            self._add_scpi_parameter("output_trigger_slope","OUTPUT:TRIG:SLOPE",kind="param",parameter="slope",channel=ch,add_variable=True)
        # NOTE(review): the unit parameters carry no channel, so they are registered once, after the loop;
        # the flattened source does not show the nesting - confirm against the original file
        self._add_scpi_parameter("voltage_unit","VOLTAGE:UNIT",kind="string")
        self._add_scpi_parameter("phase_unit","UNIT:ANGLE",kind="string")
        self._current_channel=1
        if self._range_mode=="both": # synchronize amp/off and high/low settings
            for ch in range(1,self._channels_number+1):
                self.set_output_range(self.get_output_range(channel=ch),channel=ch)

    _all_parameters={
        "output_on","output_polarity","output_sync","output_load",
        "amplitude","offset","frequency","phase","voltage_unit","phase_unit",
        "function","duty_cycle","ramp_symmetry","pulse_width",
        "burst_enabled","burst_mode","burst_ncycles","gate_polarity",
        "trigger_source","trigger_slope","trigger_output","output_trigger_slope"}  # not used, but good for reference / use in derived classes
    _p_polarity=interface.EnumParameterClass("polarity",["norm","inv"],value_case="upper",match_prefix=True)
    _p_burst_mode=interface.EnumParameterClass("burst_mode",["trig","gate"],value_case="upper",match_prefix=True)
    _p_trigger_source=interface.EnumParameterClass("trigger_source",["imm","ext","bus"],value_case="upper",match_prefix=True)
    _p_slope=interface.EnumParameterClass("slope",["pos","neg"],value_case="upper",match_prefix=True)

    def _detect_channels_number(self):
        """Return the number of channels, probing the ``OUTPUT2?`` command if the class value is ``"auto"``"""
        if self._channels_number=="auto":
            return 2 if self._is_command_valid("OUTPUT2?") else 1
        else:
            return self._channels_number

    def get_channels_number(self):
        """Get the number of channels"""
        return self._channels_number

    def _can_add_command(self, name, channel=1):
        """Check if the option `name` is available on `channel` (not excluded, and not single-channel-only on channels above 1)"""
        return not (name in self._exclude_commands or (name in self._single_channel_commands and channel>1))

    def _check_command(self, name, channel=1, raise_error=True):
        """
        Check if the option `name` is supported on `channel` (``None`` means the current channel).

        Return ``True`` if supported; otherwise raise the device error if ``raise_error==True``, or return ``False``.
        """
        channel=self._get_channel(channel)
        if not self._can_add_command(name,channel=channel):
            if raise_error:
                raise self.Error("option '{}' for channel {} is not supported by this device".format(name,channel))
            return False
        return True

    def _add_scpi_parameter(self, name, comm, kind="float", parameter=None, set_delay=0, channel=None, comm_kind="source", add_variable=False):  # pylint: disable=arguments-differ
        """
        Add a SCPI parameter, optionally specific to `channel`.

        For a channel-specific option the name and the command get their channel-dependent form;
        if the option is not available on this channel, nothing is added and ``None`` is returned.
        `comm_kind` selects the command prefix (``"source"`` or ``"output"``).
        """
        if channel is not None and name not in self._all_channel_commands:
            if not self._can_add_command(name,channel):
                return
            name=self._build_variable_name(name,channel)
            comm=self._build_channel_command(comm,channel,kind=comm_kind)
        return super()._add_scpi_parameter(name,comm,kind=kind,parameter=parameter,set_delay=set_delay,add_variable=add_variable)

    def _modify_scpi_parameter(self, name, comm, kind=None, parameter=None, set_delay=0, channel=None, comm_kind="source"):  # pylint: disable=arguments-differ
        """Modify a SCPI parameter; name and command are built in the same way as in :meth:`_add_scpi_parameter`"""
        if channel is not None and name not in self._all_channel_commands:
            if not self._can_add_command(name,channel):
                return
            name=self._build_variable_name(name,channel)
            comm=self._build_channel_command(comm,channel,kind=comm_kind)
        return super()._modify_scpi_parameter(name,comm,kind=kind,parameter=parameter,set_delay=set_delay)

    def _add_settings_variable(self, path, getter=None, setter=None, ignore_error=(), mux=None, multiarg=True, channel=None):  # pylint: disable=arguments-differ, arguments-renamed
        """
        Add a settings variable, optionally specific to `channel`.

        For a channel-specific option the path gets its channel-dependent form, and the supplied
        getter and setter are wrapped so that they are always called with ``channel=channel``.
        If the option is not available on this channel, nothing is added and ``None`` is returned.
        """
        if channel is not None and path not in self._all_channel_commands:
            if not self._can_add_command(path,channel):
                return
            path=self._build_variable_name(path,channel)
            # the original callables are kept under separate names, since the wrappers rebind getter / setter
            if getter is not None:
                ogetter=getter
                getter=lambda *args: ogetter(*args,channel=channel)
            # wrap the setter only if there is one; a missing setter must stay None rather than turn into a wrapper around None
            if setter is not None:
                osetter=setter
                setter=lambda *args: osetter(*args,channel=channel)
        return super()._add_settings_variable(path,getter=getter,setter=setter,ignore_error=ignore_error,mux=mux,multiarg=multiarg)

    def _get_channel_scpi_parameter(self, name, channel=None):
        """Get the value of the SCPI parameter `name` for the given channel (raise the device error if unsupported)"""
        self._check_command(name,channel=channel)
        if name in self._all_channel_commands:
            return self._get_scpi_parameter(name)
        channel=self._get_channel(channel)
        return self._get_scpi_parameter(self._build_variable_name(name,channel))

    def _set_channel_scpi_parameter(self, name, value, channel=None, result=False):
        """Set the value of the SCPI parameter `name` for the given channel (raise the device error if unsupported)"""
        self._check_command(name,channel=channel)
        if name in self._all_channel_commands:
            return self._set_scpi_parameter(name,value,result=result)
        channel=self._get_channel(channel)
        return self._set_scpi_parameter(self._build_variable_name(name,channel),value,result=result)

    def _get_scpi_parameter(self, name):
        """Get a SCPI parameter, turning ``KeyError`` from the parent method into the device error"""
        try:
            return super()._get_scpi_parameter(name)
        except KeyError as err:
            raise self.Error("option '{}' is not supported by this device".format(name)) from err

    def _set_scpi_parameter(self, name, value, result=False):
        """Set a SCPI parameter, turning ``KeyError`` from the parent method into the device error"""
        try:
            return super()._set_scpi_parameter(name,value,result=result)
        except KeyError as err:
            raise self.Error("option '{}' is not supported by this device".format(name)) from err

    def _ask_channel(self, msg, data_type="string", delay=0., timeout=None, read_echo=False, name=None, channel=None, comm_kind="source"):
        """
        Send a query using the channel-specific form of the command `msg`.

        `name` is the option name used for the support check; the remaining arguments are passed to the parent ``ask`` method.
        """
        self._check_command(name,channel=channel)
        if name not in self._all_channel_commands:
            msg=self._build_channel_command(msg,channel,kind=comm_kind)
        return super().ask(msg,data_type=data_type,delay=delay,timeout=timeout,read_echo=read_echo)

    def _write_channel(self, msg, arg=None, arg_type=None, unit=None, bool_selector=("OFF","ON"), wait_sync=None, read_echo=False, read_echo_delay=0., name=None, channel=None, comm_kind="source"):
        """
        Send a command using the channel-specific form of `msg`.

        `name` is the option name used for the support check; the remaining arguments are passed to the parent ``write`` method.
        """
        self._check_command(name,channel=channel)
        if name not in self._all_channel_commands:
            msg=self._build_channel_command(msg,channel,kind=comm_kind)
        return super().write(msg,arg=arg,arg_type=arg_type,unit=unit,bool_selector=bool_selector,wait_sync=wait_sync,read_echo=read_echo,read_echo_delay=read_echo_delay)

    def _build_variable_name(self, name, channel):
        """Build channel-specific settings variable name (``"ch<N>/<name>"``, or just the name for single-channel devices)"""
        if self._channels_number==1:
            return name
        else:
            return "ch{}/{}".format(channel,name)

    def _build_channel_command(self, comm, channel, kind="source"):
        """
        Build channel-specific command.

        `kind` can be ``"source"`` (``SOURCE<N>:`` prefix) or ``"output"`` (``OUTPUT<N>`` prefix; an empty `comm` yields the bare prefix).
        Any other `kind` falls through and yields ``None``.
        """
        channel=self._get_channel(channel)
        if kind=="source":
            if self._channels_number==1:
                return "SOURCE:{}".format(comm) if self._force_channel_source_pfx else comm
            return "SOURCE{}:{}".format(channel,comm)
        elif kind=="output":
            if self._channels_number==1:
                pfx="OUTPUT"
            else:
                pfx="OUTPUT{}".format(channel)
            return pfx+":"+comm if comm else pfx

    def _check_ch(self, channel=None):
        """Raise ``ValueError`` if `channel` is given and lies outside of ``1 .. channels_number``"""
        if channel is not None and (channel<1 or channel>self._channels_number):
            raise ValueError("invalid channel: {}".format(channel))

    def _get_channel(self, channel=None):
        """Validate `channel` and return it, or the current default channel if it is ``None``"""
        self._check_ch(channel)
        return self._current_channel if channel is None else channel

    def get_current_channel(self):
        """Get current channel"""
        return self._current_channel

    def select_current_channel(self, channel):
        """Select current default channel"""
        self._check_ch(channel)
        self._current_channel=channel

    def is_output_enabled(self, channel=None):
        """Check if the output is enabled"""
        return self._get_channel_scpi_parameter("output_on",channel=channel)

    def enable_output(self, enabled=True, channel=None):
        """Turn the output on or off"""
        return self._set_channel_scpi_parameter("output_on",enabled,channel=channel,result=True)

    def get_output_polarity(self, channel=None):
        """
        Get output polarity.

        Can be either ``"norm"`` or ``"inv"``.
        """
        return self._get_channel_scpi_parameter("output_polarity",channel=channel)

    def set_output_polarity(self, polarity="norm", channel=None):
        """
        Set output polarity.

        Can be either ``"norm"`` or ``"inv"``.
        """
        return self._set_channel_scpi_parameter("output_polarity",polarity,channel=channel,result=True)

    def is_sync_output_enabled(self, channel=None):
        """Check if SYNC output is enabled"""
        return self._get_channel_scpi_parameter("output_sync",channel=channel)

    def enable_sync_output(self, enabled=True, channel=None):
        """Enable or disable SYNC output"""
        return self._set_channel_scpi_parameter("output_sync",enabled,channel=channel,result=True)

    def get_load(self, channel=None):
        """
        Get the output load.

        A ``"DEF"`` reply is returned as the class default load, an ``"INF"`` reply as the class "infinite" load;
        numerical replies are capped at the "infinite" load value.
        """
        value=self._get_channel_scpi_parameter("output_load",channel=channel)
        if value.lower()=="def":
            return self._default_load
        if value.lower()=="inf":
            return self._inf_load
        return min(float(value),self._inf_load)

    def set_load(self, load=None, channel=None):
        """Set the output load (``None`` means High-Z)"""
        load="INF" if load is None else load
        self._set_channel_scpi_parameter("output_load",str(load),channel=channel)
        return self.get_load(channel=channel)

    def get_function(self, channel=None):
        """
        Get output function.

        Can be one of the following: ``"sine"``, ``"square"``, ``"ramp"``, ``"pulse"``, ``"noise"``, ``"prbs"``, ``"DC"``, ``"user"``, ``"arb"``.
        Not all functions may be available, depending on the particular model of the generator.
        """
        return self._get_channel_scpi_parameter("function",channel=channel)

    def set_function(self, func, channel=None):
        """
        Set output function.

        Can be one of the following: ``"sine"``, ``"square"``, ``"ramp"``, ``"pulse"``, ``"noise"``, ``"prbs"``, ``"DC"``, ``"user"``, ``"arb"``.
        Not all functions may be available, depending on the particular model of the generator.
        """
        return self._set_channel_scpi_parameter("function",func,channel=channel,result=True)

    def get_amplitude(self, channel=None):
        """
        Get output amplitude (i.e., half of the span).

        Sets the device voltage unit to ``"VPP"`` first, so that the reply is a peak-to-peak value.
        """
        self._set_scpi_parameter("voltage_unit","VPP")
        return self._get_channel_scpi_parameter("amplitude",channel=channel)/2

    def set_amplitude(self, amplitude, channel=None):
        """
        Set output amplitude (i.e., half of the span).

        Sets the device voltage unit to ``"VPP"`` first, and sends twice the amplitude as the peak-to-peak value.
        """
        self._set_scpi_parameter("voltage_unit","VPP")
        return self._set_channel_scpi_parameter("amplitude",amplitude*2,channel=channel,result=True)/2

    def get_offset(self, channel=None):
        """Get output offset"""
        return self._get_channel_scpi_parameter("offset",channel=channel)

    def set_offset(self, offset, channel=None):
        """Set output offset"""
        return self._set_channel_scpi_parameter("offset",offset,channel=channel,result=True)

    def get_output_range(self, channel=None):
        """
        Get output voltage range.

        Return tuple ``(vmin, vmax)`` with the low and high voltage values (i.e., ``offset-amplitude`` and ``offset+amplitude``).
        In the ``"high_low"`` range mode the two levels are queried directly; otherwise they are calculated from amplitude and offset.
        """
        if self._range_mode=="high_low":
            low=self._ask_channel("VOLTAGE:LOW?","float",name="voltage",channel=channel)
            high=self._ask_channel("VOLTAGE:HIGH?","float",name="voltage",channel=channel)
            return low,high
        else:
            amp=self.get_amplitude(channel=channel)
            off=self.get_offset(channel=channel)
            return off-amp,off+amp

    def set_output_range(self, rng, channel=None):
        """
        Set output voltage range.

        `rng` is either a pair of voltage levels (in any order) or a single number (zero span).
        If span is less than ``1E-4``, automatically switch to DC mode (if the device supports it).
        Return the resulting output range.
        """
        try:
            low,high=min(rng),max(rng)
        except TypeError:  # a single number instead of a pair
            low,high=rng,rng
        if abs(high-low)<1E-4 and "dc" in self._supported_functions:
            self.set_function("DC",channel=channel)
            self.set_amplitude(10E-3,channel=channel)
            self.set_offset((high+low)/2.,channel=channel)
        else:
            if self._range_mode in {"high_low","both"}:
                curr_rng=self.get_output_range(channel=channel)
                # choose the order of updates such that the low level stays below the high level after the first write
                if low<curr_rng[1]:
                    self._write_channel("VOLTAGE:LOW",low,"float",name="voltage",channel=channel)
                    self._write_channel("VOLTAGE:HIGH",high,"float",name="voltage",channel=channel)
                else:
                    self._write_channel("VOLTAGE:HIGH",high,"float",name="voltage",channel=channel)
                    self._write_channel("VOLTAGE:LOW",low,"float",name="voltage",channel=channel)
            if self._range_mode in {"amp_off","both"}:
                # use the normalized levels, so that a reversed pair or a single number is handled the same way as above
                amp,off=(high-low)/2,(high+low)/2
                curr_amp=self.get_amplitude(channel=channel)
                # when the amplitude decreases it is set first, otherwise the offset is set first
                if curr_amp>=amp:
                    self.set_amplitude(amp,channel=channel)
                    self.set_offset(off,channel=channel)
                else:
                    self.set_offset(off,channel=channel)
                    self.set_amplitude(amp,channel=channel)
        return self.get_output_range(channel=channel)

    def get_frequency(self, channel=None):
        """Get output frequency (converted to Hz; a reply without a unit is taken to be in Hz)"""
        value,unit=self._ask_channel("FREQUENCY?","value",name="frequency",channel=channel)
        return units.convert_frequency_units(value,unit or "Hz","Hz")

    def set_frequency(self, frequency, channel=None):
        """Set output frequency"""
        self._write_channel("FREQUENCY",frequency,"float",name="frequency",channel=channel)
        return self.get_frequency(channel=channel)

    def get_phase(self, channel=None):
        """
        Get output phase (in degrees).

        Return ``None`` for single-channel devices.
        """
        if self._channels_number==1:
            return None
        if self._set_angle_unit:
            self._set_scpi_parameter("phase_unit","DEG")
        fact=360/(2*np.pi) if self._default_angle_unit=="rad" else 1
        return self._get_channel_scpi_parameter("phase",channel=channel)*fact

    def set_phase(self, phase, channel=None):
        """
        Set output phase (in degrees).

        Do nothing and return ``None`` for single-channel devices.
        """
        if self._channels_number==1:
            return None
        if self._set_angle_unit:
            self._set_scpi_parameter("phase_unit","DEG")
        fact=360/(2*np.pi) if self._default_angle_unit=="rad" else 1
        return self._set_channel_scpi_parameter("phase",phase/fact,channel=channel,result=True)*fact

    def sync_phase(self):
        """Synchronize phase between two channels (does nothing for single-channel devices)"""
        if self._channels_number>1:
            self.write("SOURCE1:PHASE:SYNC")

    def get_duty_cycle(self, channel=None):
        """
        Get output duty cycle (in percent).

        Only applies to ``"square"`` output function.
        """
        return self._get_channel_scpi_parameter("duty_cycle",channel=channel)

    def set_duty_cycle(self, dcycle, channel=None):
        """
        Set output duty cycle (in percent).

        Only applies to ``"square"`` output function.
        """
        return self._set_channel_scpi_parameter("duty_cycle",dcycle,channel=channel,result=True)

    def get_ramp_symmetry(self, channel=None):
        """
        Get output ramp symmetry (in percent).

        Only applies to ``"ramp"`` output function.
        """
        return self._get_channel_scpi_parameter("ramp_symmetry",channel=channel)

    def set_ramp_symmetry(self, rsymm, channel=None):
        """
        Set output ramp symmetry (in percent).

        Only applies to ``"ramp"`` output function.
        """
        return self._set_channel_scpi_parameter("ramp_symmetry",rsymm,channel=channel,result=True)

    def get_pulse_width(self, channel=None):
        """
        Get output pulse width (in seconds).

        Only applies to ``"pulse"`` output function.
        """
        return self._get_channel_scpi_parameter("pulse_width",channel=channel)

    def set_pulse_width(self, width, channel=None):
        """
        Set output pulse width (in seconds).

        Only applies to ``"pulse"`` output function.
        """
        return self._set_channel_scpi_parameter("pulse_width",width,channel=channel,result=True)

    def is_burst_enabled(self, channel=None):
        """Check if the burst mode is enabled"""
        return self._get_channel_scpi_parameter("burst_enabled",channel=channel)

    def enable_burst(self, enabled=True, channel=None):
        """Enable or disable burst mode"""
        return self._set_channel_scpi_parameter("burst_enabled",enabled,channel=channel,result=True)

    def get_burst_mode(self, channel=None):
        """
        Get burst mode.

        Can be either ``"trig"`` or ``"gate"``.
        """
        return self._get_channel_scpi_parameter("burst_mode",channel=channel)

    def set_burst_mode(self, mode, channel=None):
        """
        Set burst mode.

        Can be either ``"trig"`` or ``"gate"``.
        """
        return self._set_channel_scpi_parameter("burst_mode",mode,channel=channel,result=True)

    def get_burst_ncycles(self, channel=None):
        """
        Get burst mode ncycles.

        Infinite corresponds to a large value (>1E37).
        """
        return self._ask_channel("BURST:NCYC?","int",name="burst_ncycles",channel=channel)

    def set_burst_ncycles(self, ncycles=1, channel=None):
        """
        Set burst mode ncycles.

        Infinite corresponds to ``None`` (or any value above ``1E37``).
        Return the resulting number of cycles, queried after a short delay.
        """
        if ncycles is None or ncycles>1E37:
            # the INF keyword is not a number, so it is sent as a part of the command instead of an integer argument
            self._write_channel("BURST:NCYC INF",name="burst_ncycles",channel=channel)
        else:
            self._write_channel("BURST:NCYC",int(ncycles),"int",name="burst_ncycles",channel=channel)
        self.sleep(0.1)
        return self.get_burst_ncycles(channel=channel)

    def get_gate_polarity(self, channel=None):
        """
        Get burst gate polarity.

        Can be either ``"norm"`` or ``"inv"``.
        """
        return self._get_channel_scpi_parameter("gate_polarity",channel=channel)

    def set_gate_polarity(self, polarity="norm", channel=None):
        """
        Set burst gate polarity.

        Can be either ``"norm"`` or ``"inv"``.
        """
        return self._set_channel_scpi_parameter("gate_polarity",polarity,channel=channel,result=True)

    def get_trigger_source(self, channel=None):
        """
        Get trigger source.

        Can be either ``"imm"``, ``"ext"``, or ``"bus"``.
        """
        return self._get_channel_scpi_parameter("trigger_source",channel=channel)

    def set_trigger_source(self, src, channel=None):
        """
        Set trigger source.

        Can be either ``"imm"``, ``"ext"``, or ``"bus"``.
        """
        return self._set_channel_scpi_parameter("trigger_source",src,channel=channel,result=True)

    def get_trigger_slope(self, channel=None):
        """
        Get trigger slope.

        Can be either ``"pos"``, or ``"neg"``.
        """
        return self._get_channel_scpi_parameter("trigger_slope",channel=channel)

    def set_trigger_slope(self, slope, channel=None):
        """
        Set trigger slope.

        Can be either ``"pos"``, or ``"neg"``.
        """
        return self._set_channel_scpi_parameter("trigger_slope",slope,channel=channel,result=True)

    def is_trigger_output_enabled(self, channel=None):
        """Check if the trigger output is enabled"""
        return self._get_channel_scpi_parameter("trigger_output",channel=channel)

    def enable_trigger_output(self, enabled=True, channel=None):
        """Enable or disable trigger output"""
        return self._set_channel_scpi_parameter("trigger_output",enabled,channel=channel,result=True)

    def get_output_trigger_slope(self, channel=None):
        """
        Get output trigger slope.

        Can be either ``"pos"``, or ``"neg"``.
        """
        return self._get_channel_scpi_parameter("output_trigger_slope",channel=channel)

    def set_output_trigger_slope(self, slope, channel=None):
        """
        Set output trigger slope.

        Can be either ``"pos"``, or ``"neg"``.
        """
        return self._set_channel_scpi_parameter("output_trigger_slope",slope,channel=channel,result=True)