from ...core.devio import interface, comm_backend
from ...core.utils import py3, general
from .base import ThorlabsError, ThorlabsBackendError
import contextlib
import collections
import time
def muxaddr(*args, argname="addr", **kwargs):
    """
    Multiplex the function over its address argument.

    Can be used either as a bare decorator (``@muxaddr``) or as a decorator factory
    (``@muxaddr(argname=..., mux_argnames=...)``).

    Args:
        argname: name of the argument holding the device address
        mux_argnames: passed (via `kwargs`) to ``general.muxcall`` under the same name; ``None`` if not supplied

    The special address values are resolved from the bound instance:
    ``"all"`` is replaced by ``self._addrs``, and ``None`` is replaced by ``self._default_addr``.
    The call is built with ``return_kind="dict"`` and ``allow_partial=True``
    (presumably, multiplexed calls return a dictionary indexed by address; see ``general.muxcall`` for details).
    """
    if args:
        # bare-decorator form: the first positional argument is the function to wrap;
        # build the configured decorator (keeping the supplied argname) and apply it right away
        return muxaddr(argname=argname, **kwargs)(args[0])

    def all_addr_func(self, *_, **__):
        return self._addrs

    def def_addr_func(self, *_, **__):
        return self._default_addr

    return general.muxcall(
        argname,
        special_args={"all": all_addr_func, None: def_addr_func},
        mux_argnames=kwargs.get("mux_argnames", None),
        return_kind="dict",
        allow_partial=True,
    )
# Device description tuple, as built by ``ElliptecMotor.get_device_info`` from the "IN" reply
TDeviceInfo=collections.namedtuple("TDeviceInfo",["serial_no","model_no","year","fw_ver","hw_ver","travel","pulse"])
# Single motor description tuple, as built by ``ElliptecMotor.get_motor_info`` from the "I1"/"I2"/"I3" reply
TMotorInfo=collections.namedtuple("TMotorInfo",["loop","motor","current","ramp_up","ramp_down","fw_freq","bk_freq"])
class ElliptecMotor(comm_backend.ICommBackendWrapper):
    """
    Basic Elliptec stage device.

    Args:
        conn: serial connection parameters (usually port or a tuple containing port and baudrate)
        addrs: list of device addresses (between 0 and 15) connected to this serial port; if ``"all"``, automatically detect all connected devices
        default_addr: address used by default when not supplied; by default, use the first address among the connected
        scale: scale of the position units to the internal units;
            can be ``"stage"`` (use stage units such as mm or deg based on its internal calibration),
            ``"step"`` (directly use step units), or a number which multiplies user-supplied units to produce steps
        timeout: default communication timeout
        valid_status: statuses which are considered valid and do not raise an error on status check
    """
    Error = ThorlabsError

    def __init__(self, conn, addrs="all", default_addr=None, scale="stage", timeout=3., valid_status=("ok", "mech_timeout")):
        defaults = {"serial": {"baudrate": 9600}}
        instr = comm_backend.new_backend(
            conn, backend=("auto", "serial"), term_write=b"", term_read=b"\r\n", timeout=timeout,
            defaults=defaults, reraise_error=ThorlabsBackendError)
        instr.setup_cooldown(write=0.01)
        super().__init__(instr)
        self._bg_msg_counters = {}
        self.add_background_comm("BO")
        self.add_background_comm("BS")
        self._valid_status = valid_status
        with self._close_on_error():
            self._model_no = {}
            self._stage_scale = {}
            self._addrs = self._detect_devices(addrs)
            if not self._addrs:
                raise self.Error("could not detect any connected devices among the ones specified ({})".format(addrs))
            for a in self._addrs:
                dev_info = self.get_device_info(addr=a)
                self._model_no[a] = dev_info.model_no
                # number of steps per one stage unit, used for the "stage" scale
                self._stage_scale[a] = dev_info.pulse/dev_info.travel
            self._default_addr = self._addrs[0] if default_addr is None else default_addr
            self._check_scale(scale)
            self._scale = {a: scale for a in self._addrs}
        self._add_info_variable("device_info", lambda: self.get_device_info(addr="all"))
        self._add_info_variable("addrs", lambda: self._addrs)
        self._add_info_variable("scale", lambda: self.get_scale(addr="all"))
        self._add_status_variable("status", lambda: self.get_status(addr="all"))
        self._add_status_variable("position", lambda: self.get_position(addr="all"))
        self._add_status_variable("velocity", lambda: self.get_velocity(addr="all"))
        self._add_status_variable("motor_info", lambda: self._get_all_motor_info(addr="all"))

    # pause before every move command; if the move command is issued too soon after a previous one, it can move to 0 instead
    _pre_move_delay = 0.1

    @staticmethod
    def _check_scale(scale):
        """Raise :exc:`ValueError` if `scale` is neither ``"stage"``, nor ``"step"``, nor a number"""
        if not (scale in ["stage", "step"] or isinstance(scale, (int, float))):
            raise ValueError("scale can be 'stage', 'step', or a number converting user units into steps; supplied value is {}".format(scale))

    def _detect_devices(self, addrs="all", timeout=0.5, delay=0.1):
        """
        Find which of the given addresses have a responding device.

        Send a status request to every address in `addrs` (all 16 addresses if ``"all"``), pausing for `delay` after each one,
        and then collect replies using `timeout` as the read timeout.
        Return a sorted list of addresses which sent a reply.
        """
        if addrs == "all":
            addrs = list(range(16))
        for a in addrs:
            self.send_comm("gs", addr=a)
            time.sleep(delay)
        valid = set()
        with self.instr.using_timeout(timeout=timeout):
            try:
                for _ in range(len(addrs)):
                    comm = self.recv_comm()
                    valid.add(comm.addr)
            except self.Error:
                # deliberate best-effort: a receive error (e.g., no more replies arriving) ends the detection
                pass
        return sorted(valid)

    def get_connected_addrs(self):
        """Get a list of all connected device addresses"""
        return list(self._addrs)

    def _change_addr(self, addr, newaddr):
        """Update the internal bookkeeping (address list, default address, per-address tables) after a device address change"""
        if addr in self._addrs:
            self._addrs[self._addrs.index(addr)] = newaddr
        if self._default_addr == addr:
            self._default_addr = newaddr
        for d in [self._model_no, self._stage_scale, self._scale]:
            # addresses not recorded at connection have no entries to move
            if addr in d:
                d[newaddr] = d.pop(addr)

    def get_default_addr(self):
        """Get the current default address"""
        return self._default_addr

    def set_default_addr(self, addr):
        """Set the current default address"""
        self._default_addr = addr

    def _get_addr(self, addr):
        """Return `addr`, or the default address if `addr` is ``None``"""
        return self._default_addr if addr is None else addr

    @contextlib.contextmanager
    def using_default_addr(self, addr):
        """Context manager which temporarily changes the default address"""
        curr_addr, self._default_addr = self._default_addr, addr
        try:
            yield
        finally:
            self._default_addr = curr_addr

    def send_comm(self, comm, data="", addr=None):
        """
        Send a message with the given data to the devices at a given address.

        The message consists of a single hex digit address (default address if `addr` is ``None``), followed by `comm` and `data`.
        For details, see ELLx communications protocol.
        """
        msg = "{:01X}{}{}".format(self._get_addr(addr), comm, data)
        self.instr.write(msg)

    CommData = collections.namedtuple("CommData", ["comm", "data", "addr"])
    # expected length of the data field for known reply commands
    _default_data_lengths = {"IN": 30, "GS": 2, "I1": 22, "I2": 22, "I3": 22, "PO": 8, "HO": 8, "GJ": 8, "GV": 2, "BS": 2, "BO": 8}

    def _read_single_comm(self, datalen="auto", timeout=None):
        """
        Read a single message from the device.

        Return tuple ``(reply, bgcomm)``, where ``reply`` is a ``CommData`` tuple and ``bgcomm`` shows whether
        the reply command is registered as a 'background' message.
        If `datalen` is ``"auto"`` or the message is a background one, the expected data length is taken from the table of known commands
        (no check for unknown commands); raise :exc:`ThorlabsError` if the data length is different from the expected.
        """
        with self.instr.using_timeout(timeout):
            header = self.instr.read(3)
            # use slices rather than indices, so that the result has the same type as the header
            haddr = int(header[0:1], 16)
            hcomm = py3.as_str(header[1:3])
            bgcomm = hcomm in self._bg_msg_counters
            data = self.instr.readline()
        if (datalen == "auto") or bgcomm:
            datalen = self._default_data_lengths.get(hcomm, None)
        if datalen is not None and len(data) != datalen:
            raise ThorlabsError("unexpected reply data length for command {}: expected {}, got {}".format(hcomm, datalen, len(data)))
        return self.CommData(hcomm, data, haddr), bgcomm

    def recv_comm(self, comm=None, addr=None, datalen="auto", timeout=None):
        """
        Receive a message.

        `comm`, `addr`, and `datalen` can specify the expected return command (a single command or a list of them), address,
        or the length of the data field (if ``"auto"``, determine based on the return command).
        `timeout` specifies the waiting timeout (by default, same as supplied upon the device connection).
        Background messages received in the meantime are counted and skipped.
        Raise :exc:`ThorlabsError` if the reply address or command are different from the expected.
        For details, see ELLx communications protocol.
        """
        if comm is not None and not isinstance(comm, (list, tuple, set)):
            comm = [comm]
        while True:
            reply, bgcomm = self._read_single_comm(datalen=datalen, timeout=timeout)
            if bgcomm:
                # record the background message and keep waiting for the actual reply
                cnt, _ = self._bg_msg_counters[reply.comm].get(reply.addr, (0, None))
                self._bg_msg_counters[reply.comm][reply.addr] = cnt+1, reply
                continue
            if addr is not None and reply.addr != addr:
                raise ThorlabsError("unexpected reply address: expected {:01X}, got {:01X}".format(addr, reply.addr))
            if comm is not None and reply.comm not in comm:
                raise ThorlabsError("unexpected reply command: expected {}, got {}".format(comm, reply.comm))
            return reply

    def query(self, comm, data="", addr=None, reply_comm=None, reply_addr="auto", reply_datalen="auto", timeout=None):
        """
        Send a query to the device and receive the reply.

        A combination of :meth:`send_comm` and :meth:`recv_comm`.
        If `reply_addr` is ``"auto"``, the reply is expected from the same address the query is sent to.
        """
        addr = self._get_addr(addr)
        self.instr.read()  # NOTE(review): presumably discards any pending input before sending; confirm against the backend documentation
        self.send_comm(comm, data=data, addr=addr)
        if reply_addr == "auto":
            reply_addr = addr
        return self.recv_comm(comm=reply_comm, addr=reply_addr, datalen=reply_datalen, timeout=timeout)

    def add_background_comm(self, comm):
        """
        Mark given `comm` as a 'background' message, which can be sent by the device at any point without prompt (e.g., some operation confirmation).

        If it is received instead during ``recv_comm`` or ``query`` operations, it is ignored, and the corresponding counter is increased.
        """
        self._bg_msg_counters.setdefault(comm, {})

    def check_background_comm(self, comm, addr=None):
        """
        Return message counter and the last message value (``None`` if no message received yet) of a given 'background' message received from the given address.

        If `addr` is ``None``, use the default address.
        """
        # counters are indexed by the actual reply address, so None must be resolved first
        return self._bg_msg_counters[comm].get(self._get_addr(addr), (0, None))

    @muxaddr
    def change_addr(self, newaddr, addr=None):
        """Change the device address to a new value (between 0 and 15)"""
        # the confirmation comes already from the new address
        reply = self.query("ca", data="{:01X}".format(newaddr), addr=addr, reply_comm="GS", reply_addr=newaddr)
        result = self._check_status_reply(reply)
        self._change_addr(addr, newaddr)
        return result

    @muxaddr
    def store_parameters(self, addr=None):
        """Store current device parameters (e.g., frequencies) to the energy-independent memory"""
        reply = self.query("us", addr=addr, reply_comm="GS")
        return self._check_status_reply(reply)

    @muxaddr
    def get_device_info(self, addr=None):
        """
        Get device info.

        Return tuple ``(serial_no, model_no, year, fw_ver, hw_ver, travel, pulse)``.
        """
        reply = self.query("in", addr=addr, reply_comm="IN")
        data = reply.data
        model_no = int(data[:2], 16)
        serial_no = py3.as_str(data[2:10])
        year = int(data[10:14])
        fw_ver = int(data[14:16], 16)
        hw_ver = int(data[16:18], 16)
        travel = int(data[18:22], 16)
        pulse = int(data[22:30], 16)
        return TDeviceInfo(serial_no, model_no, year, fw_ver, hw_ver, travel, pulse)

    # names of the numerical status codes; codes not listed here are reported as "reserved"
    _status_codes = {0: "ok", 1: "comm_timeout", 2: "mech_timeout", 3: "not_supported", 4: "value_out_of_range",
        5: "isolated", 6: "out_of_isolation", 7: "init_error", 8: "therm_error", 9: "busy", 10: "sens_error",
        11: "motor_error", 12: "out_of_range", 13: "overcurrent"}

    def _parse_status(self, status, check=True, clear_status=True, addr=None):
        """
        Convert the numerical status code into its name.

        If ``check==True`` and the status is not among the valid ones, raise :exc:`ThorlabsError`;
        in this case, if ``clear_status==True``, first request the status of the device at `addr` (default address if ``None``) to clear the fault.
        """
        status = self._status_codes.get(status, "reserved")
        if check and status not in self._valid_status:
            if clear_status:
                self.query("gs", addr=addr)  # clear the fault status (otherwise it is returned as the next status result)
            raise ThorlabsError("faulty status: {}".format(status))
        return status

    def _check_status_reply(self, reply, check=True, clear_status=True):
        """
        Parse and check the status contained in the reply.

        Return ``None`` if the reply is not a status (``"GS"``) reply, and the status name otherwise.
        """
        if reply.comm != "GS":
            return None
        # a fault must be cleared on the device which reported it, not on the default one
        return self._parse_status(int(reply.data, 16), check=check, clear_status=clear_status, addr=reply.addr)

    @muxaddr
    def get_status(self, addr=None):
        """Get device status (returned as a status name without raising an error on faulty statuses)"""
        reply = self.query("gs", addr=addr, reply_comm="GS", timeout=10)
        return self._check_status_reply(reply, check=False)

    _p_motor = interface.EnumParameterClass("motor", [1, 2, 3, "all"])

    @muxaddr
    def _get_all_motor_info(self, addr=None):
        """Get a list of motor info tuples for all motors (1 to 3) which could be queried without an error"""
        info = []
        for m in range(1, 4):
            try:
                info.append(self.get_motor_info(motor=m, addr=addr))
            except ThorlabsError:
                # motors which can not be queried are skipped
                pass
        return info

    @muxaddr
    @interface.use_parameters
    def get_motor_info(self, motor=1, addr=None):
        """
        Get info for a given motor (between 1 and 3).

        Return tuple ``(loop, motor, current, ramp_up, ramp_down, fw_freq, bk_freq)``.
        Raise :exc:`ThorlabsError` if the device replies with a status message instead of the motor info.
        """
        comm = "i{}".format(motor)
        reply = self.query(comm, addr=addr, reply_comm=[comm.upper(), "GS"])
        status = self._check_status_reply(reply)
        if reply.comm == "GS":
            # a status reply (even a valid one) carries no motor data to parse
            raise ThorlabsError("could not get motor {} info: device replied with status {}".format(motor, status))
        data = reply.data
        # use slices rather than indices: indexing a bytes object yields a character code instead of a digit
        loop_ena = bool(int(data[0:1]))
        motor_ena = bool(int(data[1:2]))
        current = int(data[2:6], 16)/1866
        ramp_up = int(data[6:10], 16)
        ramp_down = int(data[10:14], 16)
        fw_freq = 14740000/int(data[14:18], 16)
        bk_freq = 14740000/int(data[18:22], 16)
        return TMotorInfo(loop_ena, motor_ena, current, ramp_up, ramp_down, fw_freq, bk_freq)

    @muxaddr
    def get_scale(self, addr=None):
        """
        Get scale parameter for the specified address.

        Can be ``"stage"``, ``"step"``, or a proportionality coefficient.
        """
        return self._scale[addr]

    @muxaddr
    def set_scale(self, scale, addr=None):
        """
        Set scale parameter for the specified address.

        Can be ``"stage"``, ``"step"``, or a proportionality coefficient (number of steps per user unit).
        """
        self._check_scale(scale)
        self._scale[addr] = scale
        return self._scale[addr]

    def _to_steps_data(self, v, addr):
        """Convert a value in user units into the 8 hex digit representation of the number of steps (two's complement for negative values)"""
        addr = self._get_addr(addr)
        scale = self._scale[addr]
        if scale == "stage":
            v = v*self._stage_scale[addr]
        elif scale != "step":
            v = v*scale
        # round to the nearest step, so that floating point errors do not lose a step
        v = int(round(v)) % 2**32
        return "{:08X}".format(v)

    def _from_steps_data(self, v, addr):
        """Convert the hex representation of the number of steps (interpreted as a signed 32-bit value) into user units"""
        addr = self._get_addr(addr)
        v = int(v, 16)
        v = (v+2**31) % 2**32-2**31
        scale = self._scale[addr]
        if scale == "stage":
            return v/self._stage_scale[addr]
        if scale != "step":
            return v/scale
        return v

    _p_home_dir = interface.EnumParameterClass("home_dir", {"cw": 0, "ccw": 1})

    @muxaddr
    @interface.use_parameters
    def home(self, home_dir="cw", paddles="all", addr=None):
        """
        Home the device.

        The operation is synchronous, i.e., it will not finish until the homing is done.
        If the device is a rotary stage, then `home_dir` specifies homing direction (``"cw"`` or ``"ccw"``).
        If the device is a paddle polarization controller, then `paddles` is a list of all paddle indices (1 to 3) to home (``"all"`` is the same as ``[1,2,3]``).
        Return ``True`` if the device replied with the position or ``False`` otherwise.
        """
        if self._model_no[addr] == 3:
            if paddles == "all":
                paddles = [1, 2, 3]
            mask = 0
            for p in paddles:
                if 1 <= p <= 3:
                    # bitwise OR, so that repeated indices do not spill into other paddles' bits
                    mask |= 1 << (p-1)
            data = "{:01X}".format(mask)
        else:
            data = "{:01X}".format(home_dir)
        reply = self.query("ho", data=data, addr=addr, reply_comm=["GS", "PO"])
        self._check_status_reply(reply)
        return reply.comm == "PO"

    @muxaddr
    def get_home_offset(self, addr=None):
        """Get homing offset"""
        reply = self.query("go", addr=addr, reply_comm="HO")
        return self._from_steps_data(reply.data, addr)

    @muxaddr
    def set_home_offset(self, offset, addr=None):
        """Set homing offset (note: the manufacturer advises against it)"""
        reply = self.query("so", data=self._to_steps_data(offset, addr), addr=addr, reply_comm="GS")
        self._check_status_reply(reply)
        return self.get_home_offset(addr=addr)

    @muxaddr
    def get_velocity(self, addr=None):
        """Get velocity as a percentage from the maximal velocity (0 to 100)"""
        reply = self.query("gv", addr=addr, reply_comm="GV")
        return int(reply.data, 16)

    @muxaddr
    def set_velocity(self, velocity=100, addr=None):
        """Set velocity as a percentage from the maximal velocity (0 to 100)"""
        reply = self.query("sv", data="{:02X}".format(int(velocity)), addr=addr, reply_comm="GS")
        self._check_status_reply(reply)
        return self.get_velocity(addr=addr)

    @muxaddr
    def get_frequency(self, motor=1, addr=None):
        """Get frequencies at a given motor as a tuple ``(fw_freq, bk_freq)``"""
        mi = self.get_motor_info(motor=motor, addr=addr)
        return mi.fw_freq, mi.bk_freq

    @muxaddr
    @interface.use_parameters
    def set_frequency(self, fw_freq=None, bk_freq=None, motor=1, addr=None):
        """
        Set frequencies at a given motor.

        Values set as ``None`` stay the same.
        """
        for pfx, freq in [("f", fw_freq), ("b", bk_freq)]:
            if freq is None:
                continue
            comm = "{}{}".format(pfx, motor)
            # the frequency is sent as a 4 hex digit value of 14740000/freq rounded to the nearest integer
            period = int(round(14740000/freq))
            reply = self.query(comm, data="{:04X}".format(period), addr=addr, reply_comm="GS")
            self._check_status_reply(reply)
        return self.get_frequency(motor=motor, addr=addr)

    @muxaddr
    @interface.use_parameters
    def search_frequency(self, motor=1, addr=None):
        """
        Run the automated frequency search on a given motor.

        The position might change slightly throughout the process.
        """
        comm = "s{}".format(motor)
        reply = self.query(comm, addr=addr, reply_comm="GS", timeout=10)
        self._check_status_reply(reply)
        return self.get_frequency(motor=motor, addr=addr)

    @muxaddr
    def get_position(self, addr=None):
        """Get the current position"""
        reply = self.query("gp", addr=addr, reply_comm="PO")
        return self._from_steps_data(reply.data, addr)

    @muxaddr
    def move_to(self, position, addr=None, timeout=30.):
        """
        Move to the given position.

        The operation is synchronous, i.e., it will not finish until the motion is stopped.
        Return ``True`` if the position was reached successfully or ``False`` otherwise.
        """
        time.sleep(self._pre_move_delay)  # if the move command is issued too soon after a previous one, it can move to 0 instead
        reply = self.query("ma", data=self._to_steps_data(position, addr), addr=addr, reply_comm=["GS", "PO"], timeout=timeout)
        self._check_status_reply(reply)
        return reply.comm == "PO"

    @muxaddr
    def move_by(self, distance, addr=None, timeout=30.):
        """
        Move by the given distance.

        The operation is synchronous, i.e., it will not finish until the motion is stopped.
        Return ``True`` if the position was reached successfully or ``False`` otherwise.
        """
        time.sleep(self._pre_move_delay)  # if the move command is issued too soon after a previous one, it can move to 0 instead
        reply = self.query("mr", data=self._to_steps_data(distance, addr), addr=addr, reply_comm=["GS", "PO"], timeout=timeout)
        self._check_status_reply(reply)
        return reply.comm == "PO"