from ...core.devio import comm_backend, interface
from ...core.utils import general, py3
from ..interface import stage
from .base import NewportError, NewportBackendError
import time
import collections
[docs]def get_usb_devices_number():
"""Get the number of controllers connected via USB"""
devs=comm_backend.PyUSBDeviceBackend.list_resources(idVendor=0x104D,idProduct=0x4000)
return len(devs)
[docs]def muxaddr(*args, **kwargs):
"""Multiplex the function over its addr argument"""
if len(args)>0:
return muxaddr(**kwargs)(args[0])
def addr_func(self, *_, **__):
return self._addr_list
return general.muxcall("addr",special_args={"all":addr_func},mux_argnames=kwargs.get("mux_argnames",None),return_kind=kwargs.get("return_kind","dict"),allow_partial=True)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["id"])
[docs]class Picomotor8742(comm_backend.ICommBackendWrapper,stage.IMultiaxisStage):
"""
Picomotor 8742 4-axis controller.
Args:
conn: connection parameters; can be an index (starting from 0) for USB devices,
or an IP address (e.g., ``"192.168.0.2"``) or host name (e.g., ``"8742-12345"``) for Ethernet devices
backend: communication backend; by default, try to determine from the communication parameters
timeout(float): default operation timeout
multiaddr: if ``True``, assume that there are several daisy-chained devices connected to the current one;
in this case, ``get_device_info`` and related methods return dictionaries ``{addr: value}`` for all connected controllers
instead of simply values for the given controller
scan: if ``True`` and ``multiaddr==True``, scan for all connected devices (call :meth:`scan_devices`) upon connection
"""
Error=NewportError
_axes=[1,2,3,4]
def __init__(self, conn=0, backend="auto", timeout=5., multiaddr=False, scan=True):
if isinstance(conn,int):
conn=(0x104D,0x4000,conn,0x81,0x02) # default device IDs
backend="pyusb"
backend=comm_backend.autodetect_backend(conn,default="network") if backend=="auto" else backend
defaults={"network":{"port":23}}
term_write="\n\r" if comm_backend.autodetect_backend(conn,"network")=="pyusb" else "\n"
instr=comm_backend.new_backend(conn,backend=backend,timeout=timeout,term_write=term_write,term_read="\n",defaults=defaults,reraise_error=NewportBackendError)
time.sleep(0.01)
instr.flush_read()
self._addr_list=None
self._multiaddr=multiaddr
super().__init__(instr)
try:
self.get_id()
except instr.Error:
self.close()
raise NewportError("error connecting to the Picomotor controller")
if multiaddr:
if scan:
self.scan_devices()
self.get_addr_map()
self._add_info_variable("device_info",self.get_device_info)
self._add_status_variable("ethernet_parameters",self.get_ethernet_parameters,priority=-2)
self._add_status_variable("addr_map",self.get_addr_map)
self._add_status_variable("motor_type",lambda: self.get_motor_type(addr="all"))
self._add_status_variable("position",lambda: self.get_position(addr="all"))
self._add_settings_variable("velocity_parameters",lambda: self.get_velocity_parameters(addr="all"),self._setup_all_velocity)
self._add_status_variable("moving",lambda: self.is_moving(addr="all"))
[docs] def query(self, comm, axis=None, addr=None, read_reply=None):
if axis is not None:
comm="{}{}".format(axis,comm)
if addr is not None:
addr_header="{}>".format(addr)
comm=addr_header+comm
self.instr.write(comm)
if read_reply is None:
read_reply=comm.endswith("?")
if read_reply:
reply=py3.as_str(self.instr.readline().strip())
if addr is not None:
if not reply.startswith(addr_header):
raise NewportError("unexpected reply: expected reply starting with '{}', got '{}'".format(addr_header,reply))
reply=reply[len(addr_header):]
return reply
[docs] @muxaddr
def get_id(self, addr=None):
"""Get the device identification string"""
return self.query("*IDN?",addr=addr)
[docs] @muxaddr
def get_device_info(self, addr=None):
"""Get the device info of the controller board: ``(id_string,)``"""
return TDeviceInfo(self.get_id(addr=addr))
[docs] @muxaddr
def reset(self, addr=None):
"""
Restart the device.
Reboots the CPU and restores all saved settings from the parameter memory.
"""
self.query("*RST",addr=addr)
time.sleep(5.)
return self.get_id(addr=addr)
[docs] @muxaddr
def save_parameters(self, addr=None):
"""
Store current parameters to the non-volatile memory.
Affects axes speed and acceleration, motor types, and Ethernet parameters.
"""
self.query("SM",addr=addr)
_p_param_src=interface.EnumParameterClass("param_src",{"factory":0,"memory":1})
[docs] @muxaddr
@interface.use_parameters(src="param_src")
def restore_parameters(self, src="memory", addr=None):
"""
Restore parameters from the non-volatile memory (if ``src=="memory"``) for factory parameters (if ``src=="factory"``).
Affects axes speed and acceleration, motor types, and Ethernet parameters.
"""
self.query("*RCL{}".format(src),addr=addr)
_p_reassign=interface.EnumParameterClass("reassign",{"none":0,"conflict":1,"all":2})
[docs] @interface.use_parameters
def scan_devices(self, reassign="conflict", sync=True):
"""
Scan for devices connected to the current host device via RS-485 daisy-chaining.
`reassign` controls how device addresses are assigned during the scan;
can be ``"none"`` (keep current values; can lead to conflicts if several devices have the same address),
``"conflict"`` (change conflicting addresses), or ``"all"`` (assigned all new addresses in sequence starting from the host)
If ``sync==True``, wait until the scan is done (might take several seconds).
"""
self.query("SC{}".format(reassign))
if sync:
self.wait_for_scan()
return self.get_addr_map()
[docs] def get_addr_map(self):
"""
Get address map for devices connected to the current host device via RS-485 daisy-chaining.
Return tuple ``(addresses, conflict)``, where ``addresses`` is the list of all device addresses,
and ``conflict==True`` if there address conflicts (several devices having the same address).
"""
value=int(self.query("SC?"))
addr_list=[addr for addr in range(1,32) if value&(1<<addr)]
if self._multiaddr:
self._addr_list=addr_list
return addr_list, bool(value%1)
[docs] def wait_for_scan(self, timeout=10.):
"""Wait for the device connection scan to finish"""
ctd=general.Countdown(timeout)
while True:
if int(self.query("SD?")):
return
if ctd.passed():
raise NewportError("timeout while waiting for network scan")
[docs] @muxaddr
def get_addr(self, addr=None):
"""Get RS-485 address of the given device (host if `addr` is ``None``)"""
return int(self.query("SA?",addr=addr))
[docs] @muxaddr
def set_addr(self, new_addr, addr=None):
"""Set RS-485 address of the given device (host if `addr` is ``None``)"""
self.query("SA{}".format(new_addr),addr=addr)
return self.get_addr(addr=addr)
_p_ipmode=interface.EnumParameterClass("ipmode",{"static":0,"dhcp":1})
[docs] @muxaddr
def get_ethernet_parameters(self, addr=None):
"""
Get Ethernet connection parameters.
Return tuple ``(hostname, ipaddr, ipmode, gateway, netmask)``.
"""
ipmode="dhcp" if int(self.query("IPMODE?",addr=addr)) else "static"
params=[self.query(q,addr=addr) for q in ["HOSTNAME?","IPADDR?","GATEWAY?","NETMASK?"]]
return tuple(params[:1]+[ipmode]+params[1:])
[docs] @muxaddr
@interface.use_parameters
def setup_ethernet(self, hostname=None, ipmode=None, ipaddr=None, gateway=None, netmask=None, addr=None):
"""
Setup Ethernet connection parameters.
Any ``None`` value remains unchanged.
Note that these settings only take effect after saving parameters to the memory (:meth:`save_parameters`)
and restarting the device (:meth:`reset`). If the connection is made through Ethernet, then it will likely be invalidated,
in which case a new device object with the updated parameters should be created after reset.
"""
if hostname is not None:
self.query("HOSTNAME {}".format(hostname),addr=addr)
if ipmode is not None:
self.query("IPMODE{}".format(ipmode),addr=addr)
if ipaddr is not None:
self.query("IPADDR {}".format(ipaddr),addr=addr)
if gateway is not None:
self.query("GATEWAY {}".format(gateway),addr=addr)
if netmask is not None:
self.query("NETMASK {}".format(netmask),addr=addr)
return self.get_ethernet_parameters()
[docs] @muxaddr
def autodetect_motors(self, addr=None):
"""
Autodetect connected motors.
The command involves sending single-step commands to the motors, so it requires all axes to be stopped,
and it might slightly affect the current position.
After the detection the types can be stored in the memory via :meth:`save_parameters`.
"""
self.query("MC",addr=addr)
self.wait_move("all",addr=addr)
return self.get_motor_type()
_p_motor_type=interface.EnumParameterClass("motor_type",{"none":0,"unknown":1,"tiny":2,"standard":3})
[docs] @muxaddr
@stage.muxaxis
@interface.use_parameters(_returns="motor_type")
def get_motor_type(self, axis="all", addr=None):
"""Get type of the given axis motor"""
return int(self.query("QM?",axis=axis,addr=addr))
[docs] @muxaddr
@stage.muxaxis(mux_argnames="motor_type")
@interface.use_parameters
def set_motor_type(self, axis="all", motor_type="standard", addr=None):
"""Manually set type of the given axis motor"""
self.query("QM{}".format(motor_type),axis=axis,addr=addr)
return self.get_motor_type(axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
def move_to(self, axis, position, addr=None):
"""Move to a given position"""
self.query("PA{}".format(position),axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
def move_by(self, axis, steps=1, addr=None):
"""Move by a given number of steps"""
self.query("PR{}".format(steps),axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
def get_position(self, axis="all", addr=None):
"""Get the current axis position"""
return int(self.query("TP?",axis=axis,addr=addr))
[docs] @muxaddr
@stage.muxaxis
def set_position_reference(self, axis, position=0, addr=None):
"""Set the current axis position as a reference (the actual motor position stays the same)"""
self.query("DH{}".format(position),axis=axis,addr=addr)
return self.get_position(axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
@interface.use_parameters
def jog(self, axis, direction, addr=0):
"""
Jog a given axis in a given direction.
`direction` can be either ``"-"`` (negative) or ``"+"`` (positive).
The motion continues until it is explicitly stopped.
"""
self.query("MV{}".format("+" if direction else "-"),axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
def is_moving(self, axis="all", addr=None):
"""Check if the axis is moving"""
return not int(self.query("MD?",axis=axis,addr=addr))
[docs] @muxaddr
@stage.muxaxis
def wait_move(self, axis="all", addr=None):
"""Wait until axis motion is done"""
while self.is_moving(axis=axis,addr=addr):
time.sleep(0.01)
@stage.muxaxis
def _stop_axis(self, axis, addr=None):
self.query("ST",axis=axis,addr=addr)
[docs] @muxaddr
def stop(self, axis="all", immediate=False, addr=None):
"""
Stop motion of a given axis.
If ``immediate==True`` make an abrupt stop; otherwise, slow down gradually.
Note that immediate stop has to stop all axes simultaneously, so it only takes ``axis=="all"``.
"""
if immediate:
if axis!="all":
raise NewportError("immediate stop has to be called with axis=='all'")
self.query("AB",addr=addr)
else:
self._stop_axis(axis=axis,addr=addr)
self.wait_move(axis=axis,addr=addr)
[docs] @muxaddr
@stage.muxaxis
def get_velocity_parameters(self, axis="all", addr=None):
"""
Return velocity parameters ``(speed, accel)`` for the given axis and controller.
``speed`` and ``accel`` denote, correspondingly, maximal (i.e., steady regime) moving speed and acceleration in steps/s and steps/s^2.
"""
return int(self.query("VA?",axis=axis,addr=addr)),int(self.query("AC?",axis=axis,addr=addr))
[docs] @muxaddr
@stage.muxaxis(mux_argnames=["speed","accel"])
def setup_velocity(self, axis="all", speed=None, accel=None, addr=None):
"""
Setup velocity parameters ``(speed, accel)`` for the given axis and controller.
``speed`` and ``accel`` denote, correspondingly, maximal (i.e., steady regime) moving speed and acceleration in steps/s and steps/s^2.
``None`` values are left unchanged.
"""
if speed is not None:
self.query("VA{}".format(speed),axis=axis,addr=addr)
if accel is not None:
self.query("AC{}".format(accel),axis=axis,addr=addr)
return self.get_velocity_parameters(axis=axis,addr=addr)
def _setup_all_velocity(self, params):
if not isinstance(params,dict):
params={None:params}
for addr,par in params.items():
for axis,vp in zip(self._axes,par):
self.setup_velocity(axis,*vp,addr=addr)