Source code for pylablib.devices.NKT.interbus

from ...core.utils import py3, crc, general, funcargparse
from ...core.devio import comm_backend, interface

import numpy as np

import struct
import collections
import contextlib
import threading



[docs] class InterbusError(comm_backend.DeviceError): """Generic Interbus device error"""
[docs] class InterbusBackendError(InterbusError,comm_backend.DeviceBackendError): """Generic Interbus backend communication error"""
TInterbusTelegram=collections.namedtuple("TInterbusTelegram",("dest","src","typ","payload"))
[docs] class GenericInterbusDevice(comm_backend.ICommBackendWrapper): """ Generic Interbus-connected device. Args: conn: serial connection parameters (usually port, a tuple containing port and baudrate, or a tuple with full specification such as ``("COM1", 9600, 8, 'N', 1)``) """ Error=InterbusError def __init__(self, conn): instr=comm_backend.new_backend(conn,"serial",term_read=b"\x0A",term_write="",defaults={"serial":("COM1",115200)},reraise_error=InterbusBackendError) self._ib_src=0x40 self._instr_lock=threading.Lock() super().__init__(instr) def _ib_get_dest(self, dest): if dest is None: return self.ib_dest return dest
[docs] def ib_get_default_address(self): """Get destination address used by default in Interbus methods""" return self.ib_dest
[docs] def ib_set_default_address(self, dest): """Set destination address used by default in Interbus methods""" self.ib_dest=dest
[docs] @contextlib.contextmanager def ib_using_address(self, dest): """Context manager for temporary using a different default destination address""" cdest=self.ib_dest self.ib_dest=dest try: yield finally: self.ib_dest=cdest
def _ib_crc(self, msg): return crc.crc(msg,0x1021) _ib_echar={0x0A,0x0D,0x5E} _ib_eechar={0x4A,0x4D,0x9E} def _ib_wrap_escape(self, msg): res=[0x0D] for c in msg: if c in self._ib_echar: res.append(0x5E) res.append(c+0x40) else: res.append(c) res.append(0x0A) return bytes(res) def _ib_unescape(self, msg): res=[] esc=False for c in msg: if esc: if c not in self._ib_eechar: ebytes=", ".join("0x{:02X}".format(v) for v in self._ib_eechar) raise InterbusError("faulty escape sequence: got byte 0x{:02X}, expected bytes {}".format(c,ebytes)) res.append(c-0x40) esc=False continue if c==0x5E: esc=True else: res.append(c) if esc: raise InterbusError("faulty escape sequence: escape character is not followed by any other character") return bytes(res) def _ib_build_telegram(self, dest, src, typ, payload): msg=struct.pack("BBB",dest,src,typ)+py3.as_bytes(payload) crcv=struct.pack(">H",self._ib_crc(msg)) return self._ib_wrap_escape(msg+crcv) def _ib_send_telegram(self, dest, src, typ, payload): msg=self._ib_build_telegram(dest,src,typ,payload) self.instr.write(msg) def _ib_recv_telegram(self): c=self.instr.read(1) if c!=b"\x0D": raise InterbusError("wrong telegram start: expected 0x{:02X}, got 0x{:02X}".format(0x0D,c[0])) msg=self.instr.readline() msg=self._ib_unescape(msg) if len(msg)<5: raise InterbusError("telegram should be at least 5 bytes long; got {}".format(len(msg))) ecrc=self._ib_crc(msg[:-2]) rcrc,=struct.unpack(">H",msg[-2:]) if ecrc!=rcrc: raise InterbusError("CRC error: expected 0x{:04X}, got 0x{:04X}".format(ecrc,rcrc)) return TInterbusTelegram(msg[0],msg[1],msg[2],msg[3:-2]) _ib_telegram_typs={0:"faulty_telegram",1:"crc_error",2:"busy",3:"ack",4:"read",5:"write",8:"reply"} def _ib_check_telegram(self, telegram, src=None, dest=None, typ=None): if src is not None and telegram.src!=src: raise InterbusError("unexpected telegram source: expected 0x{:02x}, got 0x{:02x}".format(src,telegram.src)) if dest is not None and telegram.dest!=dest: raise InterbusError("unexpected telegram destination: expected 0x{:02x}, got 0x{:02x}".format(dest,telegram.dest)) if telegram.typ==0: raise InterbusError("device has not understood telegram {}".format(repr(telegram.payload))) if telegram.typ==1: raise InterbusError("device reports CRC error for telegram {}".format(repr(telegram.payload))) if telegram.typ==2: raise InterbusError("device is busy and can not process telegram {}".format(repr(telegram.payload))) if typ is not None and telegram.typ!=typ: raise InterbusError("unexpected telegram destination: expected {}({}), got {}({})".format( typ,self._ib_telegram_typs.get(typ,"unknown"),telegram.typ,self._ib_telegram_typs.get(telegram.typ,"unknown"))) _p_ib_dtype=interface.EnumParameterClass("ib_dtype",{"raw":"raw","str":"str","u8":"B","u16":"<H","u32":"<I","i8":"b","i16":"<h","i32":"<i"})
[docs] @interface.use_parameters(dtype="ib_dtype") def ib_get_reg(self, dest, address, dtype="raw", array="auto"): """ Get register value at the given destination device and register address. `dtype` is the register type, which can be ``"raw"`` (raw bytes), ``"str"`` (string), ``"u8"``, ``"u16"``, ``"u32"``, ``"i8"``, ``"i16"``, ``"i32"`` (different integer values). """ with self._instr_lock: self._ib_send_telegram(dest,self._ib_src,4,struct.pack("B",address)) tg=self._ib_recv_telegram() self._ib_check_telegram(tg,src=dest,dest=self._ib_src,typ=8) if len(tg.payload)<1: raise InterbusError("request returned zero payload") if tg.payload[0]!=address: raise InterbusError("request returned unexpected address: expected 0x{:02x}, got 0x{:02x}".format(address,tg.payload[0])) val=tg.payload[1:] if dtype=="raw": return val if dtype=="str": return py3.as_str(val) l={"B":1,"b":1,"<H":2,"<h":2,"<I":4,"<i":4}[dtype] if array=="auto": array=len(val)>l if array and len(val)%l: raise InterbusError("register type {} expects multiple of {} bytes, got {}".format(self._p_ib_dtype.i(dtype),l,len(val))) if (not array) and len(val)!=l: raise InterbusError("register type {} expects {} bytes, got {}".format(self._p_ib_dtype.i(dtype),l,len(val))) if array: return [struct.unpack(dtype,val[i:i+l])[0] for i in range(0,len(val),l)] else: return struct.unpack(dtype,val)[0]
_p_ib_dtype=interface.EnumParameterClass("ib_dtype",{"raw":"raw","str":"str","u8":"B","u16":"<H","u32":"<I","i8":"b","i16":"<h","i32":"<i"})
[docs] @interface.use_parameters(dtype="ib_dtype") def ib_set_reg(self, dest, address, value, dtype="raw", array="auto", echo=True): """ Set register value at the given destination device and register address. `dtype` is the register type, which can be ``"raw"`` (raw bytes), ``"str"`` (string), ``"u8"``, ``"u16"``, ``"u32"``, ``"i8"``, ``"i16"``, ``"i32"`` (different integer values). If ``echo==True``, return the subsequent value of the register. """ if array=="auto": array=funcargparse.is_sequence(value) if dtype in {"raw","str"}: value=bytes(value) else: if array: value=b"".join([struct.pack(dtype,int(v)) for v in value]) else: value=struct.pack(dtype,int(value)) with self._instr_lock: self._ib_send_telegram(dest,self._ib_src,5,struct.pack("B",address)+value) tg=self._ib_recv_telegram() self._ib_check_telegram(tg,src=dest,dest=self._ib_src,typ=3) if len(tg.payload)<1: raise InterbusError("request returned zero payload") if tg.payload[0]!=address: raise InterbusError("request returned unexpected address: expected 0x{:02x}, got 0x{:02x}".format(address,tg.payload[0])) if echo: return self.ib_get_reg(dest,address,dtype=dtype,array=array)
[docs] def ib_scan_devices(self, dests="all", timeout=0.05): """ Scan for devices on the bus and return their addresses and types. `dests` is a list of addresses to check (``"all"`` means all addresses from 1 to 48 inclusive) `timeout` is the timeout to wait for each device reply. `func` and `payload` specify the message to send (by default, 'read coil' command with no arguments, which should always return and error) Since the addresses are polled consecutively, this function can take a long time (~25s for the default settings). """ if dests=="all": dests=range(1,49) detected={} with self._instr_lock: with self.instr.using_timeout(timeout): for d in dests: self._ib_send_telegram(d,self._ib_src,4,b"\x61") try: tg=self._ib_recv_telegram() self._ib_check_telegram(tg,src=d,dest=self._ib_src,typ=8) if len(tg.payload)>=2 and tg.payload[0]==0x61: detected[tg.src]=tg.payload[1] except InterbusError: pass return detected
[docs] class IInterbusModule(interface.IDevice): """ Specific Interbus module. Deals with specific registers available for this module. Args: ib_device: instance of the generic Interbus controller used to access the module. dest: module address on the bus. """ def __init__(self, ib_device, dest): super().__init__() self.ib_device=ib_device self.dest=dest with self._close_on_error(): self.dtype=self.get_register("type") self.serial=self.get_register("serial") if self._ib_type is not None and self.dtype!=self._ib_type: raise InterbusError("device at destination 0x{:02X} has unexpected type: expected 0x{:02X}, got 0x{:02X}".format(dest,self._ib_type,self.dtype)) def make_getter(r): return lambda: self.get_register(r) for r in self._ib_registers: self._add_info_variable(r,make_getter(r)) if "status_bits" in self._ib_registers: self._add_info_variable("status",self.get_status) def _get_connection_parameters(self): return self.ib_device._get_connection_parameters(),self.dest _ib_type=None _ib_registers={ # name: (addr, dtype, fmt=None, array=False) "type":(0x61,"u8"), "serial":(0x65,"str"), "status_bits":(0x66,"u16"), } _ib_registers_defaults={} _ib_status_bits={0x0001:"emission", 0x0002:"interlock_off", 0x0010:"disabled", 0x0020:"supply_low", 0x0040:"module_temperature_out_of_range", 0x8000:"error_present"}
[docs] def get_register(self, name): """Get value of the given register based on its name""" if name not in self._ib_registers: raise ValueError("unrecognized register: {}".format(name)) rdesc=self._ib_registers[name] addr,dtype,fmt,arr=rdesc+(None,False)[len(rdesc)-2:] try: value=self.ib_device.ib_get_reg(self.dest,addr,dtype,array=bool(arr)) except InterbusError: if name in self._ib_registers_defaults: return self._ib_registers_defaults[name] raise if arr: value=np.array(value) if fmt is not None: if fmt[0]=="fact": value=value*fmt[1] if fmt[0]=="map": value=fmt[1](value) return value
[docs] def get_all_registers(self): """Get values of all defined registers""" return {n:self.get_register(n) for n in self._ib_registers}
[docs] def set_register(self, name, value): """Set value of the given register based on its name""" if name not in self._ib_registers: raise ValueError("unrecognized register: {}".format(name)) rdesc=self._ib_registers[name] addr,dtype,fmt,arr=rdesc+(None,False)[len(rdesc)-2:] if arr: if arr=="auto" and not funcargparse.is_sequence(value): value=[value] value=np.asarray(value) if fmt is not None: if fmt[0]=="fact": value=value/fmt[1] self.ib_device.ib_set_reg(self.dest,addr,value,dtype,array=bool(arr),echo=False) return self.get_register(name)
[docs] def get_status(self): """Get device status as a set of set bits""" status=self.get_register("status_bits") return [n for m,n in self._ib_status_bits.items() if status&m]
def __repr__(self): return "{}(dest = {}, type = 0x{:02X}, serial = '{}')".format(type(self).__name__,self.dest,self.dtype,self.serial)
[docs] class GenericInterbusModule(IInterbusModule): _ib_name="generic" _ib_status_bits={(1<<i):"bit{:02d}".format(i) for i in range(16)}
[docs] class SuperKExtremeInterbusModule(IInterbusModule): _ib_type=0x60 _ib_name="superK_extreme" _ib_registers=general.merge_dicts(IInterbusModule._ib_registers, { "system_type":(0x6B,"u8"), "emission":(0x30,"u8"), "setup":(0x31,"u16"), "interlock":(0x32,"u16"), "pulse_picker_ratio":(0x34,"u16"), "watchdog_interval":(0x36,"u8"), "temperature_inlet":(0x11,"i16",("fact",0.1)), "power":(0x37,"u16",("fact",0.1)), "current":(0x38,"u16",("fact",0.1)), "nim_delay":(0x39,"u16",("fact",9E-12)), }) _ib_registers_defaults={"system_type":0} _ib_status_bits=general.merge_dicts(IInterbusModule._ib_status_bits, { 0x0004: "interlock_power_fail", 0x0008:"interlock_loop_off", 0x0080: "clock_battery_low_voltage", 0x2000: "crc_startup_error", 0x4000: "log_error" })
[docs] class SuperKFrontPanelInterbusModule(IInterbusModule): _ib_name="superK_front_panel" _ib_type=0x61 _ib_registers=general.merge_dicts(IInterbusModule._ib_registers, { "panel_lock":(0x3D,"u8"), "display_text":(0x72,"str",("map",lambda v: "".join((c if ord(c)<128 else "#") for c in v))) }) del _ib_registers["status_bits"]
[docs] class SuperKSelectDriverInterbusModule(IInterbusModule): _ib_name="superK_select_driver" _ib_type=0x66 _ib_registers=general.merge_dicts(IInterbusModule._ib_registers, { "power_on":(0x30,"u8"), "setup":(0x31,"u16",None), "min_wavelength":(0x34,"u32",("fact",1E-12)), "max_wavelength":(0x35,"u32",("fact",1E-12)), "temperature_crystal":(0x38,"i16",("fact",0.1)), "fsk_mode":(0x3B,"u8"), "internal_ctl":(0x3C,"u8"), "crystal":(0x75,"u8"), }) for i in range(8): _ib_registers["wavelength{}".format(i)]=(0x90+i,"u32",("fact",1E-12),"auto") for i in range(8): _ib_registers["amplitude{}".format(i)]=(0xB0+i,"u16",("fact",0.1),"auto") for i in range(8): _ib_registers["modgain{}".format(i)]=(0xC0+i,"u16",("fact",0.1)) _ib_status_bits=general.merge_dicts(IInterbusModule._ib_status_bits, { 0x2000: "aod_comm_timeout", 0x4000: "need_crystal_info" })
[docs] class SuperKSelectInterbusModule(IInterbusModule): _ib_name="superK_select" _ib_type=0x67 _ib_registers=general.merge_dicts(IInterbusModule._ib_registers, { "mon_input1":(0x10,"u16",("fact",0.1)), "mon_input2":(0x11,"u16",("fact",0.1)), "mon_input1_gain":(0x32,"u8"), "mon_input2_gain":(0x33,"u8"), "rf_switch":(0x34,"u8"), "mon_switch":(0x35,"u8"), "min_wavelength1":(0x90,"u32",("fact",1E-12)), "max_wavelength1":(0x91,"u32",("fact",1E-12)), "min_wavelength2":(0xA0,"u32",("fact",1E-12)), "max_wavelength2":(0xA1,"u32",("fact",1E-12)), }) _ib_registers_defaults={"mon_input1_gain":0,"mon_input2_gain":0} _ib_status_bits=general.merge_dicts(IInterbusModule._ib_status_bits, { 0x0004: "interlock_loop_in", 0x0008:"interlock_loop_out", 0x0080: "clock_battery_low_voltage", 0x0100: "shutter_sensor1", 0x0200: "shutter_sensor2", 0x0400: "new_temperature1", 0x0800: "new_temperature2", })
_ib_modules={m._ib_type:m for m in [SuperKExtremeInterbusModule,SuperKFrontPanelInterbusModule,SuperKSelectDriverInterbusModule,SuperKSelectInterbusModule]}
[docs] class InterbusSystem(GenericInterbusDevice): """ A collection of NKT modules connected to the same Interbus. Args: conn: serial connection parameters (usually port, a tuple containing port and baudrate, or a tuple with full specification such as ``("COM1", 9600, 8, 'N', 1)``) modules: Interbus modules identifiers; can be ``"auto"`` (detect all connected modules), a list of module addresses, or a dictionary ``{addr: name}`` of the aliases for the modules (e.g., ``{'laser':15, 'varia':18}``) Attributes: m: dictionary of modules, defined either by their address or by their name (if provided upon creation) """ def __init__(self, conn, modules="auto"): super().__init__(conn) self.m=self._find_modules(modules) self._add_info_variable("module_dests",lambda: list(self.m)) self._add_info_variable("all_registers",self.get_all_module_registers) def __getitem__(self, key): return self.m[key] def __contains__(self, key): return key in self.m def _find_modules(self, modules): names=modules if isinstance(modules,dict) else {} modules=self.ib_scan_devices(dests="all" if modules=="auto" else modules) return {names.get(m,m):_ib_modules.get(t,GenericInterbusModule)(self,m) for m,t in modules.items()}
[docs] def get_all_module_registers(self): """Get all registers""" return {m:d.get_all_registers() for m,d in self.m.items()}