Source code for pylablib.devices.M2.emm

from ...core.utils import general, funcargparse

from .base import M2Error, M2ParseError
from .base import ICEBlocDevice, c

import time

[docs] class EMM(ICEBlocDevice): """ M2 EMM Ice Bloc device. Args: addr(str): IP address of the Ice Bloc device. port(int): port of the Ice Bloc device. timeout(float): default timeout of synchronous operations. start_link(bool): if ``True``, initialize device link on creation. """ def __init__(self, addr, port, timeout=5., start_link=True): super().__init__(addr,port,timeout=timeout,start_link=start_link) self._add_status_variable("laser_status",self.get_laser_status) self._add_status_variable("terascan_status",lambda: self.get_terascan_status("all")) _terascan_update_op="automatic_output" _extra_update_ops=(_terascan_update_op,)
[docs] def get_laser_status(self): """Get the device system status""" _,reply=self.query("status",{}) for k in reply: if isinstance(reply[k],list): reply[k]=reply[k][0] return reply
[docs] def fine_tune_wavelength(self, wavelength, beam="visible", sync=True, timeout=None): """ Fine-tune the wavelength. If ``sync==True``, wait until the operation is complete (might take from several seconds up to several minutes). """ funcargparse.check_parameter_range(beam,"beam",["visible","infrared"],error_type=M2Error) _,reply=self.query("wavelength",{"target":[wavelength*1E9],"beam":beam},report=True) if reply["status"][0]==1: raise M2Error("could not fine-tune wavelength: {:.3f}nm is out of range".format(wavelength*1E9)) if sync: self.wait_for_fine_tuning(timeout=timeout)
[docs] def check_fine_tuning_report(self): """ Check wavelength fine-tuning report Return ``"success"`` or ``"fail"`` if the operation is complete, or ``None`` if it is still in progress. """ return self.check_report("wavelength")
[docs] def wait_for_fine_tuning(self, timeout=None): """Wait until wavelength fine-tuning is complete""" self.wait_for_report("wavelength",timeout=timeout)
[docs] def is_fine_tuning(self): """check if fine tuning is in progress""" return self.get_laser_status().get("tuning","idle")=="active"
_fine_tuning_status=["idle","nolink","tuning","locked"]
[docs] def get_fine_tuning_status(self): """ Get fine-tuning status. Return either ``"idle"`` (no tuning or locking) or ``"active"`` (tuning in progress). """ return self.get_laser_status().get("tuning","idle")
[docs] def get_fine_wavelength(self): """Get fine-tuned wavelength""" return self.get_laser_status()["wavelength"]*1E-9
[docs] def stop_fine_tuning(self): """Stop fine wavelength tuning""" self.query("wavelength_stop",{})
_terascan_kinds={"medium","fine","ir_medium","ir_fine"} def _check_terascan_type(self, scan_type): funcargparse.check_parameter_range(scan_type,"scan_type",self._terascan_kinds,error_type=M2Error) return scan_type _terascan_rates=[ 1E6,2E6,5E6,10E6,20E6,50E6,100E6,200E6,500E6, 1E9,2E9,5E9,10E9,20E9,50E9,100E9 ] def _trunc_terascan_rate(self, rate): for tr in self._terascan_rates[::-1]: if rate>=tr*(1-1E-5): return tr return self._terascan_rates[0]
[docs] def setup_terascan(self, scan_type, scan_range, rate, trunc_rate=True): """ Setup terascan. Args: scan_type(str): scan type. Can be ``"medium"`` (BRF+etalon, rate from 100 GHz/s to 1 GHz/s), ``"fine"`` (all elements, rate from 20 GHz/s to 1 MHz/s), ``"ir_medium"`` or ``"ir_fine"`` (same as ``"medium"`` or ``"fine"``, but defined for the IR laser) scan_range(tuple): tuple ``(start, stop)`` with the scan range (in Hz). rate(float): scan rate (in Hz/s). trunc_rate(bool): if ``True``, truncate the scan rate to the nearest available rate (otherwise, incorrect rate would raise an error). """ scan_type=self._check_terascan_type(scan_type) if trunc_rate: rate=self._trunc_terascan_rate(rate) if rate>=1E9: fact,units=1E9,"GHz/s" else: fact,units=1E6,"MHz/s" params={"scan":scan_type,"start":[c/scan_range[0]*1E9],"stop":[c/scan_range[1]*1E9],"rate":[rate/fact],"units":units} _,reply=self.query("scan_stitch_initialise",params) if reply["status"][0]==1: raise M2Error("could not setup TeraScan: start ({:.3f} THz) is out of range".format(scan_range[0]/1E12)) elif reply["status"][0]==2: raise M2Error("could not setup TeraScan: stop ({:.3f} THz) is out of range".format(scan_range[1]/1E12)) elif reply["status"][0]==3: raise M2Error("could not setup TeraScan: scan out of range") elif reply["status"][0]==4: raise M2Error("could not setup TeraScan: TeraScan not available")
_terascan_update_reps=(5,0.5)
[docs] def start_terascan(self, scan_type, sync=False, sync_done=False): """ Start terascan. Scan parameters are set up separately using :meth:`setup_terascan`. Scan type can be ``"medium"`` (BRF+etalon, rate from 100 GHz/s to 1 GHz/s), ``"fine"`` (all elements, rate from 20 GHz/s to 1 MHz/s), ``"ir_medium"`` or ``"ir_fine"`` (same as ``"medium"`` or ``"fine"``, but defined for the IR laser) If ``sync==True``, wait until the scan is set up (not until the whole scan is complete). If ``sync_done==True``, wait until the whole scan is complete (not recommended, as it can take hours). """ scan_type=self._check_terascan_type(scan_type) if sync: self.enable_terascan_updates() _,reply=self.query("scan_stitch_op",{"scan":scan_type,"operation":"start"},report=True) if sync: for _ in range(self._terascan_update_reps[0]): self.enable_terascan_updates() time.sleep(self._terascan_update_reps[1]) if reply["status"][0]==1: raise M2Error("could not start TeraScan: operation failed") elif reply["status"][0]==2: raise M2Error("could not start TeraScan: TeraScan not available") if sync: self.wait_for_terascan_update() if sync_done: self.wait_for_report("scan_stitch_op")
[docs] def enable_terascan_updates(self, enable=True, update_period=1E-2, update_delay=0): """ Enable sending periodic terascan updates. If enabled, laser will send updates in the beginning and in the end of every terascan segment. If ``update_period!=0``, it will also send updates every ``update_period`` percents of the segment (this option is not currently supported by M2 firmware). """ _,reply=self.query("terascan_output",{"operation":("start" if enable else "stop"),"update":[int(update_period*1E2)],"delay":[int(update_delay*1E2)],"pause":"off"}) if reply["status"][0]==1: raise M2Error("could not setup TeraScan updates: operation failed") if reply["status"][0]==2: raise M2Error("could not setup TeraScan updates: incorrect update rate") if reply["status"][0]==3: raise M2Error("could not setup TeraScan updates: TeraScan not available") self._last_status[self._terascan_update_op]=None
[docs] def check_terascan_update(self): """ Check the latest terascan update. Return ``None`` if none are available, or a dictionary ``{"wavelength":current_wavelength, "activity":op}``, where ``op`` is ``"scanning"`` (scanning in progress), ``"stitching"`` (stitching in progress), or ``"repeat"`` (segment is repeated). """ self.update_reports() rep=self._last_status.get(self._terascan_update_op,None) if rep is not None: rep["activity"]={"scan":"scanning","repeat":"repeat"}.get(rep["status"],"stitching") return rep
[docs] def wait_for_terascan_update(self): """Wait until a new terascan update is available""" self.wait_for_report(self._terascan_update_op) return self.check_terascan_update()
[docs] def check_terascan_start_report(self): """ Check report on terascan start. Return ``"success"`` or ``"fail"`` if the operation is complete, or ``None`` if it is still in progress. """ return self.check_report("scan_stitch_op")
[docs] def stop_terascan(self, scan_type, sync=False): """ Stop terascan of the given type. If ``sync==True``, wait until the operation is complete. """ scan_type=self._check_terascan_type(scan_type) _,reply=self.query("scan_stitch_op",{"scan":scan_type,"operation":"stop"},report=True) if reply["status"][0]==1: raise M2Error("could not stop TeraScan: operation failed") elif reply["status"][0]==2: raise M2Error("could not stop TeraScan: TeraScan not available") if sync: self.wait_for_report("scan_stitch_op")
[docs] def get_terascan_status(self, scan_type): """ Get status of a terascan of a given type (or all statuses if ``scan_type=="all"``). Return a dictionary with 3 items: ``"current"``: current laser frequency (or ``None`` if no scan is in progress) ``"range"``: tuple with the fill scan range (or ``None`` if no frequency is available) ``"status"``: can be ``"stopped"`` (scan is not in progress), ``"scanning"`` (scan is in progress), or ``"stitching"`` (scan is in progress, but currently stitching) """ if scan_type=="all": return {k:self.get_terascan_status(k) for k in self._terascan_kinds} scan_type=self._check_terascan_type(scan_type) try: _,reply=self.query("scan_stitch_status",{"scan":scan_type}) except M2ParseError as err: if err.code==9: return {"status":"stopped","range":None,"current":None} status={} if reply["status"][0]==0: status["status"]="stopped" status["range"]=None status["current"]=None elif reply["status"][0]==1: if reply["operation"][0]==0: status["status"]="stitching" elif reply["operation"][0]==1: status["status"]="scanning" status["range"]=c/(reply["start"][0]/1E9),c/(reply["stop"][0]/1E9) status["current"]=c/(reply["current"][0]/1E9) if reply["current"][0] else None elif reply["status"][0]==2: raise M2Error("could not get TeraScan status: TeraScan not available") return status
_default_terascan_rates={"fine":100E6,"medium":5E9,"ir_fine":100E6,"ir_medium":5E9}
[docs] def stop_all_operation(self, repeated=True, attempt=0): """ Stop all laser operations (tuning and scanning). If ``repeated==True``, repeat trying to stop the operations until succeeded (more reliable, but takes more time). If ``attempt>0``, it can supply the number of already tried attempts to stop (with ``repeated=False``); the more attempts failed, the more drastic measures will be taken to stop (e.g., initialize short terascan) Return ``True`` if the operation is success and ``False`` otherwise. """ ctd=general.Countdown(self.timeout or None) while True: operating=False for scan_type in self._terascan_kinds: stat=self.get_terascan_status(scan_type) if stat["status"]!="stopped": operating=True self.stop_terascan(scan_type) time.sleep(1.) if attempt>4 and attempt%2==1: rate=self._default_terascan_rates[scan_type] scan_center=(stat["current"] or 400E12)-(attempt-5)*100E9 self.setup_terascan(scan_type,(scan_center,scan_center+100E9),rate) self.start_terascan(scan_type) time.sleep(2.) self.stop_terascan(scan_type) time.sleep(2.) if self.is_fine_tuning(): operating=True self.stop_fine_tuning() if (not repeated) or (not operating): break time.sleep(0.1) attempt+=1 if (attempt>12 and ctd.passed()): raise M2Error("could not stop all operations: timed out") return not operating