Source code for pylablib.devices.IMAQdx.IMAQdx

from pylablib.core.utils import funcargparse
from . import NIIMAQdx_lib
from .NIIMAQdx_lib import IMAQdxAttributeType, IMAQdxAttributeVisibility, IMAQdxCameraControlMode, IMAQdxBufferNumberMode
from .NIIMAQdx_lib import wlib as lib, IMAQdxError, IMAQdxLibError  # pylint: disable=unused-import

from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera

import numpy as np
import collections
import time


class IMAQdxTimeoutError(IMAQdxError):
    "IMAQdx frame timeout error"







TCameraInfo=collections.namedtuple("TCameraInfo",["name","type","version","flags","serial_number","bus","vendor","model","camera_file","attr_url"])
def _parse_camera_info(info):
    name=py3.as_str(info.InterfaceName)
    ctype=info.Type
    version=info.Version
    flags=info.Flags
    serial_number="{:016X}".format((info.SerialNumberHi<<32)+info.SerialNumberLo)
    try:
        bus=NIIMAQdx_lib.IMAQdxBusType(info.BusType).name[len("IMAQdxBusType"):]
    except ValueError:
        bus=info.BusType
    vendor=py3.as_str(info.VendorName)
    model=py3.as_str(info.ModelName)
    camera_file=py3.as_str(info.CameraFileName)
    attr_url=py3.as_str(info.CameraAttributeURL)
    return TCameraInfo(name,ctype,version,flags,serial_number,bus,vendor,model,camera_file,attr_url)
[docs]def list_cameras(connected=True, desc=True): """ List all cameras available through IMAQdx interface If ``desc==True``, return complete camera descriptions; otherwise, simply return the names. """ lib.initlib() cams=lib.IMAQdxEnumerateCameras(connected) infos=[_parse_camera_info(c) for c in cams] return infos if desc else [inf.name for inf in infos]
[docs]def get_cameras_number(): """Get number of connected dx cameras""" return len(list_cameras())
[docs]class IMAQdxAttribute: """ Object representing an IMAQdx camera parameter. Allows to query and set values and get additional information. Usually created automatically by an :class:`IMAQdxCamera` instance, but could be created manually. Args: sid: camera session ID name: attribute text name Attributes: name: attribute name kind: attribute kind; can be ``"u32"``, ``"i64"``, ``"f64"``, ``"str"``, ``"enum"``, ``"bool"``, ``"command"``, or ``"blob"`` display_name: attribute display name (short description name) tooltip: longer attribute description description: full attribute description (usually, same as `tooltip`) units: attribute units (if applicable) visibility: attribute visibility (``"simple"``, ``"intermediate"``, or ``"advanced"``) readable (bool): whether attribute is readable writable (bool): whether attribute is writable min (float or int): minimal attribute value (if applicable) max (float or int): maximal attribute value (if applicable) inc (float or int): minimal attribute increment value (if applicable) ivalues: list of possible integer values for enum attributes values: list of possible text values for enum attributes labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute """ _attr_types={ IMAQdxAttributeType.IMAQdxAttributeTypeU32:"u32", IMAQdxAttributeType.IMAQdxAttributeTypeI64:"i64", IMAQdxAttributeType.IMAQdxAttributeTypeF64:"f64", IMAQdxAttributeType.IMAQdxAttributeTypeString:"str", IMAQdxAttributeType.IMAQdxAttributeTypeEnum:"enum", IMAQdxAttributeType.IMAQdxAttributeTypeBool:"bool", IMAQdxAttributeType.IMAQdxAttributeTypeCommand:"command", IMAQdxAttributeType.IMAQdxAttributeTypeBlob:"blob"} _vis_types={ IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple:"simple", IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate:"intermediate", IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced:"advanced"} def __init__(self, sid, name): self.sid=sid self.name=py3.as_str(name) self.display_name=py3.as_str(lib.IMAQdxGetAttributeDisplayName(sid,name)) self.tooltip=py3.as_str(lib.IMAQdxGetAttributeTooltip(sid,name)) self.description=py3.as_str(lib.IMAQdxGetAttributeDescription(sid,name)) self.visibility=self._vis_types[lib.IMAQdxGetAttributeVisibility(sid,name)] self.units=py3.as_str(lib.IMAQdxGetAttributeUnits(sid,name)) self.readable=lib.IMAQdxIsAttributeReadable(sid,name) self.writable=lib.IMAQdxIsAttributeWritable(sid,name) self._attr_type_n=lib.IMAQdxGetAttributeType(sid,name) self.kind=self._attr_types[self._attr_type_n] self.min=None self.max=None self.inc=None self.values=[] self.ivalues=[] self.labels={} self.ilabels={} self.update_limits()
[docs] def update_limits(self): """Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``""" if self._attr_type_n in lib.numeric_attr_types: self.min=lib.IMAQdxGetAttributeMinimum(self.sid,self.name,self._attr_type_n) self.max=lib.IMAQdxGetAttributeMaximum(self.sid,self.name,self._attr_type_n) self.inc=lib.IMAQdxGetAttributeIncrement(self.sid,self.name,self._attr_type_n) return (self.min,self.max,self.inc) if self._attr_type_n==IMAQdxAttributeType.IMAQdxAttributeTypeEnum: attr_values=lib.IMAQdxEnumerateAttributeValues(self.sid,self.name) self.values=[av.Name for av in attr_values] self.ivalues=[av.Value for av in attr_values] self.labels=dict(zip(self.values,self.ivalues)) self.ilabels=dict(zip(self.ivalues,self.values))
[docs] def truncate_value(self, value): """Truncate value to lie within attribute limits""" self.update_limits() if self._attr_type_n in lib.numeric_attr_types: if value<self.min: value=self.min elif value>self.max: value=self.max else: inc=self.inc if inc>0: value=((value-self.min)//inc)*inc+self.min return value
[docs] def get_value(self, enum_as_str=True): """ Get attribute value. If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values. """ if not self.readable: raise IMAQdxError("Attribute {} is not readable".format(self.name)) val=lib.IMAQdxGetAttribute(self.sid,self.name,self._attr_type_n) if self._attr_type_n==IMAQdxAttributeType.IMAQdxAttributeTypeEnum: val=val.Name if enum_as_str else val.Value return val
[docs] def set_value(self, value, truncate=True): """ Get attribute value. If ``truncate==True``, automatically truncate value to lie within allowed range. """ if not self.writable: raise IMAQdxError("Attribute {} is not writable".format(self.name)) if truncate: value=self.truncate_value(value) return lib.IMAQdxSetAttribute(self.sid,self.name,value,None)
def __repr__(self): return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["vendor","model","serial_number","bus_type"])
[docs]class IMAQdxCamera(camera.IROICamera, camera.IAttributeCamera): """ Generic IMAQdx camera interface. Args: name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``) mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading) visibility: attribute visibility when listing attributes; can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes). """ Error=IMAQdxError TimeoutError=IMAQdxTimeoutError def __init__(self, name="cam0", mode="controller", visibility="advanced"): super().__init__() lib.initlib() self.name=name self.mode=mode self.visibility=visibility self.sid=None self.open() self._raw_readout_format=False self._add_info_variable("device_info",self.get_device_info) _p_connection_mode=interface.EnumParameterClass("connection_mode",{ "controller":IMAQdxCameraControlMode.IMAQdxCameraControlModeController, "listener":IMAQdxCameraControlMode.IMAQdxCameraControlModeListener}) _p_visibility=interface.EnumParameterClass("visibility",{ "simple":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple, "intermediate":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate, "advanced":IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced}) def _get_connection_parameters(self): return self.name
[docs] def open(self): """Open connection to the camera""" if self.sid is not None: return mode=self._p_connection_mode(self.mode) self.sid=lib.IMAQdxOpenCamera(self.name,mode) with self._close_on_error(): self._update_attributes() self.post_open()
[docs] def close(self): """Close connection to the camera""" if self.sid is not None: try: self.clear_acquisition() finally: lib.IMAQdxCloseCamera(self.sid) self.sid=None
[docs] def reset(self): """Reset connection to the camera""" self.close() lib.IMAQdxResetCamera(self.name,False) self.open()
[docs] def is_opened(self): """Check if the device is connected""" return self.sid is not None
[docs] def post_open(self): """Additional setup after camera opening""" att=self.get_attribute("PixelFormat",error_on_missing=False) if att and att.writable: att.set_value(att.get_value()) # there seems to be occasional de-synchronization between the read and the actual value
_builtin_attrs=["OffsetX","OffsetY","Width","Height","PixelFormat","PayloadSize","StatusInformation::LastBufferNumber","AcquisitionAttributes::BitsPerPixel"] def _normalize_attribute_name(self, name): return name.replace("::","/") def _list_attributes(self): visibility=self._p_visibility(self.visibility) attrs=lib.IMAQdxEnumerateAttributes2(self.sid,"",visibility) attr_names=[a.Name for a in attrs] for a in self._builtin_attrs: if a not in attr_names: try: lib.IMAQdxGetAttributeDisplayName(self.sid,a) attr_names.append(a) except IMAQdxError: pass return [IMAQdxAttribute(self.sid,a) for a in attr_names]
[docs] def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True): # pylint: disable=arguments-differ """ Get value of an attribute with the given name. If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`. If `default` is not ``None``, assume that ``error_on_missing==False``. If `name` points at a dictionary branch, return a dictionary with all values in this branch. If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values. """ return super().get_attribute_value(name,error_on_missing=error_on_missing,default=default,enum_as_str=enum_as_str)
[docs] def set_attribute_value(self, name, value, truncate=True, error_on_missing=True): # pylint: disable=arguments-differ """ Set value of an attribute with the given name. If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing. If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary). If ``truncate==True``, truncate value to lie within attribute range. """ return super().set_attribute_value(name,value,truncate=truncate,error_on_missing=error_on_missing)
[docs] def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True): # pylint: disable=arguments-differ """Get values of all attributes with the given `root`""" values=dictionary.Dictionary() for n,att in self.get_attribute(root).as_dict("flat").items(): if att.readable: try: values[n]=att.get_value(enum_as_str=enum_as_str) except IMAQdxLibError: # sometimes nominally implemented features still raise errors if not ignore_errors: raise return values
[docs] def set_all_attribute_values(self, settings, root="", truncate=True): # pylint: disable=arguments-differ """ Set values of all attributes with the given `root`. If ``truncate==True``, truncate value to lie within attribute range. """ return super().set_all_attribute_values(settings,root=root,truncate=truncate)
[docs] def get_device_info(self): """ Get camera information. Return tuple ``(vendor, model, serial_number, bus_type)``. """ cam_info=self.cav["CameraInformation"] serial="{:016X}".format((cam_info["SerialNumberHigh"]<<32)+cam_info["SerialNumberLow"]) return TDeviceInfo(cam_info["VendorName"],cam_info["ModelName"],serial,cam_info["BusType"])
def _get_data_dimensions_rc(self): return self.cav["Height"],self.cav["Width"]
[docs] def get_detector_size(self): """Get camera detector size (in pixels) as a tuple ``(width, height)``""" return self.ca["Width"].max,self.ca["Height"].max
[docs] def get_roi(self): ox=self.get_attribute_value("OffsetX",default=0) oy=self.get_attribute_value("OffsetY",default=0) w=self.cav["Width"] h=self.cav["Height"] return ox,ox+w,oy,oy+h
[docs] @camera.acqcleared def set_roi(self, hstart=0, hend=None, vstart=0, vend=None): for a in ["Width","Height","OffsetX","OffsetY"]: if a not in self.ca or not self.ca[a].writable: return self.get_roi() det_size=self.get_detector_size() if hend is None: hend=det_size[0] if vend is None: vend=det_size[1] with self.pausing_acquisition(): self.cav["Width"]=self.ca["Width"].min self.cav["Height"]=self.ca["Height"].min self.cav["OffsetX"]=hstart self.cav["OffsetY"]=vstart self.cav["Width"]=max(self.cav["Width"],hend-hstart) self.cav["Height"]=max(self.cav["Height"],vend-vstart) return self.get_roi()
[docs] def get_roi_limits(self, hbin=1, vbin=1): params=["Width","Height","OffsetX","OffsetY"] minp=tuple([(self.ca[p].min if p in self.ca else 0) for p in params]) maxp=tuple([(self.ca[p].max if p in self.ca else 0) for p in params]) incp=tuple([(self.ca[p].inc if p in self.ca else 0) for p in params]) hlim=camera.TAxisROILimit(minp[0] or maxp[0],maxp[0],incp[2] or maxp[0],incp[0] or maxp[0],1) vlim=camera.TAxisROILimit(minp[1] or maxp[1],maxp[1],incp[3] or maxp[1],incp[1] or maxp[1],1) return hlim,vlim
[docs] @interface.use_parameters(mode="acq_mode") def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ """ Setup acquisition mode. `mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition). (note that :meth:`.IMAQdxCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired). `nframes` sets up number of frame buffers. """ self.clear_acquisition() super().setup_acquisition(mode=mode,nframes=nframes) lib.IMAQdxConfigureAcquisition(self.sid,mode=="sequence",nframes)
[docs] def clear_acquisition(self): self.stop_acquisition() lib.IMAQdxUnconfigureAcquisition(self.sid) super().clear_acquisition()
[docs] def start_acquisition(self, *args, **kwargs): self.stop_acquisition() super().start_acquisition(*args,**kwargs) self._frame_counter.reset(self._acq_params["nframes"]) lib.IMAQdxStartAcquisition(self.sid)
[docs] def stop_acquisition(self): if self.acquisition_in_progress(): self._frame_counter.update_acquired_frames(self._get_acquired_frames()) lib.IMAQdxStopAcquisition(self.sid) super().stop_acquisition()
[docs] def acquisition_in_progress(self): """Check if acquisition is in progress""" return self.cav["StatusInformation/AcqInProgress"]
[docs] def refresh_acquisition(self, delay=0.005): """Stop and restart the acquisition, waiting `delay` seconds in between""" self.stop_acquisition() self.clear_acquisition() self.setup_acquisition(0,1) self.start_acquisition() time.sleep(delay) self.stop_acquisition() self.clear_acquisition()
def _get_acquired_frames(self): last_buffer=self.cav["StatusInformation/LastBufferNumber"] if last_buffer>=2**31: last_buffer=-1 # newest_buffer=-1 # try: # newest_buffer=lib.IMAQdxGetImageData(self.sid,None,0,IMAQdxBufferNumberMode.IMAQdxBufferNumberModeLast,0) # except IMAQdxLibError as e: # if e.code!=NIIMAQdx_lib.IMAQdxErrorCode.IMAQdxErrorCameraNotRunning: # raise return last_buffer+1 def _read_data_raw(self, buffer_num, size_bytes, dtype="<u1", mode=IMAQdxBufferNumberMode.IMAQdxBufferNumberModeBufferNumber): """Return raw bytes string from the given buffer number""" dtype=np.dtype(dtype) if size_bytes%dtype.itemsize: raise IMAQdxError("specified buffer size {} is not divisible by the element size {}".format(size_bytes,dtype.itemsize)) arr=np.empty(size_bytes//dtype.itemsize,dtype) lib.IMAQdxGetImageData(self.sid,arr.ctypes.data,size_bytes,mode,buffer_num) return arr def _parse_data(self, data, shape, pixel_format): if self._raw_readout_format=="frame": return data if self._raw_readout_format=="rows": return data.reshape((shape[0],-1)) supported_formats=["Mono8","Mono10","Mono12","Mono16","Mono32"] if pixel_format not in supported_formats: sf_string=", ".join(supported_formats) raise IMAQdxError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format,sf_string)) if pixel_format=="Mono8": return data.reshape(shape) elif pixel_format in ["Mono10","Mono12","Mono16"]: return data.view("<u2").reshape(shape) else: return data.view("<u4").reshape(shape)
[docs] def enable_raw_readout(self, enable="rows"): """ Enable raw frame transfer. Should be used if the camera uses unsupported pixel format. Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array), ``"rows"`` (return a 2D array, where each row corresponds to a single image row), or ``False`` (convert to image data, or raise an error if the format is not supported; default) """ funcargparse.check_parameter_range(enable,"enable",{False,"rows","frame"}) self._raw_readout_format=enable
def _read_frames(self, rng, return_info=False): # TODO: add parsing and pixel format indices=range(*rng) size_bytes=self.cav["PayloadSize"] shape=self.cav["Height"],self.cav["Width"] pixel_format=self.cav["PixelFormat"] frames=[self._read_data_raw(b,size_bytes) for b in indices] frames=[self._parse_data(f,shape,pixel_format) for f in frames] if not self._raw_readout_format: frames=[self._convert_indexing(f,"rct") for f in frames] return frames,None
[docs]class EthernetIMAQdxCamera(IMAQdxCamera): """ LAN-controlled IMAQdx camera. Compared to the standard camera, has an option of automatically switching to a smaller TCP/IP packet size (can be useful if the PC network adapter can't handle jumbo packets). Args: name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``) mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading) visibility: default attribute visibility when listing attributes; can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes). small_packet: if ``True``, automatically set small packet size (1500 bytes). """ def __init__(self, name="cam0", mode="controller", visibility="advanced", small_packet=False): self.small_packet=small_packet super().__init__(name=name,mode=mode,visibility=visibility)
[docs] def post_open(self): super().post_open() if self.small_packet: self.set_attribute_value("AcquisitionAttributes/PacketSize",1500,error_on_missing=False)