Source code for pylablib.devices.Standa.base

from ...core.utils import py3, crc, general, funcargparse
from ...core.devio import comm_backend, interface
from ..interface import stage

import struct
import collections
import time



[docs] class StandaError(comm_backend.DeviceError): """Generic Standa device StandaError"""
[docs] class StandaBackendError(StandaError,comm_backend.DeviceBackendError): """Generic Standa backend communication error"""
TEngineType=collections.namedtuple("TEngineType",["engine","driver"]) TStepperMotorCalibration=collections.namedtuple("TStepperMotorCalibration",["steps_per_rev","usteps_per_step"]) TFullState=collections.namedtuple("TFullState",["smov","scmd","spwr","senc","swnd","position","encoder","speed","ivpwr","ivusb","temp","flags","gpio"]) TMoveParams=collections.namedtuple("TMoveParams",["speed","accel","decel","antiplay"]) TPowerParams=collections.namedtuple("TPowerParams",["hold_current","reduct_enabled","reduct_delay","off_enabled","off_delay","ramp_enabled","ramp_time"])
[docs] class Standa8SMC(comm_backend.ICommBackendWrapper,stage.IStage): """ Generic Standa 8SMC4/8SMC5 controller device. Args: conn: serial connection parameters (usually port, a tuple containing port and baudrate, or a tuple with full specification such as ``("COM1", 115200, 8, 'N', 2)``) """ Error=StandaError def __init__(self, conn): instr=comm_backend.new_backend(conn,"serial",term_read="",term_write="",defaults={"serial":("COM1",115200,8,'N',2)},timeout=1.,reraise_error=StandaBackendError) super().__init__(instr) self._add_info_variable("engine_type",self.get_engine_type) self._add_info_variable("stepper_motor_calibration",self.get_stepper_motor_calibration) self._add_status_variable("status",self.get_status) self._add_status_variable("position",self.get_position) self._add_status_variable("encoder",self.get_encoder) self._add_settings_variable("move_parameters",self.get_move_parameters,self.setup_move) with self._close_on_error(): self._engine_type=self.get_engine_type() self._stepper_motor_calibration=self.get_stepper_motor_calibration() def _crc(self, msg): return crc.crc(msg,0x8005,refin=True,refout=True,init=0xFFFF) def _build_msg(self, cmd, data=b""): if len(cmd)!=4: raise ValueError("command is supposed to be 4 bytes long; got {}".format(len(cmd))) if data: return py3.as_builtin_bytes(cmd)+py3.as_builtin_bytes(data)+struct.pack("<H",self._crc(data)) return py3.as_builtin_bytes(cmd) _error_cmd={"errc":"unrecognized command","errd":"invalid CRC","errv":"value out of range"} def _parse_msg(self, msg, ecmd=None): if len(msg)<7 and len(msg)!=4: self._resync() raise StandaError("incoming message should be 4 bytes long or at least 7 bytes long; got {}".format(len(msg))) cmd=py3.as_str(msg[:4]) if cmd in self._error_cmd: self._resync() raise StandaError("command {}returned an error '{}' ({})".format("" if ecmd is None else ecmd+" ",cmd,self._error_cmd[cmd])) if ecmd is not None and cmd!=ecmd: self._resync() raise StandaError("expected reply with command {}; got {}".format(ecmd,cmd)) if len(msg)>4: data=msg[4:-2] mcrc,=struct.unpack("<H",msg[-2:]) ecrc=self._crc(data) if ecrc!=mcrc: self._resync() raise StandaError("invalid CRC: expected 0x{:04X}, got 0x{:04X}".format(ecrc,mcrc)) return data _full_reply_delay=10E-3
[docs] def query(self, cmd, data=b"", retlen=None): msg=self._build_msg(cmd,data) self.instr.write(msg) while True: b=self.instr.read(1) if b!=b"\x00": if retlen is None: time.sleep(self._full_reply_delay) reply=b+self.instr.read() else: reply=b+self.instr.read(retlen+5 if retlen else 3) break return self._parse_msg(reply,cmd)
_comms={"gent":(None, (8,"BB")), "gpos":(None, (20,"<ihq")), "spos":((20,"<ihqB"), None), "move":((12,"<ih"), None), "movr":((12,"<ih"), None), "gets":(None,(48,"<BBBBBihqihhhhhhIIB")), "gmov":(None,(24,"<IBHHIB")), "smov":((24,"<IBHHIB"),None), "gpwr":(None,(14,"<BHHHB")), "spwr":((14,"<BHHHB"),None), }
[docs] def pquery(self, cmd, *args): if cmd not in self._comms: raise ValueError("command {} is not among defined commands".format(cmd)) arg,rval=self._comms[cmd] if arg is not None: data=struct.pack(arg[1],*args) data+=b"\x00"*(arg[0]-len(data)) else: data=b"" reply=self.query(cmd,data,retlen=0 if rval is None else rval[0]) if rval is not None: return struct.unpack(rval[1],reply[:struct.calcsize(rval[1])])
def _resync(self): for _ in range(250): self.instr.write(b"\x00") time.sleep(1E-3) res=self.instr.read() if res and res==b"\x00"*len(res): return True return False _p_eng_type=interface.EnumParameterClass("eng_type",{"none":0,"dc":1,"dc2":2,"step":3,"test":4,"brushless":5}) _p_drv_type=interface.EnumParameterClass("drv_type",{"fet":1,"integr":2,"ext":3})
[docs] @interface.use_parameters(_returns=("eng_type","drv_type")) def get_engine_type(self): """Get engine and driver type""" return TEngineType(*self.pquery("gent"))
[docs] def get_stepper_motor_calibration(self): """ Get stepper motor calibration parameters. Return tuple ``(steps_per_rev, usteps_per_step)``. """ fusps,spr=struct.unpack("<BH",self.query("geng")[13:16]) usps=(1<<(fusps-1)) if fusps>0 else 1 return TStepperMotorCalibration(spr,usps)
def _pup2p(self, stp, ustp): usps=self._stepper_motor_calibration.usteps_per_step if self._engine_type.engine=="step": return stp*usps+ustp return stp def _p2pup(self, stp): usps=self._stepper_motor_calibration.usteps_per_step if self._engine_type.engine=="step": s=1 if stp>=0 else -1 return s*int(abs(stp)//usps),s*int(abs(stp)%usps) return stp _p_state_cmd=interface.EnumParameterClass("state_cmd",{"unknown":0,"move":1,"movr":2,"left":3,"right":4,"stop":5,"home":6,"loft":7,"sstp":8}) _p_state_pwr=interface.EnumParameterClass("state_pwr",{"unknown":0,"off":1,"norm":3,"reduct":4,"max":5}) _p_state_enc=interface.EnumParameterClass("state_enc",{"absent":0,"unknown":1,"malfunc":2,"referse":3,"ok":4}) _p_state_wnd=interface.EnumParameterClass("state_wnd",{"absent":0,"unknown":1,"malfunc":2,"ok":3}) _move_flags={"moving":0x01,"target_speed":0x02,"antiplay":0x04} _cmd_flags={"error":0x40,"running":0x80}
[docs] def get_status(self): """ Get device status. Return tuple ``(smov, scmd, spwr, senc, swnd, position, encoder, speed, ivpwr, ivusb, temp, flags, gpio)`` with the moving state (whether motor is moving, reached speed, etc.), command state (last issued command and its status), power state, encoder state, winding state (currently not used), step position, encoder position, current speed, current and voltage of the power supply, current and voltage of the USB source, temperature (in C), and additional state and GPIO flags. """ stat=self.pquery("gets") smov=tuple(f for f,m in self._move_flags.items() if stat[0]&m) if stat[1]&self._cmd_flags["running"]: cmdstate="running" elif stat[1]&self._cmd_flags["error"]: cmdstate="error" else: cmdstate="success" scmd=(self._parameters["state_cmd"].i(stat[1]&0x3F),cmdstate) spwr=self._parameters["state_pwr"].i(stat[2]) senc=self._parameters["state_enc"].i(stat[3]) swnd=(self._parameters["state_wnd"].i(stat[4]&0x0F),self._parameters["state_wnd"].i(stat[4]>>4)) position=self._pup2p(*stat[5:7]) encoder=stat[7] speed=self._pup2p(*stat[8:10]) ivpwr=stat[10]/1E3,stat[11]/100 ivusb=stat[12]/1E3,stat[13]/100 temp=stat[14]/10 flags=stat[15] gpio=stat[16] return TFullState(smov,scmd,spwr,senc,swnd,position,encoder,speed,ivpwr,ivusb,temp,flags,gpio)
[docs] def is_moving(self): """Check if the motor is moving""" stat=self.pquery("gets") return bool(stat[0]&0x01) or bool(stat[1]&0x80)
[docs] def wait_move(self, timeout=None): """Wait until motion is done""" ctd=general.Countdown(timeout) while self.is_moving(): if ctd.passed(): raise StandaError("timeout waiting for move") time.sleep(10E-3)
[docs] def get_position(self): """Return step position (in steps for a DC motor, in microsteps for a stepper motor)""" return self._pup2p(*self.pquery("gpos")[:2])
[docs] def get_encoder(self): """Return encoder position""" return self.pquery("gpos")[2]
[docs] def set_position_reference(self, position=0): """ Set position reference (in steps for a DC motor, in microsteps for a stepper motor). Actual motor position stays the same. """ stp,ustp=self._p2pup(position) self.pquery("spos",stp,ustp,0,0x02) return self.get_position()
[docs] def set_encoder_reference(self, position=0): """ Set encoder reference. Actual motor position stays the same. """ self.pquery("spos",0,0,position,0x01) return self.get_encoder()
[docs] def move_to(self, position): """Move to the given position (in steps for a DC motor, in microsteps for a stepper motor)""" self.pquery("move",*self._p2pup(position))
[docs] def move_by(self, distance): """Move by the given distance (in steps for a DC motor, in microsteps for a stepper motor)""" self.pquery("movr",*self._p2pup(distance))
[docs] def stop(self, immediate=False): self.query("stop" if immediate else "sstp")
[docs] def power_off(self, stop="soft"): funcargparse.check_parameter_range(stop,"stop",["soft","immediate","none"]) if stop!="none" and self.is_moving(): self.stop(immediate=(stop=="immediate")) self.query("pwof")
[docs] @interface.use_parameters def jog(self, direction): """Start moving in a given direction (``"+"`` or ``"-"``)""" self.query("rigt" if direction else "left")
[docs] def home(self, sync=True, timeout=30.): """ Home the motor. If ``sync==True``, wait until the homing is complete, or until `timeout` expires. """ self.query("home") time.sleep(10E-3) if sync: self.wait_move(timeout)
[docs] def get_move_parameters(self): """ Get moving parameters. Return tuple ``(speed, accel, decel, antiplay)``. """ par=self.pquery("gmov") return TMoveParams(self._pup2p(*par[:2]),self._pup2p(par[2],0),self._pup2p(par[3],0),self._pup2p(*par[4:6]))
[docs] def setup_move(self, speed=None, accel=None, decel=None, antiplay=None): """ Setup moving parameters. If any parameter is ``None``, use the current value. """ par=self.get_move_parameters() speed=par.speed if speed is None else int(speed) accel=par.accel if accel is None else int(accel) decel=par.decel if decel is None else int(decel) antiplay=par.antiplay if antiplay is None else int(antiplay) par=self._p2pup(speed)+(self._p2pup(accel)[0],self._p2pup(decel)[0])+self._p2pup(antiplay) self.pquery("smov",*par) return self.get_move_parameters()
[docs] def get_power_parameters(self): """ Get power parameters. Return tuple ``(hold_current, reduct_enabled, reduct_delay, off_enabled, off_delay, ramp_enabled, ramp_time)``. """ par=self.pquery("gpwr") return TPowerParams(par[0],bool(par[4]&0x01),par[1]/1E3,bool(par[4]&0x02),par[2]/1E3,bool(par[4]&0x04),par[3]/1E3)
[docs] def setup_power(self, hold_current=None, reduct_enabled=None, reduct_delay=None, off_enabled=None, off_delay=None, ramp_enabled=None, ramp_time=None): """ Setup power parameters. If any parameter is ``None``, use the current value. """ par=self.get_power_parameters() hold_current=par.hold_current if hold_current is None else hold_current reduct_enabled=par.reduct_enabled if reduct_enabled is None else reduct_enabled reduct_delay=par.reduct_delay if reduct_delay is None else reduct_delay off_enabled=par.off_enabled if off_enabled is None else off_enabled off_delay=par.off_delay if off_delay is None else off_delay ramp_enabled=par.ramp_enabled if ramp_enabled is None else ramp_enabled ramp_time=par.ramp_time if ramp_time is None else ramp_time flags=(0x01 if reduct_enabled else 0x00)|(0x02 if off_enabled else 0x00)|(0x04 if ramp_enabled else 0x00) self.pquery("spwr",int(hold_current),int(reduct_delay*1E3),int(off_delay*1E3),int(ramp_time*1E3),flags) return self.get_power_parameters()