Source code for pylablib.devices.Attocube.anc350

from ...core.devio import comm_backend, interface
from ...core.utils import general
from ...core.utils.strpack import unpack_int

import collections
import time
import struct

from ..interface import stage
from .base import AttocubeError, AttocubeBackendError


[docs] def get_usb_devices_number(): """Get the number of controllers connected via USB""" devs=comm_backend.PyUSBDeviceBackend.list_resources(idVendor=0x16C0,idProduct=0x055B) return len(devs)
[docs] class ANC350(comm_backend.ICommBackendWrapper,stage.IMultiaxisStage): """ Attocube ANC350 controller. Args: conn: connection parameters - index of the Attocube ANC350 in the system (for a single controller leave 0) timeout(float): default operation timeout """ Error=AttocubeError _axes=[0,1,2] def __init__(self, conn=0, timeout=5.): if isinstance(conn,int): conn=(0x16C0,0x055B,conn,0x86,0x02,"libusb0") # default device IDs instr=comm_backend.new_backend(conn,backend="pyusb",timeout=timeout,check_read_size=False,reraise_error=AttocubeBackendError) self._corr_number=0 self._tell_telegrams={} super().__init__(instr) self.open() with self._close_on_error(): self.instr.flush_read() self.instr.read(512) self.set_value(0x000A,0,0) # sync request self.get_hardware_id() self.enable_updates(False) self._add_info_variable("hardware_id",self.get_hardware_id) self._add_settings_variable("voltages",self.get_voltage,lambda v: self.set_voltage("all",v)) self._add_settings_variable("offsets",self.get_offset,lambda v: self.set_offset("all",v)) self._add_settings_variable("frequencies",self.get_frequency,lambda v: self.set_frequency("all",v)) self._add_status_variable("status",self.get_status) self._add_status_variable("positions",self.get_position) self._add_status_variable("target_positions",self.get_target_position) self._add_status_variable("sensor_voltage",self.get_sensor_voltage) self._add_status_variable("capacitance",lambda: self.get_capacitance(measure=False),priority=-5) def _make_telegram(self, opcode, address, index=0, data=b"", add_corr=True): data=data[:(len(data)//4)*4] l=16+len(data) if add_corr: self._corr_number=(self._corr_number%0xFFFF)+1 corr_number=self._corr_number else: corr_number=0 return struct.pack("<IIIII",l,opcode,address,index,corr_number)+data Telegram=collections.namedtuple("Telegram",["opcode","address","index","data","corr_number"]) def _parse_telegram(self, telegram): if len(telegram)<20: raise ValueError("data is too short: {}".format(len(telegram))) l,opcode,address,index,corr_number=struct.unpack("<IIIII",telegram[:20]) if len(telegram)!=l+4: raise ValueError("wrong telegram length: expected {}, got {}".format(l+4,len(telegram))) return self.Telegram(opcode,address,index,telegram[20:],corr_number) def _read_telegram(self, corr_number=None): ctd=general.Countdown(self.instr.timeout) while True: tg=self._parse_telegram(self.instr.read(512)) if tg.opcode==3: # ACK if corr_number is None or tg.corr_number==corr_number: return tg raise AttocubeError("got unexpected correlation number: {}, expected {}".format(tg.corr_number,corr_number)) elif tg.opcode==4: # TELL # if (tg.address>>8)!=0x0F: self._tell_telegrams[(tg.address,tg.index)]=tg else: raise AttocubeError("unexpected opcode: {}".format(tg.opcode)) if ctd.passed(): raise AttocubeError("timeout while read") Reply=collections.namedtuple("Reply",["address","index","reason","data"]) def _write(self, opcode, address, index=0, data=b""): if isinstance(data, int): data=struct.pack("<i",data) tg=self._make_telegram(opcode,address,index,data,add_corr=False) self.instr.write(tg) def _query(self, opcode, address, index=0, data=b""): if isinstance(data, int): data=struct.pack("<i",data) tg=self._make_telegram(opcode,address,index,data) self.instr.write(tg) resp=self._read_telegram(self._corr_number) reason,=struct.unpack("<I",resp.data[:4]) return self.Reply(resp.address,resp.index,reason,resp.data[4:])
[docs] def check_tell(self, timeout=0.01): """Check for queued TELL (periodic value update) commands""" try: with self.instr.using_timeout(timeout): self._read_telegram() except (AttocubeError,self.instr.Error): pass
def _check_reason(self, reason): if reason==1: raise AttocubeError("invalid address") if reason==2: raise AttocubeError("value out of range") if reason==3: raise AttocubeError("telegram was ignored") if reason==4: raise AttocubeError("verify of data failed") if reason==5: raise AttocubeError("wrong type of data") if reason==99: raise AttocubeError("unknown error") elif reason!=0: raise AttocubeError("unknown reason: {}".format(reason))
[docs] def set_value(self, address, index, value, ack=False): """ Set device value at the given address and index. If ``ack==True``, request ACK responds and return its value; otherwise, return immediately after set. """ if ack: resp=self._query(0,address,index,value) res=resp.data if isinstance(value,int): res=unpack_int(res,"<") self._check_reason(resp.reason) return res else: self._write(0,address,index,value)
[docs] def get_value(self, address, index, as_int=True): """ Get device value at the given address and index. If ``as_int==True``, convert the result into a signed integer; otherwise return raw byte string. """ resp=self._query(1,address,index) res=resp.data self._check_reason(resp.reason) if as_int: res=unpack_int(res,"<") return res
def _read_register(self, address): idx=0 res=b"" while True: data=self.get_value(address,idx,as_int=False) eoln_pos=data.find(b"\x00") if eoln_pos>=0: res+=data[:eoln_pos] return res res+=data idx+=1
[docs] def enable_updates(self, enabled=True): """Enable or disable periodic TELL updates""" self.set_value(0x0145,0,1 if enabled else 0)
[docs] def get_hardware_id(self): """Return device HWID (by default -1)""" return self.get_value(0x0168,0)
[docs] def set_hardware_id(self, hwid, persist=False): """ Set device HWID (can be used to identify different devices). If ``persist==True``, the value persists after power cycling. """ self.set_value(0x016A,0,hwid) if persist: self.set_value(0x016F,0,0x1234)
[docs] @stage.muxaxis @interface.use_parameters def is_connected(self, axis="all"): """Check if axis is connected""" return bool(self.get_value(0x3002,axis))
[docs] @stage.muxaxis @interface.use_parameters def is_enabled(self, axis="all"): """Check if axis is enabled""" return bool(self.get_value(0x3030,axis))
[docs] @stage.muxaxis(mux_argnames="enabled") @interface.use_parameters def enable_axis(self, axis="all", enabled=True): """Enable a specific axis or all axes""" self.set_value(0x3030,axis,1 if enabled else 0)
[docs] @stage.muxaxis @interface.use_parameters def disable_axis(self, axis="all"): """Disable a specific axis or all axes""" self.set_value(0x3030,axis,0)
[docs] @stage.muxaxis @interface.use_parameters def is_moving(self, axis="all"): """Move a given axis for a given number of steps""" return bool(self.get_value(0x302E,axis))
[docs] @stage.muxaxis @interface.use_parameters def check_limit(self, axis="all"): """ Check if the ent of travel has been reached. Return ``None`` if no limits are reached, ``"fwd"`` if forward limit is reached, ``"bwd"`` if backward limit is reached, or ``"both"`` if both are reached together (normally shouldn't happen). """ flim=self.get_value(0x3039,axis) blim=self.get_value(0x303A,axis) return [None,"fwd","bwd","both"][flim+blim*2]
[docs] @stage.muxaxis @interface.use_parameters def get_status_n(self, axis="all"): """ Get numerical status of the axis. For details, see ANC350 protocol. """ return self.get_value(0x0404,axis)
status_bits=[(0x001,"running"),(0x002,"limit"),(0x100,"sens_err"),(0x400,"sens_disconn"),(0x800,"ref_valid")]
[docs] @stage.muxaxis def get_status(self, axis="all"): """ Get device status. Return list of status strings, which can include ``"running"`` (axis is moving), ``"limit"`` (one of the limits is reached), ``"sens_err"`` (sensor error), ``"sens_disconn"`` (sensor disconnected), or ``"ref_valid"`` (reference is valid). """ status_n=self.get_status_n(axis=axis) return [s for (m,s) in self.status_bits if status_n&m]
[docs] @stage.muxaxis @interface.use_parameters def get_target_position(self, axis="all"): """Get the target position for the given axis (the position towards which it is moving)""" return self.get_value(0x0408,axis)*1E-9
[docs] @stage.muxaxis @interface.use_parameters def get_precision(self, axis="all"): """Get the axis precision in m (used for checking if the target is reached)""" return self.get_value(0x3036,axis)*1E-9
[docs] @stage.muxaxis(mux_argnames="precision") @interface.use_parameters def set_precision(self, axis="all", precision=1E-6): """Set the axis precision in m (used for checking if the target is reached)""" self.set_value(0x3036,axis,int(precision*1E9)) return self._wip.get_precision(axis)
[docs] @stage.muxaxis @interface.use_parameters def is_target_reached(self, axis="all", precision=None): """ Check if the target position is reached. If `precision` is not ``None``, it sets final position tolerance (in m). """ if precision is not None: self._wip.set_precision(axis,precision) self.set_value(0x3036,axis,int(precision*1E9)) return self.get_value(0x3037,axis)
[docs] def get_sensor_voltage(self): """Get position sensor voltage in Volts""" return self.get_value(0x0526,0)*1E-3
[docs] def set_sensor_voltage(self, voltage): """Set position sensor voltage in Volts""" self.set_value(0x0526,0,int(voltage*1E3)) return self.get_sensor_voltage()
[docs] @stage.muxaxis @interface.use_parameters def get_voltage(self, axis="all"): """Get axis step voltage in Volts""" return self.get_value(0x0400,axis)*1E-3
[docs] @stage.muxaxis(mux_argnames="voltage") @interface.use_parameters def set_voltage(self, axis, voltage): """Set axis step voltage in Volts""" self.set_value(0x0400,axis,int(voltage*1E3)) return self._wip.get_voltage(axis)
[docs] @stage.muxaxis @interface.use_parameters def get_offset(self, axis="all"): """Get axis offset voltage in Volts""" return self.get_value(0x0514,axis)*1E-3
[docs] @stage.muxaxis(mux_argnames="voltage") @interface.use_parameters def set_offset(self, axis, voltage): """Set axis offset voltage in Volts""" self.set_value(0x0514,axis,int(voltage*1E3)) return self._wip.get_offset(axis)
[docs] @stage.muxaxis @interface.use_parameters def get_frequency(self, axis="all"): """Get axis step frequency in Hz""" return self.get_value(0x0401,axis)
[docs] @stage.muxaxis(mux_argnames="freq") @interface.use_parameters def set_frequency(self, axis, freq): """Set axis step frequency in Hz""" self.set_value(0x0401,axis,int(freq)) return self._wip.get_frequency(axis)
[docs] @stage.muxaxis @interface.use_parameters def get_capacitance(self, axis="all", measure=False, delay=0.5): """ Get axis capacitance in F. If ``measure==True``, initialize the measurement and get the result after the measurement `delay`. Otherwise, return the last measured value. """ if measure: self.set_value(0x051E,axis,1) time.sleep(delay) return self.get_value(0x0569,axis)*1E-12
[docs] @stage.muxaxis @interface.use_parameters def get_position(self, axis="all"): """Get axis position (in m)""" return self.get_value(0x0415,axis)*1E-9
[docs] @interface.use_parameters def move_to(self, axis, position, precision=None): """ Move to target position (in m). If `precision` is not ``None``, it sets final position tolerance. """ self.set_value(0x0408,axis,int(position*1E9)) if precision is not None: self._wip.set_precision(axis,precision) self.set_value(0x040D,axis,1)
[docs] def move_by(self, axis, dist): """Move along a given axis by a given distance (in m)""" self.move_to(axis,self.get_position(axis)+dist)
[docs] @interface.use_parameters def move_by_steps(self, axis, steps=1, delay=0): """Move along a given axis by a given number of steps""" steps=int(steps) if steps>=0: tg=self._make_telegram(0,0x0410,axis,b"\x01\x00\x00\x00",add_corr=False) else: tg=self._make_telegram(0,0x0411,axis,b"\x01\x00\x00\x00",add_corr=False) steps=-steps for _ in range(steps): self.instr.write(tg) if delay: time.sleep(delay)
[docs] def wait_move(self, axis, precision=1E-6, timeout=10., period=0.01): """ Wait for a given axis to stop moving or to reach target position. If the motion is not finished after `timeout` seconds, raise a backend error. Precision sets the final positioning precision (in m). """ if axis=="all": for ax in self.get_all_axes(): self.wait_move(ax,timeout=timeout) return ctd=general.Countdown(timeout) while True: if (not self.is_moving(axis)) or self.is_target_reached(axis,precision=precision): return if ctd.passed(): raise AttocubeError("axis waiting timeout error") time.sleep(period)
[docs] @stage.muxaxis @interface.use_parameters def stop(self, axis="all"): """Stop motion of a given axis""" self.set_value(0x040E,axis,0)
[docs] @interface.use_parameters def jog(self, axis, direction): """ Jog a given axis in a given direction. `direction` can be either ``"-"`` (negative) or ``"+"`` (positive). The motion continues until it is explicitly stopped, or until a limit is hit. """ if direction: self.set_value(0x040E,axis,1) else: self.set_value(0x040F,axis,1)