Source code for pylablib.devices.IMAQdx.IMAQdx

from pylablib.core.utils import funcargparse
from . import NIIMAQdx_lib
from .NIIMAQdx_lib import IMAQdxAttributeType, IMAQdxAttributeVisibility, IMAQdxCameraControlMode, IMAQdxBufferNumberMode
from .NIIMAQdx_lib import wlib as lib, IMAQdxError, IMAQdxLibError  # pylint: disable=unused-import

from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera

import numpy as np
from ...core.utils.cext_tools import try_import_cext
with try_import_cext():
    from .utils import get_callback  # pylint: disable=no-name-in-module
import ctypes
import collections
import time


class IMAQdxTimeoutError(IMAQdxError):
    "IMAQdx frame timeout error"







TCameraInfo=collections.namedtuple("TCameraInfo",["name","type","version","flags","serial_number","bus","vendor","model","camera_file","attr_url"])
def _parse_camera_info(info):
    name=py3.as_str(info.InterfaceName)
    ctype=info.Type
    version=info.Version
    flags=info.Flags
    serial_number="{:016X}".format((info.SerialNumberHi<<32)+info.SerialNumberLo)
    try:
        bus=NIIMAQdx_lib.IMAQdxBusType(info.BusType).name[len("IMAQdxBusType"):]
    except ValueError:
        bus=info.BusType
    vendor=py3.as_str(info.VendorName)
    model=py3.as_str(info.ModelName)
    camera_file=py3.as_str(info.CameraFileName)
    attr_url=py3.as_str(info.CameraAttributeURL)
    return TCameraInfo(name,ctype,version,flags,serial_number,bus,vendor,model,camera_file,attr_url)
[docs] def list_cameras(connected=True, desc=True): """ List all cameras available through IMAQdx interface If ``desc==True``, return complete camera descriptions; otherwise, simply return the names. """ lib.initlib() cams=lib.IMAQdxEnumerateCameras(connected) infos=[_parse_camera_info(c) for c in cams] return infos if desc else [inf.name for inf in infos]
[docs] def get_cameras_number(): """Get number of connected dx cameras""" return len(list_cameras())
[docs] class IMAQdxAttribute: """ Object representing an IMAQdx camera parameter. Allows to query and set values and get additional information. Usually created automatically by an :class:`IMAQdxCamera` instance, but could be created manually. Args: sid: camera session ID name: attribute text name Attributes: name: attribute name kind: attribute kind; can be ``"u32"``, ``"i64"``, ``"f64"``, ``"str"``, ``"enum"``, ``"bool"``, ``"command"``, or ``"blob"`` display_name: attribute display name (short description name) tooltip: longer attribute description description: full attribute description (usually, same as `tooltip`) units: attribute units (if applicable) visibility: attribute visibility (``"simple"``, ``"intermediate"``, or ``"advanced"``) readable (bool): whether attribute is readable writable (bool): whether attribute is writable min (float or int): minimal attribute value (if applicable) max (float or int): maximal attribute value (if applicable) inc (float or int): minimal attribute increment value (if applicable) ivalues: list of possible integer values for enum attributes values: list of possible text values for enum attributes labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute """ _attr_types={ IMAQdxAttributeType.IMAQdxAttributeTypeU32:"u32", IMAQdxAttributeType.IMAQdxAttributeTypeI64:"i64", IMAQdxAttributeType.IMAQdxAttributeTypeF64:"f64", IMAQdxAttributeType.IMAQdxAttributeTypeString:"str", IMAQdxAttributeType.IMAQdxAttributeTypeEnum:"enum", IMAQdxAttributeType.IMAQdxAttributeTypeBool:"bool", IMAQdxAttributeType.IMAQdxAttributeTypeCommand:"command", IMAQdxAttributeType.IMAQdxAttributeTypeBlob:"blob"} _vis_types={ IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple:"simple", IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate:"intermediate", IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced:"advanced"} def __init__(self, sid, name): self.sid=sid self.name=py3.as_str(name) self.display_name=py3.as_str(lib.IMAQdxGetAttributeDisplayName(sid,name)) self.tooltip=py3.as_str(lib.IMAQdxGetAttributeTooltip(sid,name)) self.description=py3.as_str(lib.IMAQdxGetAttributeDescription(sid,name)) self.visibility=self._vis_types[lib.IMAQdxGetAttributeVisibility(sid,name)] self.units=py3.as_str(lib.IMAQdxGetAttributeUnits(sid,name)) self.readable=lib.IMAQdxIsAttributeReadable(sid,name) self.writable=lib.IMAQdxIsAttributeWritable(sid,name) self._attr_type_n=lib.IMAQdxGetAttributeType(sid,name) self.kind=self._attr_types[self._attr_type_n] self.min=None self.max=None self.inc=None self.values=[] self.ivalues=[] self.labels={} self.ilabels={} self.update_limits()
[docs] def update_limits(self): """Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``""" if self._attr_type_n in lib.numeric_attr_types: self.min=lib.IMAQdxGetAttributeMinimum(self.sid,self.name,self._attr_type_n) self.max=lib.IMAQdxGetAttributeMaximum(self.sid,self.name,self._attr_type_n) self.inc=lib.IMAQdxGetAttributeIncrement(self.sid,self.name,self._attr_type_n) return (self.min,self.max,self.inc) if self._attr_type_n==IMAQdxAttributeType.IMAQdxAttributeTypeEnum: attr_values=lib.IMAQdxEnumerateAttributeValues(self.sid,self.name) self.values=[av.Name for av in attr_values] self.ivalues=[av.Value for av in attr_values] self.labels=dict(zip(self.values,self.ivalues)) self.ilabels=dict(zip(self.ivalues,self.values))
[docs] def truncate_value(self, value): """Truncate value to lie within attribute limits""" self.update_limits() if self._attr_type_n in lib.numeric_attr_types: if value<self.min: value=self.min elif value>self.max: value=self.max else: inc=self.inc if inc>0: value=((value-self.min)//inc)*inc+self.min return value
[docs] def get_value(self, enum_as_str=True): """ Get attribute value. If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values. """ if not self.readable: raise IMAQdxError("Attribute {} is not readable".format(self.name)) val=lib.IMAQdxGetAttribute(self.sid,self.name,self._attr_type_n) if self._attr_type_n==IMAQdxAttributeType.IMAQdxAttributeTypeEnum: val=val.Name if enum_as_str else val.Value return val
[docs] def set_value(self, value, truncate=True): """ Get attribute value. If ``truncate==True``, automatically truncate value to lie within allowed range. """ if not self.writable: raise IMAQdxError("Attribute {} is not writable".format(self.name)) if truncate: value=self.truncate_value(value) return lib.IMAQdxSetAttribute(self.sid,self.name,value,None)
def __repr__(self): return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["vendor","model","serial_number","bus_type"])
[docs] class IMAQdxCamera(camera.IROICamera, camera.IAttributeCamera): """ Generic IMAQdx camera interface. Args: name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``) mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading) visibility: attribute visibility when listing attributes; can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes). """ Error=IMAQdxError TimeoutError=IMAQdxTimeoutError def __init__(self, name="cam0", mode="controller", visibility="advanced"): super().__init__() lib.initlib() self.name=name self.mode=mode self.visibility=visibility self.sid=None self._cb_manager=self.CallbackManager() self.open() self._raw_readout_format=False self._raw_readout_format_bypp=None self._raw_readout_format_bypi=None self._add_info_variable("device_info",self.get_device_info) _p_connection_mode=interface.EnumParameterClass("connection_mode",{ "controller":IMAQdxCameraControlMode.IMAQdxCameraControlModeController, "listener":IMAQdxCameraControlMode.IMAQdxCameraControlModeListener}) _p_visibility=interface.EnumParameterClass("visibility",{ "simple":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple, "intermediate":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate, "advanced":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced}) def _get_connection_parameters(self): return self.name
[docs] def open(self): """Open connection to the camera""" if self.sid is not None: return mode=self._p_connection_mode(self.mode) self.sid=lib.IMAQdxOpenCamera(self.name,mode) with self._close_on_error(): self._update_attributes() self.post_open()
[docs] def close(self): """Close connection to the camera""" if self.sid is not None: try: self.clear_acquisition() finally: lib.IMAQdxCloseCamera(self.sid) self.sid=None
[docs] def reset(self): """Reset connection to the camera""" self.close() lib.IMAQdxResetCamera(self.name,False) self.open()
[docs] def is_opened(self): """Check if the device is connected""" return self.sid is not None
[docs] def post_open(self): """Additional setup after camera opening""" att=self.get_attribute("PixelFormat",error_on_missing=False) if att and att.writable: att.set_value(att.get_value()) # there seems to be occasional de-synchronization between the read and the actual value
_builtin_attrs=["OffsetX","OffsetY","Width","Height","PixelFormat","PayloadSize","StatusInformation::LastBufferNumber","AcquisitionAttributes::BitsPerPixel"] def _normalize_attribute_name(self, name): return name.replace("::","/") def _list_attributes(self): visibility=self._p_visibility(self.visibility) attrs=lib.IMAQdxEnumerateAttributes2(self.sid,"",visibility) attr_names=[a.Name for a in attrs] for a in self._builtin_attrs: if a not in attr_names: try: lib.IMAQdxGetAttributeDisplayName(self.sid,a) attr_names.append(a) except IMAQdxError: pass return [IMAQdxAttribute(self.sid,a) for a in attr_names]
[docs] def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True): # pylint: disable=arguments-differ """ Get value of an attribute with the given name. If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`. If `default` is not ``None``, assume that ``error_on_missing==False``. If `name` points at a dictionary branch, return a dictionary with all values in this branch. If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values. """ return super().get_attribute_value(name,error_on_missing=error_on_missing,default=default,enum_as_str=enum_as_str)
[docs] def set_attribute_value(self, name, value, truncate=True, error_on_missing=True): # pylint: disable=arguments-differ """ Set value of an attribute with the given name. If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing. If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary). If ``truncate==True``, truncate value to lie within attribute range. """ return super().set_attribute_value(name,value,truncate=truncate,error_on_missing=error_on_missing)
[docs] def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True): # pylint: disable=arguments-differ """Get values of all attributes with the given `root`""" values=dictionary.Dictionary() for n,att in self.get_attribute(root).as_dict("flat").items(): if att.readable: try: values[n]=att.get_value(enum_as_str=enum_as_str) except IMAQdxLibError: # sometimes nominally implemented features still raise errors if not ignore_errors: raise return values
[docs] def set_all_attribute_values(self, settings, root="", truncate=True): # pylint: disable=arguments-differ """ Set values of all attributes with the given `root`. If ``truncate==True``, truncate value to lie within attribute range. """ return super().set_all_attribute_values(settings,root=root,truncate=truncate)
[docs] def get_device_info(self): """ Get camera information. Return tuple ``(vendor, model, serial_number, bus_type)``. """ cam_info=self.cav["CameraInformation"] serial="{:016X}".format((cam_info["SerialNumberHigh"]<<32)+cam_info["SerialNumberLow"]) return TDeviceInfo(cam_info["VendorName"],cam_info["ModelName"],serial,cam_info["BusType"])
def _get_data_dimensions_rc(self): return self.cav["Height"],self.cav["Width"]
[docs] def get_detector_size(self): """Get camera detector size (in pixels) as a tuple ``(width, height)``""" return self.ca["Width"].max,self.ca["Height"].max
[docs] def get_roi(self): ox=self.get_attribute_value("OffsetX",default=0) oy=self.get_attribute_value("OffsetY",default=0) w=self.cav["Width"] h=self.cav["Height"] return ox,ox+w,oy,oy+h
[docs] @camera.acqcleared def set_roi(self, hstart=0, hend=None, vstart=0, vend=None): for a in ["Width","Height","OffsetX","OffsetY"]: if a not in self.ca or not self.ca[a].writable: return self.get_roi() det_size=self.get_detector_size() if hend is None: hend=det_size[0] if vend is None: vend=det_size[1] with self.pausing_acquisition(): self.cav["Width"]=self.ca["Width"].min self.cav["Height"]=self.ca["Height"].min self.cav["OffsetX"]=hstart self.cav["OffsetY"]=vstart self.cav["Width"]=max(self.cav["Width"],hend-hstart) self.cav["Height"]=max(self.cav["Height"],vend-vstart) return self.get_roi()
[docs] def get_roi_limits(self, hbin=1, vbin=1): params=["Width","Height","OffsetX","OffsetY"] minp=tuple([(self.ca[p].min if p in self.ca else 0) for p in params]) maxp=tuple([(self.ca[p].max if p in self.ca else 0) for p in params]) incp=tuple([(self.ca[p].inc if p in self.ca else 0) for p in params]) hlim=camera.TAxisROILimit(minp[0] or maxp[0],maxp[0],incp[2] or maxp[0],incp[0] or maxp[0],1) vlim=camera.TAxisROILimit(minp[1] or maxp[1],maxp[1],incp[3] or maxp[1],incp[1] or maxp[1],1) return hlim,vlim
[docs] class CallbackManager: def __init__(self): self.commdata=(ctypes.c_uint32*2)() self.callback=get_callback()
[docs] def get_callback_ptr(self): return self.callback.address
[docs] def register(self, sid): self.reset() self.start() lib.IMAQdxRegisterFrameDoneEvent(sid,1,self.callback,ctypes.addressof(self.commdata),wrap=False)
[docs] def reset(self): self.commdata[1]=0
[docs] def start(self): self.commdata[0]=1
[docs] def stop(self): self.commdata[0]=0
[docs] def get_nbuff(self): return int(self.commdata[1])
[docs] @interface.use_parameters(mode="acq_mode") def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ """ Setup acquisition mode. `mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition). (note that :meth:`.IMAQdxCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired). `nframes` sets up number of frame buffers. """ self.clear_acquisition() super().setup_acquisition(mode=mode,nframes=nframes)
[docs] def clear_acquisition(self): self.stop_acquisition() super().clear_acquisition()
[docs] def start_acquisition(self, *args, **kwargs): self.stop_acquisition() super().start_acquisition(*args,**kwargs) self._frame_counter.reset(self._acq_params["nframes"]) lib.IMAQdxConfigureAcquisition(self.sid,self._acq_params["mode"]=="sequence",self._acq_params["nframes"]) self._cb_manager.register(self.sid) lib.IMAQdxStartAcquisition(self.sid)
[docs] def stop_acquisition(self): if self.acquisition_in_progress(): self._frame_counter.update_acquired_frames(self._get_acquired_frames()) lib.IMAQdxStopAcquisition(self.sid) self._cb_manager.stop() lib.IMAQdxUnconfigureAcquisition(self.sid) super().stop_acquisition()
[docs] def acquisition_in_progress(self): """Check if acquisition is in progress""" return self.cav["StatusInformation/AcqInProgress"]
[docs] def refresh_acquisition(self, delay=0.005): """Stop and restart the acquisition, waiting `delay` seconds in between""" self.stop_acquisition() self.clear_acquisition() self.setup_acquisition(0,1) self.start_acquisition() time.sleep(delay) self.stop_acquisition() self.clear_acquisition()
def _get_acquired_frames(self): # last_buffer=self.cav["StatusInformation/LastBufferNumber"] # if last_buffer>=2**31: # last_buffer=-1 # return last_buffer+1 return self._cb_manager.get_nbuff() def _read_data_raw(self, buffer_num, size_bytes, dtype="<u1", mode=IMAQdxBufferNumberMode.IMAQdxBufferNumberModeBufferNumber): """Return raw bytes string from the given buffer number""" dtype=np.dtype(dtype) if size_bytes%dtype.itemsize: raise IMAQdxError("specified buffer size {} is not divisible by the element size {}".format(size_bytes,dtype.itemsize)) arr=np.empty(size_bytes//dtype.itemsize,dtype) lib.IMAQdxGetImageData(self.sid,arr.ctypes.data,size_bytes,mode,buffer_num) return arr def _parse_data(self, data, shape, pixel_format): if self._raw_readout_format=="frame": return data if self._raw_readout_format=="rows": return data.reshape((shape[0],-1)) supported_formats=["Mono8","Mono10","Mono12","Mono16","Mono32"] if pixel_format not in supported_formats: sf_string=", ".join(supported_formats) raise IMAQdxError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format,sf_string)) if pixel_format=="Mono8": return data.reshape(shape) elif pixel_format in ["Mono10","Mono12","Mono16"]: return data.view("<u2").reshape(shape) else: return data.view("<u4").reshape(shape)
[docs] def enable_raw_readout(self, enable="rows", bytes_per_pixel=None, bytes_per_image=None): """ Enable raw frame transfer. Should be used if the camera uses unsupported pixel format. Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array), ``"rows"`` (return a 2D array, where each row corresponds to a single image row), or ``False`` (convert to image data, or raise an error if the format is not supported; default). In addition, for cameras which incorrectly implement ``"PayloadSize"`` parameter, one can explicitly specify the number of bytes per pixel (possibly fractional) which will be used to calculate the total byte size of the frame, or the total number of bytes per image (if specified, takes priority over `bytes_per_pixel`). Both `bytes_per_pixel` and `bytes_per_image` only apply if `enable` is set to ``"frame"`` or ``"rows"``. """ funcargparse.check_parameter_range(enable,"enable",{False,"rows","frame"}) self._raw_readout_format=enable self._raw_readout_format_bypp=bytes_per_pixel if enable else None self._raw_readout_format_bypi=bytes_per_image if enable else None
def _read_frames(self, rng, return_info=False): # TODO: add parsing and pixel format indices=range(*rng) shape=self.cav["Height"],self.cav["Width"] if self._raw_readout_format_bypi is not None: size_bytes=self._raw_readout_format_bypi elif self._raw_readout_format_bypp is not None: size_bytes=int(shape[0]*shape[1]*self._raw_readout_format_bypp) else: size_bytes=self.cav["PayloadSize"] pixel_format=self.cav["PixelFormat"] frames=[self._read_data_raw(b,size_bytes) for b in indices] frames=[self._parse_data(f,shape,pixel_format) for f in frames] if not self._raw_readout_format: frames=[self._convert_indexing(f,"rct") for f in frames] return frames,None
[docs] class EthernetIMAQdxCamera(IMAQdxCamera): """ LAN-controlled IMAQdx camera. Compared to the standard camera, has an option of automatically switching to a smaller TCP/IP packet size (can be useful if the PC network adapter can't handle jumbo packets). Args: name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``) mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading) visibility: default attribute visibility when listing attributes; can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes). small_packet: if ``True``, automatically set small packet size (1500 bytes). """ def __init__(self, name="cam0", mode="controller", visibility="advanced", small_packet=False): self.small_packet=small_packet super().__init__(name=name,mode=mode,visibility=visibility)
[docs] def post_open(self): super().post_open() if self.small_packet: self.set_attribute_value("AcquisitionAttributes/PacketSize",1500,error_on_missing=False)