Source code for pylablib.devices.Arcus.performax
from .ArcusPerformaxDriver_lib import wlib as lib, ArcusPerformaxLibError
from .base import ArcusError, ArcusBackendError
from ...core.utils import py3, general
from ...core.devio import interface, comm_backend
from ..interface import stage
import time
[docs]
def get_usb_device_info(devid):
"""
Get info for the given device index (starting from 0).
Return tuple ``(index, serial, model, desc, vid, pid)``.
"""
lib.initlib()
ndev=lib.fnPerformaxComGetNumDevices()
if devid>=ndev:
raise ArcusError("device with index {} doesn't exist; there are {} devices".format(devid,ndev))
return tuple([devid]+[py3.as_str(lib.fnPerformaxComGetProductString(devid,i)) for i in range(5)])
[docs]
def list_usb_performax_devices():
"""
List all performax devices.
Return list of tuples ``(index, serial, model, desc, vid, pid)``, one per device.
"""
lib.initlib()
ndev=lib.fnPerformaxComGetNumDevices()
return [get_usb_device_info(d) for d in range(ndev)]
[docs]
class GenericPerformaxStage(stage.IMultiaxisStage):
"""
Generic Arcus Performax translation stage.
Args:
idx(int): stage index; if using a USB connection, specifies a USB device index; if using RS485 connection, specifies device index on the bus
conn: if not ``None``, defines a connection to RS485 connection. Usually (e.g., for USB-to-RS485 adapters) this is a serial connection,
which either a name (e.g., ``"COM1"``), or a tuple ``(name, baudrate)`` (e.g., ``("COM1", 9600)``);
if `conn` is ``None``, assume direct USB connection and use the manufacturer-provided DLL
"""
_default_operation_cooldown=0.01
_axis_value_case="upper"
Error=ArcusError
def __init__(self, idx=0, conn=None):
super().__init__()
self._operation_cooldown=self._default_operation_cooldown
self.conn=conn
if conn is None:
lib.initlib()
self.instr=None
self._instr_conn=None
else:
self.instr=comm_backend.new_backend(conn,("auto","serial"),term_read="\r",term_write="\r",timeout=3.,datatype="str",
defaults={"serial":("COM1",9600),"network":("192.168.1.250",5001)},reraise_error=ArcusBackendError)
self._instr_conn=self.instr.conn
self.idx=idx
self.handle=None
self.open()
self._add_info_variable("device_info",self.get_device_info)
self._add_status_variable("device_number",self.get_device_number)
def _get_connection_parameters(self):
return self.idx,self._instr_conn
[docs]
def open(self):
"""Open the connection to the stage"""
if self.instr is not None:
self.instr.open()
else:
self.close()
lib.fnPerformaxComGetNumDevices() # sometimes needed to set up the dll
# lib.fnPerformaxComSetTimeouts(5000,5000)
for _ in range(5):
try:
self.handle=lib.fnPerformaxComOpen(self.idx)
lib.fnPerformaxComFlush(self.handle)
return
except ArcusPerformaxLibError:
time.sleep(0.3)
raise ArcusError("can't connect to the stage with index {}".format(self.idx))
[docs]
def close(self):
"""Close the connection to the stage"""
if self.instr is not None:
self.instr.close()
elif self.handle is not None:
for _ in range(5):
try:
lib.fnPerformaxComClose(self.handle)
self.handle=None
return
except ArcusPerformaxLibError:
time.sleep(0.3)
raise ArcusError("can't disconnect from the stage with index {}".format(self.idx))
[docs]
def is_opened(self):
if self.instr is not None:
return bool(self.instr)
return self.handle is not None
def _check_handle(self):
if self.handle is None:
raise ArcusError("device is not opened")
[docs]
def get_device_info(self):
"""Get the device info"""
if self.instr is not None:
return (self.idx,self.get_device_number())
return get_usb_device_info(self.idx)
[docs]
def query(self, comm):
"""Send a query to the stage and return the reply"""
if self.instr is not None:
if self.instr.get_backend_name()=="network":
self.instr.write(comm)
else:
self.instr.write("@{:02d}{}".format(self.idx,comm))
return self.instr.readline()
else:
self._check_handle()
time.sleep(self._operation_cooldown)
scomm=py3.as_builtin_bytes(comm)+b"\x00"
try:
reply=py3.as_str(lib.fnPerformaxComSendRecv(self.handle,scomm))
if reply.startswith("?"):
raise ArcusError("device returned error: {}".format(reply[1:]))
return reply
except ArcusPerformaxLibError:
raise ArcusError("error sending device query {}".format(comm))
[docs]
def get_device_number(self):
"""
Get the device number used in RS-485 communications.
Usually it is a string with the format similar to ``"4EX00"``.
"""
return self.query("DN")
[docs]
def set_device_number(self, number, store=True):
"""
Get the device number used in RS-485 communications.
`number` can be either a full device id (e.g., ``"4EX00"``), or a single number between 0 and 99.
In order for the change to take effect, the device needs to be power-cycled.
If ``store==True``, automatically store settings to the memory; otherwise, the settings will be lost
unless :meth:`store_defaults` is called at some point before the power-cycle.
"""
if isinstance(number,int):
number="{}{:02d}".format(self.get_device_number()[:-2],number)
self.query("DN={}".format(number))
if store:
self.store_defaults()
return self.get_device_number()
[docs]
def store_defaults(self):
"""
Store some of the settings to the memory as defaults.
Applies to device number, baudrate, limit error behavior, polarity, and some other settings.
"""
self.query("STORE")
[docs]
class Performax4EXStage(GenericPerformaxStage):
"""
Arcus Performax 4EX/4ET translation stage.
Args:
idx(int): stage index; if using a USB connection, specifies a USB device index; if using RS485 connection, specifies device index on the bus
conn: if not ``None``, defines a connection to RS485 connection. Usually (e.g., for USB-to-RS485 adapters) this is a serial connection,
which either a name (e.g., ``"COM1"``), or a tuple ``(name, baudrate)`` (e.g., ``("COM1", 9600)``);
if `conn` is ``None``, assume direct USB connection and use the manufacturer-provided DLL
enable: if ``True``, enable all axes on startup
"""
_axes=list("XYZU")
_speed_comm="HS"
_split_comms=False
_individual_home=False
_analog_inputs=range(1,9)
def __init__(self, idx=0, conn=None, enable=True):
super().__init__(idx=idx,conn=conn)
with self._close_on_error():
self.enable_absolute_mode()
if enable:
self.enable_axis()
self.enable_limit_errors(False)
self._add_status_variable("baudrate",self.get_baudrate)
self._add_settings_variable("limit_errors_enabled",self.limit_errors_enabled,self.enable_limit_errors)
self._add_status_variable("current_limit_errors",self.check_limit_error)
self._add_status_variable("position",self.get_position)
self._add_status_variable("encoder",self.get_encoder)
self._add_status_variable("current_speed",self.get_current_axis_speed)
self._add_settings_variable("enabled",self.is_enabled,lambda v: self.enable_axis("all",v))
self._add_settings_variable("global_speed",self.get_global_speed,self.set_global_speed)
self._add_settings_variable("axis_speed",self.get_axis_speed,lambda v: self.set_axis_speed("all",v))
self._add_status_variable("axis_status",self.get_status)
self._add_status_variable("moving",self.is_moving)
self._add_status_variable("digital_input",self.get_digital_input_register,priority=-2)
self._add_settings_variable("digital_output",self.get_digital_output_register,priority=-2)
self._add_settings_variable("analog_input",self.get_analog_input,priority=-2,mux=(self._analog_inputs,))
_p_baudrate=interface.EnumParameterClass("baudrate",{9600:1,19200:2,38400:3,57600:4,115200:5})
[docs]
@interface.use_parameters(_returns="baudrate")
def get_baudrate(self):
"""Get current baud rate"""
return int(self.query("DB"))
[docs]
@interface.use_parameters
def set_baudrate(self, baudrate, store=True):
"""
Set current baud rate.
Acceptable values are 9600 (default), 19200, 38400, 57600, and 115200.
In order for the change to take effect, the device needs to be power-cycled.
If ``store==True``, automatically store settings to the memory; otherwise, the settings will be lost
unless :meth:`store_defaults` is called at some point before the power-cycle.
"""
self.query("DB={}".format(baudrate))
if store:
self.store_defaults()
return self.get_baudrate()
[docs]
def enable_absolute_mode(self, enable=True):
"""Set absolute motion mode"""
self.query("ABS" if enable else "REL")
[docs]
def enable_limit_errors(self, enable=True, autoclear=True):
"""
Enable limit errors.
If on, reaching limit switch on an axis puts it into an error state, which immediately stops this an all other axes;
any further motion command on this axis will raise an error (it is still possible to restart motion on other axes);
the axis motion can only be resumed by calling :meth:`clear_limit_error`.
If off, the limited axis still stops, but the other axes are unaffected.
If ``autoclear==True`` and ``enable==False``, also clear the current limit errors on all exs.
"""
self.query("IERR={}".format(0 if enable else 1))
if not enable and autoclear:
self.clear_limit_error(axis="all")
[docs]
def limit_errors_enabled(self):
"""
Check if global limit errors are enabled.
If on, reaching limit switch on an axis puts it into an error state, which immediately stops this an all other axes;
any further motion command on this axis will raise an error (it is still possible to restart motion on other axes);
the axis motion can only be resumed by calling :meth:`clear_limit_error`.
If off, the limited axis still stops, but the other axes are unaffected.
"""
return not bool(int(self.query("IERR")))
def _axisn(self, axis):
return self._axes.index(axis)+1
[docs]
@stage.muxaxis
@interface.use_parameters
def is_enabled(self, axis="all"):
"""Check if the axis output is enabled"""
return bool(int(self.query("EO{}".format(self._axisn(axis)))))
[docs]
@stage.muxaxis(mux_argnames="enable")
@interface.use_parameters
def enable_axis(self, axis="all", enable=True):
"""
Enable axis output.
If the output is disabled, the steps are generated by the controller, but not sent to the motors.
"""
self.query("EO{}={}".format(self._axisn(axis),1 if enable else 0))
[docs]
@stage.muxaxis
@interface.use_parameters
def get_position(self, axis="all"):
"""Get the current axis pulse position"""
return int(self.query("P"+axis))
[docs]
@interface.use_parameters
def set_position_reference(self, axis, position=0):
"""
Set the current axis pulse position as a reference.
Re-calibrate the pulse position counter so that the current position is set as `position` (0 by default).
"""
self.query("P{}={:.0f}".format(axis,position))
[docs]
@stage.muxaxis
@interface.use_parameters
def get_encoder(self, axis="all"):
"""Get the current axis encoder value"""
return int(self.query("E"+axis))
[docs]
@interface.use_parameters
def set_encoder_reference(self, axis, position=0):
"""
Set the current axis encoder value as a reference.
Re-calibrate the encoder counter so that the current position is set as `position` (0 by default).
"""
self.query("E{}={:.0f}".format(axis,position))
[docs]
@interface.use_parameters
def move_to(self, axis, position):
"""Move a given axis to a given position"""
self.query("{}{:.0f}".format(axis,position))
[docs]
def move_by(self, axis, steps=1):
"""Move a given axis for a given number of steps"""
self.move_to(axis,self.get_position(axis)+steps)
[docs]
@interface.use_parameters
def jog(self, axis, direction):
"""
Jog a given axis in a given direction.
`direction` can be either ``"-"`` (negative) or ``"+"`` (positive).
The motion continues until it is explicitly stopped, or until a limit is hit.
"""
self.query("J{}{}".format(axis,"+" if direction else "-"))
[docs]
@stage.muxaxis
@interface.use_parameters
def stop(self, axis="all", immediate=False):
"""
Stop motion of a given axis.
If ``immediate==True`` make an abrupt stop; otherwise, slow down gradually.
"""
comm="ABORT" if immediate else "STOP"
self.query("{}{}".format(comm,axis))
_p_home_mode=interface.EnumParameterClass("home_mode",{"only_home_input":0,"only_limit_input":1,"home_and_zidx_input":2,"only_zidx_input":3,"only_home_input_lowspeed":4})
[docs]
@interface.use_parameters
def home(self, axis, direction, home_mode):
"""
Home the given axis using a given home mode.
`direction` can be ``"+"`` or ``"-"``
The mode can be ``"only_home_input"``, ``"only_home_input_lowspeed"``, ``"only_limit_input"``, ``"only_zidx_input"``, or ``"home_and_zidx_input"``.
For meaning, see Arcus PMX manual.
"""
if self._individual_home:
comm=["H","L","HZ","Z","HL"][home_mode]
self.query("{}{}{}".format(comm,axis,direction))
else:
self.query("H{}{}{}".format(axis,direction,home_mode))
[docs]
def get_global_speed(self):
"""Get the global speed setting (in Hz); overridden by a non-zero axis speed"""
return int(self.query(self._speed_comm))
[docs]
@stage.muxaxis
@interface.use_parameters
def get_axis_speed(self, axis="all"):
"""Get the individual axis speed setting (in Hz); 0 means that the global speed is used"""
return int(self.query(self._speed_comm+axis))
[docs]
def set_global_speed(self, speed):
"""Set the global speed setting (in Hz); overridden by a non-zero axis speed"""
self.query("{}={:.0f}".format(self._speed_comm,speed))
return self.get_global_speed()
[docs]
@stage.muxaxis(mux_argnames="speed")
@interface.use_parameters
def set_axis_speed(self, axis, speed):
"""Set the individual axis speed setting (in Hz); 0 means that the global speed is used"""
self.query("{}{}={:.0f}".format(self._speed_comm,axis,speed))
return self._wap.get_axis_speed(axis)
[docs]
@stage.muxaxis
@interface.use_parameters
def get_current_axis_speed(self, axis="all"):
"""Get the instantaneous speed (in Hz)"""
if self._split_comms:
return int(self.query("PS{}".format(axis)))
else:
speeds=[int(x) for x in self.query("PS").split(":") if x]
return speeds[self._axisn(axis)-1]
_status_bits={ "accel":0x001,"decel":0x002,"moving":0x004,
"alarm":0x008,
"sw_plus_lim":0x010,"sw_minus_lim":0x020,"sw_home":0x040,
"err_plus_lim":0x080,"err_minus_lim":0x100,"err_alarm":0x200,
"TOC_timeout":0x800}
def _get_full_status(self):
if self._split_comms:
return [int(self.query("MST{}".format(x))) for x in self._axes]
else:
stat=self.query("MST")
return [int(x) for x in stat.split(":") if x]
[docs]
@stage.muxaxis
@interface.use_parameters
def get_status_n(self, axis="all"):
"""Get the axis status as an integer"""
return self._get_full_status()[self._axisn(axis)-1]
[docs]
@stage.muxaxis
def get_status(self, axis="all"):
"""Get the axis status as a set of string descriptors"""
statn=self.get_status_n(axis)
return [ k for k in self._status_bits if self._status_bits[k]&statn ]
[docs]
@stage.muxaxis
def is_moving(self, axis="all"):
"""Check if a given axis is moving"""
return bool(self.get_status_n(axis)&0x007)
[docs]
def wait_move(self, axis, timeout=None, period=0.05):
"""Wait until motion is done"""
if axis=="all":
for ax in self._axes:
self.wait_move(ax,timeout=timeout,period=period)
return
ctd=general.Countdown(timeout)
while True:
if not self.is_moving(axis):
return
if ctd.passed():
raise ArcusError("waiting for motion on axis {} caused a timeout".format(axis))
time.sleep(period)
[docs]
@stage.muxaxis
def check_limit_error(self, axis="all"):
"""
Check if the axis hit limit errors.
Return ``""`` (not errors), ``"+"`` (positive limit error) or ``"-"`` (negative limit error).
"""
stat=self.get_status_n(axis)
err=""
if stat&self._status_bits["err_plus_lim"]:
err=err+"+"
if stat&self._status_bits["err_minus_lim"]:
err=err+"-"
return err
[docs]
@stage.muxaxis
@interface.use_parameters
def clear_limit_error(self, axis="all"):
"""Clear axis limit errors"""
self.query("CLR"+axis)
[docs]
def get_analog_input(self, channel):
"""Get voltage (in V) at a given input (starting with 1)"""
return int(self.query("AI{}".format(channel)))*1E-3
[docs]
def get_digital_input(self, channel):
"""Get value (0 or 1) at a given digital input (1 through 8)"""
return int(self.query("DI{}".format(channel)))
[docs]
def get_digital_input_register(self):
"""Get all 8 digital inputs as a single 8-bit integer"""
return int(self.query("DI"))
[docs]
def get_digital_output(self, channel):
"""Get value (0 or 1) at a given digital output (1 through 8)"""
return int(self.query("DO{}".format(channel)))
[docs]
def get_digital_output_register(self):
"""Get all 8 digital inputs as a single 8-bit integer"""
return int(self.query("DO"))
[docs]
def set_digital_output(self, channel, value):
"""Set value (0 or 1) at a given digital output (1 through 8)"""
self.query("DO{}={}".format(channel,1 if value else 0))
return self.get_digital_output(channel)
[docs]
def set_digital_output_register(self, value):
"""Set all 8 digital inputs as a single 8-bit integer"""
self.query("DO={}".format(int(value)))
return self.get_digital_output_register()
[docs]
class Performax2EXStage(Performax4EXStage):
"""
Arcus Performax 2EX/2ED translation stage.
Args:
idx(int): stage index; if using a USB connection, specifies a USB device index; if using RS485 connection, specifies device index on the bus
conn: if not ``None``, defines a connection to RS485 connection. Usually (e.g., for USB-to-RS485 adapters) this is a serial connection,
which either a name (e.g., ``"COM1"``), or a tuple ``(name, baudrate)`` (e.g., ``("COM1", 9600)``);
if `conn` is ``None``, assume direct USB connection and use the manufacturer-provided DLL
enable: if ``True``, enable all axes on startup
"""
_axes=list("XY")
_speed_comm="HSPD"
_split_comms=True
_individual_home=True
_analog_inputs=range(1,3)
[docs]
class PerformaxDMXJSAStage(GenericPerformaxStage):
"""
Arcus Performax DMX-J-SA translation stage.
Args:
idx(int): stage index; if using a USB connection, specifies a USB device index; if using RS485 connection, specifies device index on the bus
conn: if not ``None``, defines a connection to RS485 connection. Usually (e.g., for USB-to-RS485 adapters) this is a serial connection,
which either a name (e.g., ``"COM1"``), or a tuple ``(name, baudrate)`` (e.g., ``("COM1", 9600)``);
if `conn` is ``None``, assume direct USB connection and use the manufacturer-provided DLL
enable: if ``True``, enable all axes on startup
autoclear: if ``True``, automatically clear limit error before the motion start
"""
def __init__(self, idx=0, conn=None, enable=True, autoclear=True):
super().__init__(idx=idx,conn=conn)
with self._close_on_error():
self.enable_absolute_mode()
if enable:
self.enable_axis()
self.autoclear=autoclear
self._add_status_variable("current_limit_errors",self.check_limit_error)
self._add_status_variable("position",self.get_position)
self._add_settings_variable("enabled",self.is_enabled,self.enable_axis)
self._add_settings_variable("axis_speed",self.get_axis_speed,self.set_axis_speed)
self._add_status_variable("axis_status",self.get_status)
self._add_status_variable("moving",self.is_moving)
self._add_status_variable("digital_input",self.get_digital_input_register,priority=-2)
self._add_settings_variable("digital_output",self.get_digital_output_register,priority=-2)
[docs]
def enable_absolute_mode(self, enable=True):
"""Set absolute motion mode"""
self.query("ABS" if enable else "REL")
def _clearlim(self):
if self.autoclear:
self.clear_limit_error()
[docs]
@interface.use_parameters
def is_enabled(self):
"""Check if the output is enabled"""
return bool(int(self.query("EO")))
[docs]
@interface.use_parameters
def enable_axis(self, enable=True):
"""
Enable output.
If the output is disabled, the steps are generated by the controller, but not sent to the motors.
"""
self.query("EO={}".format(1 if enable else 0))
[docs]
@interface.use_parameters
def get_position(self):
"""Get the current pulse position"""
return int(self.query("PX"))
[docs]
@interface.use_parameters
def set_position_reference(self, position=0):
"""
Set the current pulse position as a reference.
Re-calibrate the pulse position counter so that the current position is set as `position` (0 by default).
"""
self.query("PX={:.0f}".format(position))
[docs]
@interface.use_parameters
def move_to(self, position):
"""Move to a given position"""
self._clearlim()
self.query("X{:.0f}".format(position))
[docs]
def move_by(self, steps=1):
"""Move for a given number of steps"""
self.move_to(self.get_position()+steps)
[docs]
@interface.use_parameters
def jog(self, direction):
"""
Jog in a given direction.
`direction` can be either ``"-"`` (negative) or ``"+"`` (positive).
The motion continues until it is explicitly stopped, or until a limit is hit.
"""
self._clearlim()
self.query("J{}".format("+" if direction else "-"))
[docs]
@interface.use_parameters
def stop(self, immediate=False):
"""
Stop motion.
If ``immediate==True`` make an abrupt stop; otherwise, slow down gradually.
"""
comm="ABORT" if immediate else "STOP"
self.query("{}".format(comm))
_p_home_mode=interface.EnumParameterClass("home_mode",{"only_home_input":0,"only_limit_input":1,"only_home_input_lowspeed":2})
[docs]
@interface.use_parameters
def home(self, direction, home_mode):
"""
Home using a given home mode.
`direction` can be ``"+"`` or ``"-"``
The mode can be ``"only_home_input"``, ``"only_home_input_lowspeed"``, or ``"only_limit_input"``.
For meaning, see Arcus PMX manual.
"""
comm=["H","L","HL"][home_mode]
self.query("{}{}".format(comm,direction))
[docs]
def set_axis_speed(self, speed):
"""Set the speed setting (in Hz)"""
self.query("HSPD={:.0f}".format(speed))
return self.get_axis_speed()
_status_bits={ "running":0x001,"accel":0x002,"decel":0x004,"sw_home":0x008,
"sw_minus_lim":0x010,"sw_plus_lim":0x020,
"err_plus_lim":0x040,"err_minus_lim":0x080,
"TOC_timeout":0x400}
[docs]
def get_status(self):
"""Get the status as a set of string descriptors"""
statn=self.get_status_n()
return [ k for k in self._status_bits if self._status_bits[k]&statn ]
[docs]
def wait_move(self, timeout=None, period=0.05):
"""Wait until motion is done"""
ctd=general.Countdown(timeout)
while True:
if not self.is_moving():
return
if ctd.passed():
raise ArcusError("waiting for motion caused a timeout")
time.sleep(period)
[docs]
def check_limit_error(self):
"""
Check if the motor hit limit errors.
Return ``""`` (not errors), ``"+"`` (positive limit error) or ``"-"`` (negative limit error).
"""
stat=self.get_status_n()
err=""
if stat&self._status_bits["err_plus_lim"]:
err=err+"+"
if stat&self._status_bits["err_minus_lim"]:
err=err+"-"
return err
[docs]
@interface.use_parameters
def clear_limit_error(self):
"""Clear limit error"""
self.query("CLR")
[docs]
def get_digital_input(self, channel):
"""Get value (0 or 1) at a given digital input (1 through 5)"""
return int(self.query("DI{}".format(channel)))
[docs]
def get_digital_input_register(self):
"""Get all 5 digital inputs as a single 5-bit integer"""
return int(self.query("DI"))
[docs]
def get_digital_output(self, channel):
"""Get value (0 or 1) at a given digital output (1 through 2)"""
return int(self.query("DO{}".format(channel)))
[docs]
def get_digital_output_register(self):
"""Get all 2 digital outputs as a single 2-bit integer"""
return int(self.query("DO"))
[docs]
def set_digital_output(self, channel, value):
"""Set value (0 or 1) at a given digital output (1 through 2)"""
self.query("DO{}={}".format(channel,1 if value else 0))
return self.get_digital_output(channel)
[docs]
def set_digital_output_register(self, value):
"""Set all 2 digital inputs as a single 2-bit integer"""
self.query("DO={}".format(int(value)))
return self.get_digital_output_register()