from ...core.devio import comm_backend, interface
from ...core.utils import py3, units
import collections
[docs]
class OphirError(comm_backend.DeviceError):
"""Generic Ophir device error"""
[docs]
class OphirBackendError(OphirError,comm_backend.DeviceBackendError):
"""Generic Ophir backend communication error"""
[docs]
class OphirDevice(comm_backend.ICommBackendWrapper):
"""
Generic Ophir device.
Args:
conn: serial connection parameters (usually port or a tuple containing port and baudrate)
"""
Error=OphirError
def __init__(self, conn):
instr=comm_backend.new_backend(conn,("auto","serial"),term_read="\r\n",term_write="\r\n",defaults={"serial":("COM1",9600)},reraise_error=OphirBackendError)
comm_backend.ICommBackendWrapper.__init__(self,instr)
def _parse_response(self, comm, resp):
resp=resp.strip()
if resp.startswith(b"?"):
raise OphirError("Command {} returned error: {}".format(comm,resp[1:].strip()))
if resp.startswith(b"*"):
return py3.as_str(resp[1:].strip())
raise OphirError("Command {} returned unrecognized response: {}".format(comm,resp))
[docs]
def query(self, comm):
"""Send a query to the device and parse the reply"""
comm=comm.strip()
with self.instr.single_op():
self.instr.flush_read()
self.instr.write(comm)
resp=self.instr.readline()
return self._parse_response(comm,resp)
THeadInfo=collections.namedtuple("THeadInfo",["type","serial","name","capabilities"])
TDeviceInfo=collections.namedtuple("TDeviceInfo",["id","serial","name","rom_version"])
TWavelengthInfo=collections.namedtuple("TWavelengthInfo",["mode","rng","curr_idx","presets","curr_wavelength"])
TRangeInfo=collections.namedtuple("TRangeInfo",["curr_idx","ranges","curr_range"])
[docs]
class VegaPowerMeter(OphirDevice):
"""
Ophir Vega power meter.
Args:
conn: serial connection parameters (usually port or a tuple containing port and baudrate)
"""
def __init__(self, conn):
OphirDevice.__init__(self,conn)
self._add_info_variable("head_info",self.get_head_info)
self._add_info_variable("device_info",self.get_device_info)
self._add_status_variable("power",self.get_power,ignore_error=OphirError)
self._add_status_variable("energy",self.get_energy,ignore_error=OphirError)
self._add_status_variable("frequency",self.get_frequency,ignore_error=OphirError)
self._add_status_variable("units",self.get_units)
self._add_settings_variable("wavelength",self.get_wavelength,self.set_wavelength,ignore_error=OphirError)
self._add_status_variable("wavelength_info",self.get_wavelength_info,ignore_error=OphirError)
self._add_status_variable("range_info",self.get_range_info,ignore_error=OphirError)
self._add_status_variable("battery_condition",self.get_battery_condition,priority=-5)
self._add_status_variable("baudrate",self.get_baudrate,priority=-2)
self._add_status_variable("supported_baudrates",self.get_supported_baudrates,priority=-5)
self._add_settings_variable("range_idx",self.get_range_idx,self.set_range_idx)
self._add_settings_variable("filter_in",self.is_filter_in,self.set_filter,ignore_error=OphirError)
self._add_settings_variable("diffuser_in",self.is_diffuser_in,self.set_diffuser,ignore_error=OphirError)
_head_cap_bits={0:"power",1:"energy",18:"temperature",31:"frequency"}
_head_types={"TH":"thermopile","BC":"BC20","TP":"temperature_probe","SI":"photodiode","LX":"CIE","RP":"RP","PY":"pyroelectric","NJ":"nanojoule","XX":"no_head"}
[docs]
def get_head_info(self):
"""
Get head information.
Return tuple ``(type, serial, name, capabilities)``.
"""
htype,hserial,hname,hcaps=self.query("$HI").split()
htype=self._head_types.get(htype,htype)
hcaps=int(hcaps,base=16)
caps=tuple([v for b,v in self._head_cap_bits.items() if hcaps&(1<<b)])
return THeadInfo(htype,int(hserial),hname,caps)
[docs]
def get_device_info(self):
"""
Get device information.
Return tuple ``(id, serial, name, rom_version)``.
"""
did,dserial,dname=self.query("$II").split()
rver=self.query("$VE")
return TDeviceInfo(did,int(dserial),dname,rver)
[docs]
def reset(self):
"""Reset the device"""
self.query("$RE")
[docs]
def get_power(self):
"""
Get the current power readings.
Return either measured power, or ``"over"``, if the power is overrange.
"""
power=self.query("$SP")
if power.lower()=="over":
return "over"
return float(power)
[docs]
def get_energy(self):
"""
Get the current energy readings.
Return either measured energy, or ``"over"``, if the energy is overrange.
"""
energy=self.query("$SE")
if energy.lower()=="over":
return "over"
return float(energy)
[docs]
def get_frequency(self):
"""
Get the current frequency readings.
Return either measured frequency, or ``"over"``, if the power is overrange.
"""
freq=self.query("$SF")
if freq.lower()=="over":
return "over"
return float(freq)
[docs]
def get_units(self):
"""Get device reading units"""
return self.query("$SI")
[docs]
def get_wavelength_info(self):
"""
Get wavelength setting info.
Return tuple ``(mode, rng, curr_idx, presets, curr_wavelength)``, where
`mode` is the measurement mode (``"continuous"`` or ``"discrete"``),
`rng` is a 2-tuple with the full wavelength range (in m) for continuous mode or a set of all wavelengths for discrete mode,
`curr_idx` is the current wavelength preset index,
`presets` is the list of all preset wavelengths (in m) for continuous mode or a set of all wavelengths for discrete mode,
and `curr_wavelength` is the current measurement wavelength (in m) for continuous mode or the current wavelength name for discrete mode.
"""
info=[i.strip() for i in self.query("$AW").split() if i.strip()]
mode=info[0].lower()
if mode=="continuous":
rng=(float(info[1])*1E-9,float(info[2])*1E-9)
curr_idx=int(info[3])
presets=[float(w)*1E-9 for w in info[4:] if w.upper()!="NONE"]
return TWavelengthInfo(mode,rng,curr_idx-1,presets,presets[curr_idx-1])
else:
curr_idx=int(info[1])
rng=info[2:]
return TWavelengthInfo(mode,rng,curr_idx-1,rng,rng[curr_idx-1])
[docs]
def get_wavelength(self):
"""Get current wavelength (in nm)"""
return self.get_wavelength_info().curr_wavelength
[docs]
def set_wavelength(self, wavelength):
"""
Set current wavelength (in nm).
`wavelength` is either a wavelength (in m) for the continuous mode, or a wavelength preset (as a string) for a discrete mode.
"""
if isinstance(wavelength,py3.anystring):
self.query("$WW {}".format(wavelength.upper()))
else:
self.query("$WL{:d}".format(int(wavelength*1E9)))
return self.get_wavelength()
[docs]
def get_range_info(self):
"""
Get power range info.
Return tuple ``(curr_idx, ranges, curr_range)``, where `curr_idx` is the current power range index,
`ranges` is the list of ranges (in W) for all indices and `curr_range` is the current range (in W).
"""
info=[i.strip() for i in self.query("$AR").split() if i.strip()]
curr_idx=int(info[0])
ranges=[units.convert_power_units(*units.split_units(r),result_unit="W") for r in info[3:]]
if curr_idx<0:
curr_range=info[3+curr_idx]
else:
curr_range=ranges[curr_idx]
return TRangeInfo(curr_idx,ranges,curr_range)
[docs]
def get_range(self):
"""Get current power range (maximal power in W)"""
return self.get_range_info().curr_range
[docs]
def get_range_idx(self):
"""
Get current power range index
Index goes from 0 (highest) to maximal (lowest); auto-ranging is -1.
"""
return int(self.query("$RN"))
[docs]
def set_range_idx(self, rng_idx):
"""
Set current range index.
`rng_idx` is the range index from 0 (highest) to maximal (lowest); auto-ranging is -1.
The corresponding ranges are given by :meth:`get_range_info`.
"""
self.query("$WN{:d}".format(rng_idx))
return self.get_range_idx()
[docs]
def set_range(self, rng):
"""
Set current power range.
Select the smallest available range which is larger than `rng` (or maximal range, if the requested range is too large)
If `rng` is ``"auto"``, enable autorange; if `rng` is ``None``, set to the maximal range.
"""
if rng=="auto":
self.set_range_idx(-1)
else:
rng_info=self.get_range_info()
idx=0
for i,r in sorted(enumerate(rng_info),key=lambda v: v[1]):
if rng<=r:
idx=i
break
self.set_range_idx(idx)
return self.get_range()
[docs]
def get_battery_condition(self):
"""Check if the batter is OK"""
return bool(int(self.query("$BC")))
_p_baudrate=interface.EnumParameterClass("baudrate",{9600:1,19200:2,38400:3,300:4,1200:5,4800:6})
[docs]
@interface.use_parameters(_returns="baudrate")
def get_baudrate(self):
"""Get current baud rate"""
br=self.query("$BR0")
return int(br.split()[0])
[docs]
def get_supported_baudrates(self):
"""Get a list of all supported baud rates"""
br=self.query("$BR0")
return [int(v) for v in br.split()[1:]]
[docs]
@interface.use_parameters
def set_baudrate(self, baudrate):
"""
Set current baud rate.
If the baudrate is different from the current one, close the device connection.
The device object will need to be re-created with the newly specified baud rate.
"""
curr_baudrate=self._wap.get_baudrate()
self.query("$BR{}".format(baudrate))
if curr_baudrate!=baudrate:
self.close()
[docs]
def is_filter_in(self):
"""Check if the filter is set to be on at the power meter"""
return int(self.query("$FQ").split()[0])==2
[docs]
def set_filter(self, filter_in=True):
"""Change the filter setting at the power meter (on or off)"""
self.query("$FQ{:d}".format(2 if filter_in else 1))
return self.is_filter_in()
[docs]
def is_diffuser_in(self):
"""Check if the diffuser is set to be on at the power meter"""
return int(self.query("$DQ").split()[0])==2
[docs]
def set_diffuser(self, diffuser_in=True):
"""Change the diffuser setting at the power meter (on or off)"""
self.query("$DQ{:d}".format(2 if diffuser_in else 1))
return self.is_diffuser_in()