from ...core.devio import comm_backend
from ...core.utils import general, py3
from .base import TopticaError, TopticaBackendError
import re
import collections
import time
[docs]
def muxchan(*args, **kwargs):
"""Multiplex the function over its addr argument"""
if len(args)>0:
return muxchan(**kwargs)(args[0])
def chan_func(self, *_, **__):
return list(range(1,self._channels_number+1))
return general.muxcall("channel",special_args={"all":chan_func},mux_argnames=kwargs.get("mux_argnames",None),return_kind="list",allow_partial=True)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["serial","version"])
TWorkHours=collections.namedtuple("TWorkHours",["power_up","laser_on"])
TTemperatures=collections.namedtuple("TTemperatures",["diode","baseplate"])
[docs]
class TopticaIBeam(comm_backend.ICommBackendWrapper):
"""
Toptica iBeam smart laser controller.
Args:
conn: connection parameters - index of the Attocube ANC350 in the system (for a single controller leave 0)
timeout(float): default operation timeout
"""
Error=TopticaError
def __init__(self, conn="COM1"):
instr=comm_backend.new_backend(conn,"serial",term_read=["\r\n","CMD> "],term_write="\r\n",timeout=3.,defaults={"serial":("COM1",115200)},reraise_error=TopticaBackendError)
super().__init__(instr)
self._channels_number=1
self._add_info_variable("device_info",self.get_device_info)
self._add_status_variable("hours",self.get_work_hours)
self._add_status_variable("temperatures",self.get_temperatures)
self._add_status_variable("full_data",self.get_full_data,priority=-5)
self._add_info_variable("channels_number",self.get_channels_number)
self._add_settings_variable("enabled",self.is_enabled,self.enable)
self._add_settings_variable("channel_enabled",self.is_channel_enabled, lambda v: self.enable_channel("all",v))
self._add_settings_variable("channel_power",self.get_channel_power,lambda v: self.set_channel_power("all",v))
self._add_status_variable("output_power",self.get_output_power)
self._add_status_variable("drive_current",self.get_drive_current)
self._add_status_variable("current_limits",self.get_current_limits)
self.open()
[docs]
def open(self):
res=super().open()
try:
self._flush_read()
self.query("echo off",multiline=True,reply=False)
self._channels_number=self._detect_channels_number()
return res
except self.Error:
self.close()
raise TopticaError("error connecting to the Toptica laser controller")
_err_re=re.compile(r"%SYS-(\w)",flags=re.IGNORECASE)
def _flush_read(self):
self.instr.flush_read()
time.sleep(0.005)
def _do_query(self, comm, multiline=False, keep_whitespace=False, check_error="FEW", reply=True):
self._flush_read()
self.instr.write(comm)
lines=[]
while True:
ln=py3.as_str(self.instr.readline(skip_empty=False,remove_term=False))
if not keep_whitespace:
ln=ln.strip()
else:
ln=ln.rstrip()
if ln.strip() in ["[OK]","CMD>"]:
break
if ln or keep_whitespace:
lines.append(ln)
if lines and check_error:
merr=self._err_re.match(lines[0])
if merr and merr[1] in check_error:
raise TopticaError("command {} resulted in error: {}".format(comm,lines[0]))
if reply and not lines:
raise TopticaError("expected single line, but got no response")
if multiline:
return lines
elif len(lines)>1:
raise TopticaError("expected single line, but got response {} with {} lines".format("\n".join(lines),len(lines)))
return lines[-1] if lines else None
[docs]
def query(self, comm, multiline=False, keep_whitespace=False, check_error="FEW", reply=True):
ntries=5
error_delay=0.5
for i in range(ntries):
try:
return self._do_query(comm,multiline=multiline,keep_whitespace=keep_whitespace,check_error=check_error,reply=reply)
except TopticaError:
if i==ntries-1:
raise
time.sleep(error_delay)
[docs]
def reboot(self):
"""Reboot the laser system"""
self.query("reset system",reply=False)
[docs]
def get_device_info(self):
"""Get the device info of the laser system: ``(serial, version)``"""
serial=self.query("show serial",multiline=True)[-1]
if serial.startswith("SN: "):
serial=serial[4:]
version=self.query("version")
return TDeviceInfo(serial,version)
[docs]
def get_full_data(self, formatted=False):
"""Return the comprehensive device data"""
ram=self.query("show data",keep_whitespace=formatted,multiline=True)
return "\n".join(ram) if formatted else ram
[docs]
def get_work_hours(self):
"""Get the work hours (power on time and laser on time)"""
reply=self.query("status uptime",multiline=True)
hours={}
for ln in reply:
m=re.match(r"(.*):\s*(\d+)\s*h\s*\+\s*(\d+)\s*s",ln,flags=re.IGNORECASE)
if m:
hours[m[1].lower()]=float(m[2])+float(m[2])/3600
return TWorkHours(hours.get("powerup",None),hours.get("laseron",None))
def _detect_channels_number(self):
return len(self._get_all_channel_powers())
[docs]
def get_channels_number(self):
"""Get number of supported laser channels"""
return self._channels_number
[docs]
def is_enabled(self):
"""Check if the output is enabled"""
return self.query("status laser").upper()=="ON" # pylint: disable=no-member
[docs]
def enable(self, enabled=True):
"""Turn the output on or off"""
self.query("laser {}".format("on" if enabled else "off"),reply=False)
return self.is_enabled()
[docs]
@muxchan
def is_channel_enabled(self, channel="all"):
"""Check if the specific channel is enabled"""
return self.query("status channel {}".format(channel)).upper()=="ON" # pylint: disable=no-member
[docs]
@muxchan(mux_argnames="enabled")
def enable_channel(self, channel, enabled=True):
"""Turn the specific channel on or off"""
self.query("{} {}".format("enable" if enabled else "disable",channel),reply=False)
return self.is_channel_enabled(channel)
def _get_all_channel_powers(self):
reply=self.query("show level power",multiline=True)
powers={}
for ln in reply:
m=re.match(r"CH(\d+),\s*PWR:\s*([\d.]+)\s*(mW|uW)",ln,flags=re.IGNORECASE)
if m:
p=float(m[2])*(1E-3 if m[3].lower()=="mw" else 1E-6)
powers[int(m[1])]=p
return powers
[docs]
@muxchan
def get_channel_power(self, channel="all"):
"""Get specified channel power (in W)"""
return self._get_all_channel_powers()[channel]
[docs]
@muxchan(mux_argnames="power")
def set_channel_power(self, channel, power):
"""Set channel power (in W)"""
self.query("channel {} power {:.0f} micro".format(channel,power*1E6),reply=False)
return self.get_channel_power(channel)
[docs]
def get_output_power(self):
"""Get current output power (in W)"""
p=self.query("show power",multiline=True)[-1]
m=re.match(r"PIC\s*=\s*([\d.]+)\s*(mW|uW)",p,flags=re.IGNORECASE)
if m:
return float(m[1])*(1E-3 if m[2].lower()=="mw" else 1E-6)
raise TopticaError("can not parse reply '{}' to query '{}'".format(p,"show power"))
[docs]
def get_drive_current(self):
"""Get current diode drive current (in A)"""
p=self.query("show current",multiline=True)[-1]
m=re.match(r"LDC\s*=\s*([\d.]+)\s*mA",p,flags=re.IGNORECASE)
if m:
return float(m[1])*1E-3
raise TopticaError("can not parse reply '{}' to query '{}'".format(p,"show current"))
[docs]
def get_current_limits(self):
"""Get settings of all current limits (in A) as a dictionary"""
reply=self.query("show limit",multiline=True)
limits={}
for ln in reply:
m=re.match(r"(.*):\s*([\d.]+)\s*mA",ln,flags=re.IGNORECASE)
if m:
limits[m[1].lower()]=float(m[2])*1E-3
return limits
[docs]
def get_temperatures(self):
"""Get settings of all current limits (in A) as a dictionary"""
temperatures=[]
for q in ["show temperature","show temperature system"]:
reply=self.query(q,multiline=True)[-1]
m=re.match(r"TEMP\s*=\s*([\d.]+)\s*C",reply,flags=re.IGNORECASE)
if m:
temperatures.append(float(m[1]))
else:
raise TopticaError("can not parse reply '{}' to query '{}'".format(reply,q))
return TTemperatures(*temperatures)