from ...core.utils import py3
from ...core.devio import comm_backend, interface
import collections
[docs]
class PfeifferError(comm_backend.DeviceError):
"""Generic Pfeiffer device error"""
[docs]
class PfeifferBackendError(PfeifferError,comm_backend.DeviceBackendError):
"""Generic Pfeiffer backend communication error"""
TTPG260SwitchSettings=collections.namedtuple("TTPG260SwitchSettings",["channel","low_thresh","high_thresh"])
TTPG260GaugeControlSettings=collections.namedtuple("TTPG260GaugeControlSettings",["activation_control","deactivation_control","on_thresh","off_thresh"])
[docs]
class TPG260(comm_backend.ICommBackendWrapper):
"""
TPG260 series (TPG261/262) pressure gauge.
Args:
conn: serial connection parameters (usually port or a tuple containing port and baudrate)
"""
Error=PfeifferError
def __init__(self, conn):
instr=comm_backend.new_backend(conn,"serial",term_read="\r\n",term_write="",defaults={"serial":("COM1",9600)},reraise_error=PfeifferBackendError)
comm_backend.ICommBackendWrapper.__init__(self,instr)
gmux=([1,2],)
smux=([1,2],1)
self._add_status_variable("pressure",lambda channel: self.get_pressure(channel,status_error=False),ignore_error=(PfeifferError,),mux=gmux,priority=5)
self._add_status_variable("channel_status",self.get_channel_status,mux=gmux,priority=5)
self._add_status_variable("units",self.get_units)
self._add_status_variable("enabled",self.is_enabled,priority=2)
self._add_status_variable("switch_status",self.get_switch_status)
self._add_info_variable("gauge_kind",self.get_gauge_kind,mux=gmux)
self._add_settings_variable("display_channel",self.get_display_channel,self.set_display_channel)
self._add_settings_variable("measurement_filter",self.get_measurement_filter,self.set_measurement_filter,mux=smux)
self._add_settings_variable("calibration_factor",self.get_calibration_factor,self.set_calibration_factor,mux=smux,priority=-2)
self._add_status_variable("switch_settings",self.get_switch_settings,mux=([1,2,3,4],),priority=-2)
self._add_status_variable("gauge_control_settings",self.get_gauge_control_settings,mux=gmux,priority=-2)
try:
self.query("BAU")
except self.instr.Error:
self.close()
raise
[docs]
def comm(self, msg):
"""Send a command to the device"""
self.instr.write(msg+"\r\n")
rsp=self.instr.readline()
if len(rsp)==1:
if rsp[:1]==b"\x15":
raise PfeifferError("command '{}' resulted in negative acknowledgement from the device".format(msg))
elif rsp[:1]==b"\x06":
return
raise PfeifferError("command '{}' resulted in an unexpected acknowledgement from the device: {}".format(msg,rsp))
def _parse_value(self, value, data_type):
if data_type in ["str","raw"]:
return value
if data_type=="int":
return int(value)
if data_type=="float":
return float(value)
raise ValueError("unrecognized data type: {}".format(data_type))
[docs]
def query(self, msg, data_type="str"):
"""Send a query to the device and return the reply"""
self.comm(msg)
self.instr.write(b"\05")
res=py3.as_str(self.instr.readline())
if data_type=="raw":
return res
res=[v.strip() for v in res.strip().split(",")]
if not isinstance(data_type,(tuple,list)):
data_type=[data_type]*len(res)
if len(data_type)!=len(res):
raise ValueError("supplied datatypes {} have different length from the results {}".format(data_type,res))
res=[self._parse_value(v,dt) for (v,dt) in zip(res,data_type)]
return res[0] if len(res)==1 else res
_p_channel=interface.EnumParameterClass("channel",[1,2])
_p_unit=interface.EnumParameterClass("units",{"mbar":0,"torr":1,"pa":2})
[docs]
@interface.use_parameters(_returns="units")
def get_units(self):
"""Get device units for indication/reading (``"mbar"``, ``"torr"``, or ``"pa"``)"""
return self.query("UNI","int")
[docs]
@interface.use_parameters(_returns="units")
def set_units(self, units):
"""Set device units for indication/reading (``"mbar"``, ``"torr"``, or ``"pa"``)"""
return self.query("UNI, {}".format(units),"int")
[docs]
def to_Pa(self, value, units=None):
"""
Convert value in the given units to Pa.
If `units` is ``None``, use the current display units.
"""
units=units or self.get_units()
conv_factor={"mbar":1E2,"torr":133.322,"pa":1}
return value*conv_factor[units]
[docs]
def from_Pa(self, value, units=None):
"""
Convert value in the given units from Pa.
If `units` is ``None``, use the current display units.
"""
units=units or self.get_units()
conv_factor={"mbar":1E2,"torr":133.322,"pa":1}
return value/conv_factor[units]
[docs]
def get_display_channel(self):
"""Get controller display channel"""
return self.query("SCT","int")+1
[docs]
@interface.use_parameters
def set_display_channel(self, channel=1):
"""Set controller display channel"""
return self.query("SCT,{}".format(channel-1),"int")+1
[docs]
def get_display_resolution(self):
"""Get controller display resolution (number of digits)"""
return self.query("DCD","int")
[docs]
def set_display_resolution(self, resolution=2):
"""Set controller display resolution (number of digits)"""
return self.query("DCD,{}".format(resolution),"int")
_p_gauge_enabled=interface.EnumParameterClass("gauge_enabled",{None:0,False:1,True:2})
[docs]
@interface.use_parameters(_returns="gauge_enabled")
def is_enabled(self, channel=1):
"""
Check if the gauge at the given channel is enabled.
If the gauge cannot be turned on/off (e.g., not connected), return ``None``.
"""
return self.query("SEN",["int","int"])[channel-1]
[docs]
@interface.use_parameters(_returns="gauge_enabled")
def enable(self, enable=True, channel=1):
"""Enable or disable the gauge at the given channel"""
vals=[0,0]
vals[channel]=2 if enable else 1
return self.query("SEN,{},{}".format(*vals),["int","int"])
_p_gstat=interface.EnumParameterClass("gauge_status",{"ok":0,"under":1,"over":2,"sensor_error":3,"sensor_off":4,"no_sensor":5,"id_error":6})
[docs]
@interface.use_parameters(_returns="gauge_status")
def get_channel_status(self, channel=1):
"""
Get channel status.
Can be ``"ok"``, ``"under"`` (underrange), ``"over"`` (overrange), ``"sensor_error"``, ``"sensor_off"``, ``"no_sensor"``, or ``"id_error"``.
"""
return self.query("PR{}".format(channel),["int","float"])[0]
[docs]
@interface.use_parameters
def get_pressure(self, channel=1, display_units=False, status_error=True):
"""
Get pressure at a given channel.
If ``display_units==False``, return result in Pa; otherwise, use display units obtained using :meth:`get_units`.
If ``status_error==True`` and the channel status is not ``"ok"``, raise and error; otherwise, return ``None``.
"""
stat,press=self.query("PR{}".format(channel),["int","float"])
if stat!=0:
if status_error:
raise PfeifferError("pressure reading error: status {} ({})".format(stat,self._p_gstat.i(stat)))
else:
return None
if not display_units:
press=self.to_Pa(press)
return press
[docs]
@interface.use_parameters
def get_gauge_kind(self, channel=1):
return self.query("TID",["str","str"])[channel-1]
_p_filter=interface.EnumParameterClass("meas_filter",{"fast":0,"medium":1,"slow":2})
[docs]
@interface.use_parameters(_returns="meas_filter")
def get_measurement_filter(self, channel=1):
"""Get gauge measurement filter (``"fast"``, ``"medium"``, or ``"slow"``)"""
return self.query("FIL","int")[channel-1]
[docs]
@interface.use_parameters(_returns="meas_filter")
def set_measurement_filter(self, meas_filter, channel=1):
"""Set gauge measurement filter (``"fast"``, ``"medium"``, or ``"slow"``)"""
curr_filter=self.query("FIL","int")
curr_filter[channel-1]=meas_filter
return self.query("FIL,{},{}".format(*curr_filter),"int")[channel-1]
[docs]
@interface.use_parameters
def get_calibration_factor(self, channel=1):
"""Get gauge calibration factor"""
return self.query("CAL","float")[channel-1]
[docs]
@interface.use_parameters
def set_calibration_factor(self, coefficient, channel=1):
"""Set gauge calibration factor"""
curr_coefficient=self.query("CAL","float")
curr_coefficient[channel-1]=coefficient
return self.query("CAL,{},{}".format(*curr_coefficient),"float")[channel-1]
_p_switch_func=interface.RangeParameterClass("switch_function",1,4)
[docs]
@interface.use_parameters(_returns=["channel",None,None])
def get_switch_settings(self, switch_function):
"""
Get settings for the given switch function (between 1 and 4).
Return tuple ``(channel, low_thresh, high_thresh)``. The thresholds are given in Pa.
"""
ch,lt,ht=self.query("SP{}".format(switch_function),["int","float","float"])
units=self.get_units()
return TTPG260SwitchSettings(ch+1,self.to_Pa(lt,units),self.to_Pa(ht,units))
[docs]
@interface.use_parameters(_returns=["channel",None,None])
def setup_switch(self, switch_function, channel, low_thresh, high_thresh):
"""
Get settings for the given switch function (between 1 and 4).
Return tuple ``(channel, low_thresh, high_thresh)``. The thresholds are given in Pa.
"""
units=self.get_units()
low_thresh=self.from_Pa(low_thresh,units)
high_thresh=self.from_Pa(high_thresh,units)
ch,lt,ht=self.query("SP{},{},{},{}".format(switch_function,channel-1,low_thresh,high_thresh),["int","float","float"])
return TTPG260SwitchSettings(ch+1,self.to_Pa(lt,units),self.to_Pa(ht,units))
[docs]
def get_switch_status(self):
"""Return status of the 4 switch functions"""
return [bool(v) for v in self.query("SPS","int")]
_p_activation_control=interface.EnumParameterClass("activation_control",{"none":0,"auto":1,"manual":2,"external":3,"hot_start":4})
_p_deactivation_control=interface.EnumParameterClass("deactivation_control",{"none":0,"auto":1,"manual":2,"external":3,"self_control":4})
[docs]
@interface.use_parameters(_returns=["activation_control","deactivation_control",None,None])
def get_gauge_control_settings(self, channel):
"""
Get settings for the gauge control on the given channel.
Return tuple ``(activation_control, deactivation_control, on_thresh, off_thresh)``. The thresholds are given in Pa.
"""
acc,dacc,ont,offt=self.query("SC{}".format(channel),["int","int","float","float"])
units=self.get_units()
return TTPG260GaugeControlSettings(acc,dacc,self.to_Pa(ont,units),self.to_Pa(offt,units))
[docs]
@interface.use_parameters(_returns=["activation_control","deactivation_control",None,None])
def setup_gauge_control(self, channel, activation_control, deactivation_control, on_thresh, off_thresh):
"""
Setup gauge control on the given channel.
Return tuple ``(activation_control, deactivation_control, on_thresh, off_thresh)``. The thresholds are given in Pa.
"""
units=self.get_units()
on_thresh=self.from_Pa(on_thresh,units)
off_thresh=self.from_Pa(off_thresh,units)
acc,dacc,ont,offt=self.query("SC{},{},{},{},{}".format(channel,activation_control,deactivation_control,on_thresh,off_thresh),["int","int","float","float"])
return TTPG260GaugeControlSettings(acc,dacc,self.to_Pa(ont,units),self.to_Pa(offt,units))
def _parse_errors(self, errs):
if not isinstance(errs,list):
errs=[errs]
err_codes={0:"no_error",1:"watchdog",2:"task_fail",3:"eprom",4:"ram",5:"eeprom",6:"display",7:"adconv",
9:"gauge_1_err",10:"gauge_1_id_err",11:"gauge_2_err",12:"gauge-2_id_err"}
return [err_codes.get(er,er) for er in sorted(errs)]
[docs]
def get_current_errors(self):
"""
Get a list of all present error messages.
If there are no errors, return a single-element list ``["no_error"]``.
"""
return self._parse_errors(self.query("RES","int"))
[docs]
def reset_error(self):
"""
Cancel currently active errors and return to measurement mode.
Return the list of currently present errors.
If there are no errors, return a single-element list ``["no_error"]``.
"""
return self._parse_errors(self.query("RES,1","int"))
[docs]
class DPG202(comm_backend.ICommBackendWrapper):
"""
DPG202/TPG202 control unit.
Args:
conn: serial connection parameters (usually port or a tuple containing port and baudrate)
"""
Error=PfeifferError
def __init__(self, conn):
instr=comm_backend.new_backend(conn,"serial",term_read="\r",term_write="\r",datatype="str",defaults={"serial":("COM1",9600)},reraise_error=PfeifferBackendError)
comm_backend.ICommBackendWrapper.__init__(self,instr)
self._add_info_variable("device_name",self.get_device_name)
self._add_info_variable("software_version",self.get_software_version)
self._add_status_variable("last_error_code",self.get_error_code)
self._add_status_variable("pressure",self.get_pressure,ignore_error=(PfeifferError,),priority=5)
try:
self.get_device_name()
except self.instr.Error:
self.close()
raise
_data_types={"boolean_old":0,"u_integer":1,"u_real":2,"string":4,"boolean_new":6,"u_short_int":7,"u_expo_new":10,"long_string":11} # data type indices
_data_lengths={0:6,1:6,2:6,4:6,6:1,7:3,10:6,11:16} # data lengths
def _parse_telegram(self, telegram, data_type=None):
"""Parse a telegram string and return its content: parameters, value, action, and address"""
address,action,parameter,l,svalue,recv_checksum=int(telegram[:3]),int(telegram[3]),int(telegram[5:8]),int(telegram[8:10]),telegram[10:-3],int(telegram[-3:])
checksum=sum([ord(c) for c in telegram[:-3]])%0x100
if checksum!=recv_checksum:
raise PfeifferError("incorrect checksum of the telegram: expected {}, calculated {}".format(recv_checksum,checksum))
if l!=len(svalue):
raise PfeifferError("incorrect specified length of the value: declared {}, actual {}".format(l,len(svalue)))
if svalue=="NO_DEF":
raise PfeifferError("parameter does not exist")
elif svalue=="_RANGE":
raise PfeifferError("value is out of range")
elif svalue=="_LOGIC":
raise PfeifferError("logic access violation")
if data_type is None:
value=svalue
else:
data_type=self._data_types.get(data_type,data_type)
if data_type not in self._data_lengths:
raise PfeifferError("unknown data type: {}".format(data_type))
if l!=self._data_lengths[data_type]:
raise PfeifferError("received parameter length {} doesn't correspond to the expected length for this data type {}".format(l,self._data_lengths[data_type]))
if data_type in [0,6]: # boolean_old, boolean_new
value=bool(int(svalue))
elif data_type in [1,7]: # u_integer, u_short_int
value=int(svalue)
elif data_type==2: # u_real
value=int(svalue)/1E2
elif data_type in [4,11]: # string, long_string
value=svalue
elif data_type==10: # u_expo_new
value=int(svalue[:4])*10**(int(svalue[4:6])-23)
return parameter,value,action,address
def _build_value_string(self, value, data_type):
"""Convert the value into a string using the given data type"""
data_type=self._data_types.get(data_type,data_type)
if data_type not in self._data_lengths:
raise ValueError("unknown data type: {}".format(data_type))
l=self._data_lengths[data_type]
if data_type in [0,6]: # boolean_old, boolean_new
svalue=("1" if value else "0")*l
elif data_type==1: # u_integer
svalue="{:06d}".format(int(value))
elif data_type==7: # u_short_int
svalue="{:03d}".format(int(value))
elif data_type==2: # u_real
svalue="{:06d}".format(int(value*1E2))
elif data_type in [4,11]: # string, long_string
svalue=str(value).ljust(l)
elif data_type==10: # u_expo_new
sexp="{:.3e}".format(value)
m=int(sexp[0]+sexp[2:5])
e=int(sexp[sexp.find("e")+1:])
svalue="{:04d}{:02d}".format(m,e+20)
if len(svalue)!=l:
raise ValueError("string value '{}' has length {} instead of the expected length {}".format(svalue,len(svalue),l))
return svalue
def _build_telegram(self, parameter, value="=?", action=0, address=1):
"""Build a telegram string with the given content"""
telegram="{:03d}{:d}0{:03d}{:02d}{}".format(address,action,parameter,len(value),value)
checksum=sum([ord(c) for c in telegram])%0x100
return "{}{:03d}".format(telegram,checksum)
[docs]
def query(self, parameter, value="=?", action=0, address=1, send_type=None, recv_type=None):
"""
Send a query to the device and parse the reply.
Args:
parameter: parameter number
value: value to send (``"=?"`` for a value request)
action: request action (0 for value request, 1 for a command)
address: unit address
send_type: data type for the sent value (ignored for value requests)
recv_type: data type for the received value (``None`` means returning a raw string value)
"""
if value!="=?":
if send_type is None:
raise ValueError("send value type should be specified")
value=self._build_value_string(value,send_type)
req=self._build_telegram(parameter,value=value,action=action,address=address)
self.instr.write(req)
reply=self.instr.readline()
rparameter,rvalue,raction,raddress=self._parse_telegram(reply,data_type=recv_type)
if rparameter!=parameter:
raise PfeifferError("reply parameter {} is different from the request parameter {}".format(rparameter,parameter))
if raddress!=address:
raise PfeifferError("reply address {} is different from the request address {}".format(raddress,address))
if raction!=1:
raise PfeifferError("expected reply action 01, got {:02d}".format(raction))
return rvalue
[docs]
def get_value(self, parameter, data_type, address=1):
"""
Send a data request to the device.
Args:
parameter: parameter number
data_type: data type for the received value
address: unit address
"""
return self.query(parameter,recv_type=data_type,address=address)
[docs]
def comm(self, parameter, value, data_type, address=1):
"""
Send a control command to the device.
Args:
parameter: parameter number
value: associated command value
data_type: data type for the sent value
address: unit address
"""
return self.query(parameter,value=value,action=1,send_type=data_type,address=address)
[docs]
def get_pressure(self, address=1):
"""Get pressure at a given unit address"""
return self.get_value(740,"u_expo_new",address=address)*1E2
[docs]
def get_error_code(self, address=1):
"""Get the current error code at a given unit address"""
return self.get_value(303,"string",address=address)
[docs]
def get_software_version(self, address=1):
"""Get the software version at a given unit address"""
return self.get_value(312,"string",address=address)
[docs]
def get_device_name(self, address=1):
"""Get the name of the gauge at a given unit address"""
return self.get_value(349,"string",address=address)