Source code for pylablib.devices.Mightex.MightexSSeries

from .NewClassic_USBCamera_SDK_lib import wlib as lib
from .base import MightexError, MightexTimeoutError

from ...core.utils import py3
from ...core.devio import interface
from ..interface import camera
from ..utils import load_lib

import numpy as np
import numba as nb
import collections
import ctypes
import time


TCameraInfo=collections.namedtuple("TCameraInfo",["idx","model","serial"])
[docs] class LibraryController(load_lib.LibraryController): def __init__(self, lib): # pylint: disable=redefined-outer-name super().__init__(lib) self.ncams=None self.cams=None def _do_init(self): self.ncams=self.lib.NewClassicUSB_InitDevice() self.cams=[] for i in range(self.ncams): model,serial=self.lib.NewClassicUSB_GetModuleNoSerialNo(i+1) self.cams.append(TCameraInfo(i+1,py3.as_str(model).strip(),py3.as_str(serial).strip())) def _do_uninit(self): self.lib.NewClassicUSB_UnInitDevice() self.ncams=None self.cams=None
libctl=LibraryController(lib)
[docs] def restart_lib(): libctl.shutdown()
[docs] def list_cameras(): """List all cameras available through Mightex S-series interface""" with libctl.temp_open(): return list(libctl.cams)
[docs] def get_cameras_number(): """Get number of connected Mightex S-series cameras""" return len(list_cameras())
TDeviceInfo=collections.namedtuple("TDeviceInfo",["model","serial"])
[docs] class MightexSSeriesCamera(camera.IBinROICamera, camera.IExposureCamera): """ Generic Mightex S Series camera interface. Args: idx: camera index among the cameras listed using :func:`list_cameras`, starting with 1 """ Error=MightexError TimeoutError=MightexTimeoutError def __init__(self, idx=1): super().__init__() lib.initlib() self.idx=idx self._opid=None self._sensor_size=None self._roi=None self._buffer_mgr=None self._looper=self.ReceiveLooper() self.open() self._raw_readout_format=False self._add_info_variable("device_info",self.get_device_info) self._add_settings_variable("pixel_clock",self.get_pixel_clock,self.set_pixel_clock,ignore_error=(MightexError,)) self._add_settings_variable("hblanking",self.get_hblanking,self.set_hblanking,ignore_error=(MightexError,)) def _get_connection_parameters(self): return (self.idx,)
[docs] def open(self): """Open connection to the camera""" if self.is_opened(): return with libctl.temp_open(): cams=list_cameras() if self.idx>len(cams): raise MightexError("camera index {} is not available ({} cameras exist)".format(self.idx,len(cams))) with self._close_on_error(): lib.NewClassicUSB_AddDeviceToWorkingSet(self.idx) lib.NewClassicUSB_StartCameraEngine(0,8) self._opid=libctl.open().opid self._initialize_default_parameters()
[docs] def close(self): """Close connection to the camera""" if self.is_opened(): try: self.clear_acquisition() finally: try: lib.NewClassicUSB_StopCameraEngine() lib.NewClassicUSB_RemoveDeviceFromWorkingSet(self.idx) finally: libctl.close(self._opid) self._opid=None
[docs] def is_opened(self): return self._opid is not None
def _initialize_default_parameters(self): self._sensor_size=(w,h)=self._autodetect_dimensions() self._min_size=self._autodetect_dimensions(top=False) self._roi=(0,w,0,h,1,1) self.set_roi() self.set_pixel_clock("fast") self.set_hblanking("normal") self.set_exposure(1E-1)
[docs] def get_device_info(self): """ Get camera information. Return tuple ``(model, serial)``. """ return TDeviceInfo(*libctl.cams[self.idx-1][1:])
_dim_min=(4,4) _dim_max=(2**16,2**16) _dim_step=(4,4) _autodetect_dim_default=(64,64) def _autodetect_single_dim(self, dim, rmin, rmax, rstep, default, top=True): a,b=(default,rmax) if top else (rmin,default) while a+rstep<b: m=(a+b)//2 m-=m%rstep try: args=[self.idx,default,default,0] args[dim+1]=m lib.NewClassicUSB_SetCustomizedResolution(*args) a,b=(m,b) if top else (a,m) except MightexError: a,b=(a,m) if top else (m,b) return a if top else b def _autodetect_dimensions(self, top=True): w=self._autodetect_single_dim(0,self._dim_min[0],self._dim_max[0],self._dim_step[0],self._autodetect_dim_default[1],top=top) h=self._autodetect_single_dim(1,self._dim_min[1],self._dim_max[1],self._dim_step[1],self._autodetect_dim_default[0],top=top) return w,h def _get_data_dimensions_rc(self): hstart,hend,vstart,vend,hbin=self._roi[:5] return (vend-vstart)//hbin,(hend-hstart)//hbin
[docs] def get_detector_size(self): return self._sensor_size
[docs] def get_roi(self): return tuple(self._roi)
def _apply_roi(self): hstart,hend,vstart,vend,hbin=self._roi[:5] lib.NewClassicUSB_SetCustomizedResolution(self.idx,hend-hstart,vend-vstart,0 if hbin==1 else 1) lib.NewClassicUSB_SetXYStart(self.idx,hstart,vstart)
[docs] @camera.acqcleared def set_roi(self, hstart=0, hend=None, vstart=0, vend=None, hbin=1, vbin=1): _old_roi=self._roi hlim,vlim=self.get_roi_limits(hbin) hstart,hend,hbin=self._truncate_roi_axis((hstart,hend,hbin),hlim) vstart,vend,_=self._truncate_roi_axis((vstart,vend,hbin),vlim) self._roi=(hstart,hend,vstart,vend,hbin,hbin) try: self._apply_roi() except MightexError: self._roi=_old_roi self._apply_roi() return self.get_roi()
[docs] def get_roi_limits(self, hbin=1, vbin=1): hbin=2 if hbin>1 else 1 w,h=self.get_detector_size() hlim=camera.TAxisROILimit(self._min_size[0]*hbin,w,self._dim_step[0]*hbin,self._dim_step[0]*hbin,2) vlim=camera.TAxisROILimit(self._min_size[1]*hbin,h,self._dim_step[1]*hbin,self._dim_step[1]*hbin,2) return hlim,vlim
_exposure_range=(50E-6,750E-3)
[docs] def get_exposure(self): return self._exposure
[docs] def set_exposure(self, exposure): exposure=sorted((exposure,)+self._exposure_range)[1] iexp=max(np.round(exposure/50E-6),1) self._exposure=iexp*50E-6 lib.NewClassicUSB_SetExposureTime(self.idx,iexp) return self.get_exposure()
[docs] def get_frame_timings(self): return self._TAcqTimings(self.get_exposure(),self.get_exposure())
_p_pixel_clock=interface.EnumParameterClass("pixel_clock",{"slow":0,"medium":1,"fast":2})
[docs] @interface.use_parameters(_return="pixel_clock") def get_pixel_clock(self): """Get pixel clock speed (``"slow"``, ``"medium"``, or ``"fast"``)""" return self._pixel_clock
[docs] @interface.use_parameters def set_pixel_clock(self, pixel_clock): """Set pixel clock speed (``"slow"``, ``"medium"``, or ``"fast"``)""" self._pixel_clock=pixel_clock lib.NewClassicUSB_SetSensorFrequency(self.idx,pixel_clock) return self.get_pixel_clock()
_p_hblanking=interface.EnumParameterClass("hblanking",{"normal":0,"longer":1,"longest":2})
[docs] @interface.use_parameters(_return="hblanking") def get_hblanking(self): """Get hblanking speed (``"normal"``, ``"longer"``, or ``"longest"``)""" return self._hblanking
[docs] @interface.use_parameters def set_hblanking(self, hblanking): """Set hblanking speed (``"normal"``, ``"longer"``, or ``"longest"``)""" self._hblanking=hblanking lib.NewClassicUSB_SetHBlankingExtension(self.idx,hblanking) return self.get_hblanking()
[docs] def send_software_trigger(self): """Send software trigger signal""" lib.NewClassicUSB_SoftTrigger(self.idx)
[docs] class ReceiveLooper: def __init__(self): self.comm=np.zeros(3,dtype="u8") self.stat=np.zeros(2,dtype="u8") self.buffer=None self.nbuff=None self.size=None self._make_callbacks() self._cb=None def _make_callbacks(self): puint64=nb.types.CPointer(nb.uint64) memmove=ctypes.memmove @nb.cfunc(nb.types.void(nb.uint64,puint64,puint64),nopython=True) def callback_body(psrc, pcomm, pstat): # comm is 2-array [pdst, nbuff, size], stat is 2-array [working, nread] comm=nb.carray(pcomm,3) stat=nb.carray(pstat,2) stat[0]=1 if comm[0]>0: b=stat[1]%comm[1] memmove(comm[0]+b*comm[2],psrc,comm[2]) stat[1]+=1 else: stat[1]=0 stat[0]=0 self._cb_body=callback_body _cb_body_ct=ctypes.WINFUNCTYPE(None,ctypes.c_uint64,ctypes.c_uint64,ctypes.c_uint64)(callback_body.address) pcomm_=self.comm.ctypes.data pstat_=self.stat.ctypes.data @nb.cfunc(nb.types.void(nb.uint64,nb.uint64),nopython=True) def callback(_, data): _cb_body_ct(data,pcomm_,pstat_) self._cb_main=callback
[docs] def enable_callback(self): """Register and enable the frame callback""" self.stat[1]=0 if self.buffer is not None: self._cb=lib.register_frame_callback(self._cb_main.address,wrap=False) self.comm[0]=ctypes.addressof(self.buffer) self.comm[1]=self.nbuff self.comm[2]=self.size return self.buffer is not None
[docs] def disable_callback(self): """Stop and deregister the frame callback""" lib.NewClassicUSB_InstallFrameHooker(0,0) self.comm[:]=0 while self.stat[0]: time.sleep(1E-3) self._cb=None
[docs] def is_looping(self): """Check if the loop is running""" return bool(self.comm[0])
[docs] def get_status(self): """Get the current loop status, which is the tuple ``(acquired,)``""" return (int(self.stat[1]),)
[docs] def allocate(self, nbuff, size): """Allocate given number of buffers of the given size""" self.buffer=ctypes.create_string_buffer(nbuff*size) self.nbuff=nbuff self.size=size
[docs] def deallocate(self): """Deallocate the buffers""" self.disable_callback() self.buffer=None self.nbuff=None self.size=None
def _allocate_buffers(self, nbuff): self._deallocate_buffers() shape=self._get_data_dimensions_rc() size=shape[0]*shape[1] self._looper.allocate(nbuff,size) def _deallocate_buffers(self): self._looper.deallocate()
[docs] @interface.use_parameters(mode="acq_mode") def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ """ Setup acquisition mode. `mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition). `nframes` sets up number of frame buffers. """ self.clear_acquisition() self._allocate_buffers(nbuff=nframes) super().setup_acquisition(mode=mode,nframes=nframes)
[docs] def clear_acquisition(self): self.stop_acquisition() self._deallocate_buffers() super().clear_acquisition()
[docs] def start_acquisition(self, *args, **kwargs): self.stop_acquisition() super().start_acquisition(*args,**kwargs) self._frame_counter.reset(self._acq_params["nframes"]) self._looper.enable_callback() nframes=0x8888 if self._acq_params["mode"]=="sequence" else self._acq_params["nframes"] lib.NewClassicUSB_StartFrameGrab(nframes)
[docs] def stop_acquisition(self): if self.acquisition_in_progress(): lib.NewClassicUSB_StopFrameGrab() self._looper.disable_callback() self._frame_counter.update_acquired_frames(self._get_acquired_frames()) super().stop_acquisition()
[docs] def acquisition_in_progress(self): return self._looper.is_looping()
def _get_acquired_frames(self): return self._looper.get_status()[0] _support_chunks=True def _read_frames(self, rng, return_info=False): shape=self._get_data_dimensions_rc() nbuff=self._looper.nbuff size=self._looper.size data=np.ctypeslib.as_array(ctypes.cast(self._looper.buffer,ctypes.POINTER(ctypes.c_ubyte)),shape=(nbuff,size)) i0,i1=rng if (i1-1)//nbuff==i0//nbuff: chunks=[(i0,i1-i0)] else: cut=(i1//nbuff)*nbuff chunks=[(i0,cut-i0),(cut,i1-cut)] frames=[data[b%nbuff:b%nbuff+n,:].reshape((n,)+shape).copy() for b,n in chunks] return frames,None