Source code for pylablib.devices.Leybold.base

from ...core.devio import comm_backend, interface

import collections
import struct



[docs] class LeyboldError(comm_backend.DeviceError): """Generic Leybold device error"""
[docs] class LeyboldBackendError(LeyboldError,comm_backend.DeviceBackendError): """Generic Leybold backend communication error"""
TDeviceInfo=collections.namedtuple("TDeviceInfo",["sensor","page","swver"]) TUpdateValue=collections.namedtuple("TUpdateValue",["value","display_units","status","error","device_info"])
[docs] class GenericITR(comm_backend.ICommBackendWrapper): """ Generic Leybold ITR pressure gauge. Args: conn: serial connection parameters (usually port or a tuple containing port and baudrate) """ Error=LeyboldError def __init__(self, conn): instr=comm_backend.new_backend(conn,"serial",term_read="",term_write="",defaults={"serial":("COM1",9600)},reraise_error=LeyboldBackendError) comm_backend.ICommBackendWrapper.__init__(self,instr) with self._close_on_error(): self.get_update() self._add_info_variable("device_info",self.get_device_info) self._add_status_variable("units",self.get_units) self._add_status_variable("pressure",self.get_pressure) self._add_status_variable("latest_update",self.get_update) _update_page=5 def _is_update(self, update, l0=None): if len(update)<2 or (l0 is not None and len(update)!=l0): return False if update[0]!=len(update)-2: return False if update[1]!=self._update_page: return False if sum(list(update[1:-1]))&0xFF!=update[-1]: return False return True _status_emission={0:"emission_off",1:"emission_25uA",2:"emission_5mA",3:"degas"} def _parse_status(self, status): return status def _parse_error(self, err): return err _p_units=interface.EnumParameterClass("units",{"mbar":0,"torr":1,"pa":2}) @interface.use_parameters(_returns="units") def _get_units(self, status): return (status>>4)&0x03 def _parse_value(self, value, units): if units=="mbar": return 10**(value/4E3-12.5)*100 elif units=="torr": return 10**(value/4E3-12.625)*133.322 return 10**(value/4E3-10.5) def _parse_update(self, update): l,page,status,err,value,swver,sensor,checksum=struct.unpack(">BBBBHBBB",update) el=len(update)-2 if l!=el: raise LeyboldError("declared length {} does not agree with the message length {}".format(l,el)) echecksum=sum(list(update[1:-1]))&0xFF if checksum!=echecksum: raise LeyboldError("declared checksum {} does not agree with the calculated checksum {}".format(checksum,echecksum)) device_info=TDeviceInfo(sensor,page,"{:.1f}".format(swver/20)) units=self._get_units(status) status=self._parse_status(status) err=self._parse_error(err) value=self._parse_value(value,units) return TUpdateValue(value,units,status,err,device_info) def _read_last_update(self): update=self.instr.read()[-9:] while True: if self._is_update(update,9): break update=(update+self.instr.read(1))[-9:] return update
[docs] def get_update(self, refresh=True): """ Get device state update. Return tuple ``(value, display_units, status, error, device_info)``, where ``value`` is the pressure in Pa, ``display_units`` are display units (``"pa"``, ``"mbar"``, or ``"torr"``), ``status`` is the devices status (e.g., emission status), ``error`` is the device error (``"ok"`` if no errors), and ``device_info`` is a tuple ``(sensor, page, swver)`` with the sensor kind ID, data page, and software version. If ``refresh==True``, get the latest update value; otherwise, get the latest read value. """ if refresh: update=self._read_last_update() update=self._parse_update(update) self._last_update=update return self._last_update
[docs] def send_command(self, byte1, byte2, byte3): """ Send command to the device. Arguments represent the three command bytes. Values of these bytes for different commands are described in the manual. """ bs=[b&0xFF for b in [byte1,byte2,byte3]] checksum=sum(bs)&0xFF msg=struct.pack("BBBBB",3,bs[0],bs[1],bs[2],checksum) self.instr.write(msg)
[docs] def get_device_info(self): """ Get device info. Return tuple ``(sensor, page, swver)`` with the sensor kind ID, data page, and software version. """ return self.get_update(refresh=False).device_info
[docs] def get_units(self): """Get device readout units (``"mbar"``, ``"pa"``, or ``"torr"``)""" return self.get_update().display_units
[docs] def get_pressure(self, display_units=False): """ Get pressure. If ``display_units==False``, return result in Pa; otherwise, use display units obtained using :meth:`get_units`. """ up=self.get_update() if display_units: factor={"pa":1,"mbar":100,"torr":133.322}[up.display_units] return up.value/factor return up.value
TITR90Status=collections.namedtuple("TITR90Status",["emission","atm_adj"])
[docs] class ITR90(GenericITR): """ Leybold ITR90 pressure gauge. Args: conn: serial connection parameters (usually port or a tuple containing port and baudrate) """ def _parse_status(self, status): return TITR90Status(self._status_emission[status&0x03],bool(status&0x04)) _error_values={0:"ok",5:"pirani_misadjusted",8:"BA_error",9:"pirani_error"} def _parse_error(self, err): return self._error_values[err&0x0F]
[docs] @interface.use_parameters def set_units(self, units, store=True): """ Get device readout units (``"mbar"``, ``"pa"``, or ``"torr"``). If ``store==True``, store the value in the non-volatile power-independent memory. """ self.send_command(16,62,units) if store: self.send_command(32,62,62) return self.get_units()
[docs] def start_degas(self): """Start degas (turns off automatically after 3 minutes)""" self.send_command(16,93,148)
[docs] def stop_degas(self): """Stop degas""" self.send_command(16,93,105)