Source code for pylablib.devices.DCAM.DCAM

from . import dcamapi4_lib, dcamprop_defs
from .dcamapi4_lib import wlib as lib, DCAMError, DCAMLibError

from ...core.devio import interface
from ...core.utils import py3, general
from ..interface import camera
from ..utils import load_lib

import numpy as np
import collections
import ctypes
import warnings


class DCAMTimeoutError(DCAMError):
    "DCAM frame timeout error"


[docs] class LibraryController(load_lib.LibraryController): def __init__(self, lib): # pylint: disable=redefined-outer-name super().__init__(lib) self.cameras=0 def _do_init(self): self.cameras=self.lib.dcamapi_init() def _do_uninit(self): self.lib.dcamapi_uninit() self.cameras=None
libctl=LibraryController(lib)
[docs] def restart_lib(): libctl.shutdown()
[docs] def get_cameras_number(): """Get number of connected DCAM cameras""" try: with libctl.temp_open(): return libctl.cameras except DCAMError: return 0
[docs] class DCAMAttribute: """ DCAM camera attribute. Allows to query and set values and get additional information. Usually created automatically by a DCAM camera instance, but could also be created manually. Args: handle: DCAM camera handle pid: attribute id Attributes: name: attribute name kind (str): attribute kind; can be ``"int"``, ``"float"``, ``"enum"``, or ``"none"`` (can't determine) readable (bool): whether attribute is readable writable (bool): whether attribute is writable min (float): minimal attribute value (if applicable) max (float): maximal attribute value (if applicable) step (float): attribute value step (if applicable) unit (int): attribute units (index value) ivalues: list of possible integer values for enum attributes values: list of possible text values for enum attributes labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute """ def __init__(self, handle, pid): self.handle=handle self.pid=pid self.name=py3.as_str(lib.dcamprop_getname(self.handle,pid)) props=lib.dcamprop_getattr(self.handle,pid) self.min=props.valuemin self.max=props.valuemax self.step=props.valuestep self.default=props.valuedefault self.unit=props.iUnit self._attributes_n=props.attribute%0x100000000 self.readable=bool(self._attributes_n&dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_ATTR_READABLE) self.writable=bool(self._attributes_n&dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_ATTR_WRITABLE) kinds={dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_TYPE_LONG:"int",dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_TYPE_REAL:"float",dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_TYPE_MODE:"enum"} self.kind=kinds.get(self._attributes_n&dcamprop_defs.DCAMPROPATTRIBUTE.DCAMPROP_TYPE_MASK,"none") self.values=[] self.ivalues=[] self.labels={} self.ilabels={} if self.kind=="enum": self._update_enum_limits() if self.kind in ["enum","int"]: self.min=int(self.min) self.max=int(self.max) self.step=int(self.step) self.default=int(self.default)
[docs] def as_text(self, value=None): """Get the given attribute value as text (by default, current value)""" if value is None: return self.get_value(enum_as_str=True) return py3.as_str(lib.dcamprop_getvaluetext(self.handle,self.pid,value))
def _update_enum_limits(self): self.values=[] self.ivalues=[] for v in range(int(self.min),int(self.max)+1): try: tv=self.as_text(v) self.values.append(tv) self.ivalues.append(v) except DCAMLibError: pass self.labels=dict(zip(self.values,self.ivalues)) self.ilabels=dict(zip(self.ivalues,self.values))
[docs] def update_limits(self): """Update minimal and maximal attribute limits and return tuple ``(min, max)``""" props=lib.dcamprop_getattr(self.handle,self.pid) self.min=props.valuemin self.max=props.valuemax if self.kind=="enum": self._update_enum_limits() return (self.min,self.max)
[docs] def get_value(self, enum_as_str=False): """ Get current attribute value. If ``enum_as_str==True``, try to represent enums as their string values; otherwise, return their integer values (only integers can be used for setting). """ value=lib.dcamprop_getvalue(self.handle,self.pid) if self.kind in ["enum","int"]: value=int(value) if enum_as_str: try: return self.as_text(value) except DCAMLibError: pass return value
[docs] def set_value(self, value): """Set attribute value""" return lib.dcamprop_setgetvalue(self.handle,self.pid,value)
def __repr__(self): return "{}(name='{}', kind={}, id={}, min={}, max={}, unit={})".format(self.__class__.__name__,self.name,self.kind,self.pid,self.min,self.max,self.unit)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["vendor","model","serial_number","camera_version"]) TFrameInfo=collections.namedtuple("TFrameInfo",["frame_index","framestamp","timestamp_us","camerastamp","position","pixeltype"])
[docs] class DCAMCamera(camera.IBinROICamera, camera.IExposureCamera, camera.IAttributeCamera): Error=DCAMError TimeoutError=DCAMTimeoutError _TFrameInfo=TFrameInfo _frameinfo_fields=general.make_flat_namedtuple(TFrameInfo,fields={"position":camera.TFramePosition})._fields def __init__(self, idx=0): super().__init__() self.idx=idx self.handle=None self._opid=None self.dcamwait=None self._alloc_nframes=0 self._readout_speeds={} self._trigger_modes={} self.open() self._add_camera_parameters() self._add_info_variable("device_info",self.get_device_info) self._add_settings_variable("trigger_mode",self.get_trigger_mode,self.set_trigger_mode,ignore_error=ValueError) self._add_info_variable("all_trigger_modes",self.get_all_trigger_modes) self._add_settings_variable("ext_trigger",self.get_ext_trigger_parameters,self.setup_ext_trigger) self._add_settings_variable("readout_speed",self.get_readout_speed,self.set_readout_speed,ignore_error=ValueError) self._add_info_variable("all_readout_speeds",self.get_all_readout_speeds) self._add_settings_variable("defect_correct_mode",self.get_defect_correct_mode,self.set_defect_correct_mode,ignore_error=ValueError) self._add_status_variable("readout_time",self.get_frame_readout_time) self._add_status_variable("acq_status",self.get_status) self._add_status_variable("transfer_info",self.get_transfer_info) self._update_device_variable_order("exposure") self._add_status_variable("frame_period",self.get_frame_period) def _get_connection_parameters(self): return self.idx
[docs] def open(self): """Open connection to the camera""" if self.handle is not None: return with libctl.temp_open(): ncams=get_cameras_number() if self.idx>=ncams: raise DCAMError("camera index {} is not available ({} cameras exist)".format(self.idx,ncams)) try: self.handle=lib.dcamdev_open(self.idx) self._opid=libctl.open().opid self.dcamwait=lib.dcamwait_open(self.handle) self._update_attributes() self._valid_binnings=self._get_valid_binnings() except DCAMError: self.close() raise
[docs] def close(self): """Close connection to the camera""" if self.handle: self.clear_acquisition() lib.dcamwait_close(self.dcamwait.hwait) lib.dcamdev_close(self.handle) libctl.close(self._opid) self.handle=None self._opid=None
[docs] def is_opened(self): """Check if the device is connected""" return self.handle is not None
def _add_camera_parameters(self): rsprop=self.get_attribute("READOUT SPEED",error_on_missing=False) rspar=interface.RangeParameterClass("readout_speed",1,None) if rsprop is not None: if rsprop.max==1: self._readout_speeds={"fast":1} if rsprop.max==2: self._readout_speeds={"slow":1,"fast":2} elif rsprop.max==3: self._readout_speeds={"slow":1,"normal":2,"fast":3} rspar=interface.EnumParameterClass("readout_speed",self._readout_speeds) self._add_parameter_class(rspar) tsprop=self.get_attribute("TRIGGER SOURCE",error_on_missing=False) tspar=interface.RangeParameterClass("trigger_mode",1,None) if tsprop is not None and tsprop.max<=4: self._trigger_modes={"int":1,"ext":2,"software":3,"master_pulse":4} tspar=interface.EnumParameterClass("trigger_mode",self._trigger_modes) self._add_parameter_class(tspar)
[docs] def get_device_info(self): """ Get camera model data. Return tuple ``(vendor, model, serial_number, camera_version)``. """ vendor=py3.as_str(lib.dcamdev_getstring(self.handle,dcamapi4_lib.DCAM_IDSTR.DCAM_IDSTR_VENDOR)) model=py3.as_str(lib.dcamdev_getstring(self.handle,dcamapi4_lib.DCAM_IDSTR.DCAM_IDSTR_MODEL)) serial_number=py3.as_str(lib.dcamdev_getstring(self.handle,dcamapi4_lib.DCAM_IDSTR.DCAM_IDSTR_CAMERAID)) camera_version=py3.as_str(lib.dcamdev_getstring(self.handle,dcamapi4_lib.DCAM_IDSTR.DCAM_IDSTR_CAMERAVERSION)) return TDeviceInfo(vendor,model,serial_number,camera_version)
def _list_attributes(self): ids=lib.dcamprop_getallids(self.handle,0) return [DCAMAttribute(self.handle,pid) for pid in ids] def _normalize_attribute_name(self, name): return name.lower().replace(" ","_")
[docs] def get_attribute_value(self, name, enum_as_str=False, error_on_missing=True, default=None): # pylint: disable=arguments-differ """ Get value of an attribute with the given name. If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`. If `default` is not ``None``, assume that ``error_on_missing==False``. If ``enum_as_str==True``, try to represent enums as their string values; otherwise, return their integer values (only integers can be used for setting). """ return super().get_attribute_value(name,enum_as_str=enum_as_str,error_on_missing=error_on_missing,default=default)
[docs] def set_attribute_value(self, name, value, error_on_missing=True): # pylint: disable=arguments-differ """ Set value of an attribute with the given name. If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing. """ return super().set_attribute_value(name,value,error_on_missing=error_on_missing)
[docs] def get_all_attribute_values(self, root="", enum_as_str=False): # pylint: disable=arguments-differ, arguments-renamed """ Get values of all attributes. If ``enum_as_str==True``, try to represent enums as their string values; otherwise, return their integer values (only integers can be used for setting). """ return super().get_all_attribute_values(root=root,enum_as_str=enum_as_str)
[docs] def set_all_attribute_values(self, settings): # pylint: disable=arguments-differ """Set values of all attribute in the given dictionary""" return super().set_all_attribute_values(settings)
[docs] @camera.acqstopped @interface.use_parameters(mode="trigger_mode") def set_trigger_mode(self, mode): """ Set trigger mode. Can be ``"int"`` (internal), ``"ext"`` (external), or ``"software"`` (software trigger). """ self.cav["TRIGGER SOURCE"]=mode return self.get_trigger_mode()
[docs] @interface.use_parameters(_returns="trigger_mode") def get_trigger_mode(self): """ Get trigger mode. Can be ``"int"`` (internal), ``"ext"`` (external), or ``"software"`` (software trigger). """ return int(self.cav["TRIGGER SOURCE"])
[docs] def get_all_trigger_modes(self): """Return the list of all available trigger modes""" return list(self._trigger_modes)
[docs] def setup_ext_trigger(self, invert=False, delay=0.): """Setup external trigger (inversion and delay)""" self.cav["TRIGGER POLARITY"]=2 if invert else 1 self.set_attribute_value("TRIGGER DELAY",delay,error_on_missing=False) return self.get_ext_trigger_parameters()
[docs] def get_ext_trigger_parameters(self): """Return external trigger parameters (inversion and delay)""" invert=self.cav["TRIGGER POLARITY"]==2 delay=self.get_attribute_value("TRIGGER DELAY",default=0) return invert,delay
[docs] def send_software_trigger(self): """Send software trigger signal""" lib.dcamcap_firetrigger(self.handle)
[docs] def set_exposure(self, exposure): """Set camera exposure""" self.cav["EXPOSURE TIME"]=exposure return self.get_exposure()
[docs] def get_exposure(self): """Set current exposure""" return self.cav["EXPOSURE TIME"]
[docs] @camera.acqcleared @interface.use_parameters(speed="readout_speed") def set_readout_speed(self, speed="fast"): """Set readout speed (can be ``"fast"`` or ``"slow"``)""" self.set_attribute_value("READOUT SPEED",speed,error_on_missing=False) return self.get_readout_speed()
[docs] @interface.use_parameters(_returns="readout_speed") def get_readout_speed(self): """Set current readout speed""" return self.get_attribute_value("READOUT SPEED",default=1)
[docs] def get_all_readout_speeds(self): """Return the list of all available readout speeds""" return list(self._readout_speeds)
[docs] def get_frame_readout_time(self): """Set current frame readout time""" return self.cav["TIMING READOUT TIME"]
[docs] def get_frame_timings(self): """ Get acquisition timing. Return tuple ``(exposure, frame_period)``. """ exposure=self.get_exposure() return self._TAcqTimings(exposure,max(exposure,self.get_frame_readout_time()))
[docs] def get_defect_correct_mode(self): """Check if the defect pixel correction mode is on""" return self.get_attribute_value("DEFECT CORRECT MODE",default=1)==2
[docs] @camera.acqstopped def set_defect_correct_mode(self, enabled=True): """Enable or disable the defect pixel correction mode""" self.set_attribute_value("DEFECT CORRECT MODE",2 if enabled else 1,error_on_missing=False) return self.get_defect_correct_mode()
def _allocate_buffer(self, nframes): self._deallocate_buffer() if nframes: lib.dcambuf_alloc(self.handle,nframes) self._alloc_nframes=nframes def _deallocate_buffer(self): if self._alloc_nframes: lib.dcambuf_release(self.handle,0) self._alloc_nframes=0 def _read_buffer(self, buffer): return lib.dcambuf_lockframe(self.handle,buffer) def _buffer_to_array(self, buffer): # TODO: different packing / color modes (generic for all cameras) bpp=int(buffer.bpp) if bpp==1: ct=ctypes.c_uint8*buffer.btot elif bpp==2: ct=ctypes.c_uint16*(buffer.btot//2) elif bpp==4: ct=ctypes.c_uint32*(buffer.btot//4) else: raise DCAMError("can't convert data with {} BBP into an array".format(bpp)) data=ct.from_address(buffer.buf) img=np.array(data).reshape((buffer.height,buffer.width)) return self._convert_indexing(img,"rct") def _get_data_dimensions_rc(self): return int(self.cav["IMAGE HEIGHT"]),int(self.cav["IMAGE WIDTH"])
[docs] def get_detector_size(self): """Get camera detector size (in pixels) as a tuple ``(width, height)``""" return (int(self.get_attribute("SUBARRAY HSIZE").max),int(self.get_attribute("SUBARRAY VSIZE").max))
[docs] def get_roi(self): """ Get current ROI. Return tuple ``(hstart, hend, vstart, vend, hbin, vbin)``. """ hstart=int(self.cav["SUBARRAY HPOS"]) hend=hstart+int(self.cav["SUBARRAY HSIZE"]) vstart=int(self.cav["SUBARRAY VPOS"]) vend=vstart+int(self.cav["SUBARRAY VSIZE"]) hvbin=int(self.cav["BINNING"]) return (hstart,hend,vstart,vend,hvbin,hvbin)
def _get_valid_binnings(self): bmax=min(self.get_detector_size()) valid_bins=[] p=self.get_attribute("BINNING") for b in range(1,bmax+1): try: p.as_text(b) valid_bins.append(b) except DCAMLibError: if b>4 and b>valid_bins[-1]*4: break return valid_bins def _truncate_roi_binning(self, binv): return max([b for b in self._valid_binnings if b<=binv]) if binv>=1 else 1
[docs] @camera.acqcleared def set_roi(self, hstart=0, hend=None, vstart=0, vend=None, hbin=1, vbin=1): """ Set current ROI. By default, all non-supplied parameters take extreme values. Binning is the same for both axes, so value of `vbin` is ignored (it is left for compatibility). """ self.cav["SUBARRAY MODE"]=2 hlim,vlim=self.get_roi_limits() hbin=self._truncate_roi_binning(hbin) hstart,hend,hbin=self._truncate_roi_axis((hstart,hend,hbin),hlim) vstart,vend,vbin=self._truncate_roi_axis((vstart,vend,hbin),vlim) chstart,_,cvstart,_=self.get_roi()[:4] self.cav["BINNING"]=1 if hstart<=chstart: self.cav["SUBARRAY HPOS"]=hstart self.cav["SUBARRAY HSIZE"]=hend-hstart else: self.cav["SUBARRAY HSIZE"]=hend-hstart self.cav["SUBARRAY HPOS"]=hstart if vstart<=cvstart: self.cav["SUBARRAY VPOS"]=vstart self.cav["SUBARRAY VSIZE"]=vend-vstart else: self.cav["SUBARRAY VSIZE"]=vend-vstart self.cav["SUBARRAY VPOS"]=vstart self.cav["BINNING"]=hbin return self.get_roi()
[docs] def get_roi_limits(self, hbin=1, vbin=1): params=[self.ca[p] for p in ["SUBARRAY HPOS","SUBARRAY VPOS","SUBARRAY HSIZE","SUBARRAY VSIZE"]] minp=tuple([int(p.min) for p in params]) maxp=tuple([int(p.max) for p in params]) stepp=tuple([int(p.step) for p in params]) hlim=camera.TAxisROILimit(minp[2],maxp[2],stepp[0],stepp[2],self._valid_binnings[-1]) vlim=camera.TAxisROILimit(minp[3],maxp[3],stepp[1],stepp[3],self._valid_binnings[-1]) return hlim,vlim
[docs] @interface.use_parameters(mode="acq_mode") def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ """ Setup acquisition. `mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition). `nframes` determines number of frames to acquire in the single mode, or size of the ring buffer in the ``"sequence"`` mode (by default, 100). """ super().setup_acquisition(mode=mode,nframes=nframes) if self._acq_params["nframes"]!=self._alloc_nframes: self.stop_acquisition() self._allocate_buffer(nframes)
[docs] def clear_acquisition(self): self.stop_acquisition() self._deallocate_buffer() super().clear_acquisition()
[docs] def start_acquisition(self, *args, **kwargs): self.stop_acquisition() super().start_acquisition(*args,**kwargs) lib.dcamcap_start(self.handle,-1 if self._acq_params["mode"]=="sequence" else 0) self._frame_counter.reset(self._alloc_nframes)
[docs] def stop_acquisition(self): if self.acquisition_in_progress(): self._frame_counter.update_acquired_frames(self._get_acquired_frames()) lib.dcamcap_stop(self.handle)
_p_acq_status=interface.EnumParameterClass("acq_status",{ "error":dcamapi4_lib.DCAMCAP_STATUS.DCAMCAP_STATUS_ERROR, "busy":dcamapi4_lib.DCAMCAP_STATUS.DCAMCAP_STATUS_BUSY, "ready":dcamapi4_lib.DCAMCAP_STATUS.DCAMCAP_STATUS_READY, "stable":dcamapi4_lib.DCAMCAP_STATUS.DCAMCAP_STATUS_STABLE, "unstable":dcamapi4_lib.DCAMCAP_STATUS.DCAMCAP_STATUS_UNSTABLE, })
[docs] @interface.use_parameters(_returns="acq_status") def get_status(self): """ Get acquisition status. Can be ``"busy"`` (capturing in progress), ``"ready"`` (ready for capturing), ``"stable"`` (not prepared for capturing), ``"unstable"`` (can't be prepared for capturing), or ``"error"`` (some other error). """ return lib.dcamcap_status(self.handle)
[docs] def acquisition_in_progress(self): return self.get_status()=="busy"
[docs] def get_transfer_info(self): """ Get frame transfer info. Return tuple ``(last_buff, frame_count)``, where ``last_buff`` is the index of the last filled buffer, and ``frame_count`` is the total number of acquired frames. """ return tuple(lib.dcamcap_transferinfo(self.handle,0))
def _get_acquired_frames(self): return self.get_transfer_info()[1] def _wait_for_next_frame(self, timeout=20., idx=None): if timeout is None or timeout>0.1: timeout=0.1 try: eventmask=dcamapi4_lib.DCAMWAIT_EVENT.DCAMWAIT_CAPEVENT_FRAMEREADY lib.dcamwait_start(self.dcamwait.hwait,eventmask,int(timeout*1000)) return except DCAMLibError as e: if e.code==dcamapi4_lib.DCAMERR.DCAMERR_LOSTFRAME: # occasionally comes up; warnings.warn("lost DCAM frame") elif e.code!=dcamapi4_lib.DCAMERR.DCAMERR_TIMEOUT: raise def _frame_info_to_namedtuple(self, info): return self._TFrameInfo(info[0],info[1],info[2],info[3],camera.TFramePosition(info[4:6]),info[6]) def _get_single_frame(self, buffer): """ Get a frame at the given buffer index. If ``return_info==True``, return tuple ``(data, info)``, where info is the :class:`TFrameInfo` instance describing frame index, timestamp, camera stamp, frame location on the sensor, and pixel type. Does not advance the read frames counter. """ sframe=self._read_buffer(buffer%self._alloc_nframes) position=camera.TFramePosition(sframe.left,sframe.top) info=TFrameInfo(buffer,sframe.framestamp,sframe.timestamp[0]*10**6+sframe.timestamp[1],sframe.camerastamp,position,sframe.type) data=self._buffer_to_array(sframe) return data,info def _read_frames(self, rng, return_info=False): data=[self._get_single_frame(n) for n in range(rng[0],rng[1])] return [d[0] for d in data],[d[1] for d in data] def _zero_frame(self, n): dim=self.get_data_dimensions() bpp=int(self.get_attribute_value("BIT PER CHANNEL",default=8)) dt="<u{}".format((bpp-1)//8+1) return np.zeros((n,)+dim,dtype=dt)
[docs] def read_multiple_images(self, rng=None, peek=False, missing_frame="skip", return_info=False, return_rng=False): """ Read multiple images specified by `rng` (by default, all un-read images). If `rng` is specified, it is a tuple ``(first, last)`` with images range (first inclusive). If no new frames are available, return an empty list; if no acquisition is running, return ``None``. If ``peek==True``, return images but not mark them as read. `missing_frame` determines what to do with frames which are out of range (missing or lost): can be ``"none"`` (replacing them with ``None``), ``"zero"`` (replacing them with zero-filled frame), or ``"skip"`` (skipping them). If ``return_info==True``, return tuple ``(frames, infos)``, where ``infos`` is a list of :class:`TFrameInfo` instances describing frame index, framestamp and timestamp, camera stamp, frame location on the sensor, and pixel type; if some frames are missing and ``missing_frame!="skip"``, the corresponding frame info is ``None``. if ``return_rng==True``, return the range covered resulting frames; if ``missing_frame=="skip"``, the range can be smaller than the supplied `rng` if some frames are skipped. """ return super().read_multiple_images(rng=rng,peek=peek,missing_frame=missing_frame,return_info=return_info,return_rng=return_rng)