from pylablib.core.utils import funcargparse
from . import NIIMAQdx_lib
from .NIIMAQdx_lib import IMAQdxAttributeType, IMAQdxAttributeVisibility, IMAQdxCameraControlMode, IMAQdxBufferNumberMode
from .NIIMAQdx_lib import wlib as lib, IMAQdxError, IMAQdxLibError # pylint: disable=unused-import
from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera
import numpy as np
from ...core.utils.cext_tools import try_import_cext
# get_callback is imported under the try_import_cext() guard
# (presumably because .utils is a compiled C extension which may be unavailable -- TODO confirm)
with try_import_cext():
    from .utils import get_callback # pylint: disable=no-name-in-module
import ctypes
import collections
import time
class IMAQdxTimeoutError(IMAQdxError):
    """IMAQdx frame timeout error."""
# Camera description record produced by ``_parse_camera_info`` and returned by ``list_cameras``
TCameraInfo = collections.namedtuple(
    "TCameraInfo",
    [
        "name",
        "type",
        "version",
        "flags",
        "serial_number",
        "bus",
        "vendor",
        "model",
        "camera_file",
        "attr_url",
    ],
)
def _parse_camera_info(info):
    """
    Convert a raw IMAQdx camera information structure into a :class:`TCameraInfo` tuple.

    Text fields are passed through ``py3.as_str``, the two serial number halves are merged
    into a single 16-digit upper-case hexadecimal string, and the bus type is replaced by the name
    of the corresponding ``IMAQdxBusType`` member with the ``"IMAQdxBusType"`` prefix cut off.
    If the bus type value does not correspond to any enum member, the raw value is kept as is.
    """
    serial_number = "{:016X}".format((info.SerialNumberHi << 32) + info.SerialNumberLo)
    try:
        bus = NIIMAQdx_lib.IMAQdxBusType(info.BusType).name[len("IMAQdxBusType"):]
    except ValueError:
        # unknown bus type code: report the number itself
        bus = info.BusType
    return TCameraInfo(
        name=py3.as_str(info.InterfaceName),
        type=info.Type,
        version=info.Version,
        flags=info.Flags,
        serial_number=serial_number,
        bus=bus,
        vendor=py3.as_str(info.VendorName),
        model=py3.as_str(info.ModelName),
        camera_file=py3.as_str(info.CameraFileName),
        attr_url=py3.as_str(info.CameraAttributeURL),
    )
def list_cameras(connected=True, desc=True):
    """
    List all cameras available through IMAQdx interface.

    Args:
        connected: forwarded to ``IMAQdxEnumerateCameras``
            (presumably restricts the result to the currently connected cameras -- TODO confirm)
        desc: if ``True``, return complete camera descriptions (:class:`TCameraInfo` tuples);
            otherwise, simply return the interface names.
    """
    lib.initlib()
    descriptions = [_parse_camera_info(cam) for cam in lib.IMAQdxEnumerateCameras(connected)]
    if desc:
        return descriptions
    return [d.name for d in descriptions]
def get_cameras_number():
    """Get number of connected dx cameras (the length of the default :func:`list_cameras` result)"""
    cameras = list_cameras()
    return len(cameras)
class IMAQdxAttribute:
    """
    Object representing an IMAQdx camera parameter.

    Allows to query and set values and get additional information.
    Usually created automatically by an :class:`IMAQdxCamera` instance, but could be created manually.

    Args:
        sid: camera session ID
        name: attribute text name

    Attributes:
        name: attribute name
        kind: attribute kind; can be ``"u32"``, ``"i64"``, ``"f64"``, ``"str"``, ``"enum"``,
            ``"bool"``, ``"command"``, or ``"blob"``
        display_name: attribute display name (short description name)
        tooltip: longer attribute description
        description: full attribute description (usually, same as `tooltip`)
        units: attribute units (if applicable)
        visibility: attribute visibility (``"simple"``, ``"intermediate"``, or ``"advanced"``)
        readable (bool): whether attribute is readable
        writable (bool): whether attribute is writable
        min (float or int): minimal attribute value (if applicable)
        max (float or int): maximal attribute value (if applicable)
        inc (float or int): minimal attribute increment value (if applicable)
        ivalues: list of possible integer values for enum attributes
        values: list of possible text values for enum attributes
        labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values
        ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute
    """
    # library attribute type code -> short text kind
    _attr_types = {
        IMAQdxAttributeType.IMAQdxAttributeTypeU32: "u32",
        IMAQdxAttributeType.IMAQdxAttributeTypeI64: "i64",
        IMAQdxAttributeType.IMAQdxAttributeTypeF64: "f64",
        IMAQdxAttributeType.IMAQdxAttributeTypeString: "str",
        IMAQdxAttributeType.IMAQdxAttributeTypeEnum: "enum",
        IMAQdxAttributeType.IMAQdxAttributeTypeBool: "bool",
        IMAQdxAttributeType.IMAQdxAttributeTypeCommand: "command",
        IMAQdxAttributeType.IMAQdxAttributeTypeBlob: "blob",
    }
    # library visibility code -> text visibility level
    _vis_types = {
        IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple: "simple",
        IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate: "intermediate",
        IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced: "advanced",
    }

    def __init__(self, sid, name):
        self.sid = sid
        self.name = py3.as_str(name)
        # static metadata is queried once on creation
        self.display_name = py3.as_str(lib.IMAQdxGetAttributeDisplayName(sid, name))
        self.tooltip = py3.as_str(lib.IMAQdxGetAttributeTooltip(sid, name))
        self.description = py3.as_str(lib.IMAQdxGetAttributeDescription(sid, name))
        self.visibility = self._vis_types[lib.IMAQdxGetAttributeVisibility(sid, name)]
        self.units = py3.as_str(lib.IMAQdxGetAttributeUnits(sid, name))
        self.readable = lib.IMAQdxIsAttributeReadable(sid, name)
        self.writable = lib.IMAQdxIsAttributeWritable(sid, name)
        self._attr_type_n = lib.IMAQdxGetAttributeType(sid, name)
        self.kind = self._attr_types[self._attr_type_n]
        # limits and enum descriptions start empty and are filled by update_limits() where applicable
        self.min = None
        self.max = None
        self.inc = None
        self.values = []
        self.ivalues = []
        self.labels = {}
        self.ilabels = {}
        self.update_limits()

    def update_limits(self):
        """
        Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``.

        For numeric attributes the minimum, maximum and increment are re-read from the camera.
        For enum attributes the lists of possible values (``values``, ``ivalues``) and the label
        dictionaries (``labels``, ``ilabels``) are re-read instead.
        The stored ``(min, max, inc)`` tuple is returned in all cases
        (its elements stay ``None`` for attributes which are not numeric).
        """
        if self._attr_type_n in lib.numeric_attr_types:
            self.min = lib.IMAQdxGetAttributeMinimum(self.sid, self.name, self._attr_type_n)
            self.max = lib.IMAQdxGetAttributeMaximum(self.sid, self.name, self._attr_type_n)
            self.inc = lib.IMAQdxGetAttributeIncrement(self.sid, self.name, self._attr_type_n)
        elif self._attr_type_n == IMAQdxAttributeType.IMAQdxAttributeTypeEnum:
            attr_values = lib.IMAQdxEnumerateAttributeValues(self.sid, self.name)
            self.values = [av.Name for av in attr_values]
            self.ivalues = [av.Value for av in attr_values]
            self.labels = dict(zip(self.values, self.ivalues))
            self.ilabels = dict(zip(self.ivalues, self.values))
        # always return the tuple promised by the docstring (previously non-numeric attributes returned None)
        return (self.min, self.max, self.inc)

    def truncate_value(self, value):
        """
        Truncate value to lie within attribute limits.

        The limits are refreshed first. For numeric attributes, values outside of the ``[min, max]`` range
        are replaced by the nearest limit, and values inside the range are rounded down to the increment grid
        starting at ``min`` (if the increment is positive). Values of other attribute kinds are returned unchanged.
        """
        self.update_limits()
        if self._attr_type_n not in lib.numeric_attr_types:
            return value
        if value < self.min:
            return self.min
        if value > self.max:
            return self.max
        inc = self.inc
        if inc > 0:
            # snap down onto the min + k*inc grid
            return ((value - self.min) // inc) * inc + self.min
        return value

    def get_value(self, enum_as_str=True):
        """
        Get attribute value.

        If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
        Raise :exc:`IMAQdxError` if the attribute is not readable.
        """
        if not self.readable:
            raise IMAQdxError("Attribute {} is not readable".format(self.name))
        val = lib.IMAQdxGetAttribute(self.sid, self.name, self._attr_type_n)
        if self._attr_type_n == IMAQdxAttributeType.IMAQdxAttributeTypeEnum:
            val = val.Name if enum_as_str else val.Value
        return val

    def set_value(self, value, truncate=True):
        """
        Set attribute value.

        If ``truncate==True``, automatically truncate value to lie within allowed range.
        Raise :exc:`IMAQdxError` if the attribute is not writable.
        Return the result of the underlying ``IMAQdxSetAttribute`` call.
        """
        if not self.writable:
            raise IMAQdxError("Attribute {} is not writable".format(self.name))
        if truncate:
            value = self.truncate_value(value)
        return lib.IMAQdxSetAttribute(self.sid, self.name, value, None)

    def __repr__(self):
        return "{}(name='{}', kind='{}')".format(self.__class__.__name__, self.name, self.kind)
# Device information record returned by ``IMAQdxCamera.get_device_info``
TDeviceInfo = collections.namedtuple(
    "TDeviceInfo",
    [
        "vendor",
        "model",
        "serial_number",
        "bus_type",
    ],
)
class IMAQdxCamera(camera.IROICamera, camera.IAttributeCamera):
    """
    Generic IMAQdx camera interface.

    Args:
        name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``)
        mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading)
        visibility: attribute visibility when listing attributes;
            can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes).
    """
    Error = IMAQdxError
    TimeoutError = IMAQdxTimeoutError

    def __init__(self, name="cam0", mode="controller", visibility="advanced"):
        super().__init__()
        lib.initlib()
        self.name = name
        self.mode = mode
        self.visibility = visibility
        self.sid = None
        self._cb_manager = self.CallbackManager()
        self.open()
        # raw readout is disabled by default (see enable_raw_readout)
        self._raw_readout_format = False
        self._raw_readout_format_bypp = None
        self._raw_readout_format_bypi = None
        self._add_info_variable("device_info", self.get_device_info)

    # text <-> library constant converters for the constructor arguments
    _p_connection_mode = interface.EnumParameterClass("connection_mode", {
        "controller": IMAQdxCameraControlMode.IMAQdxCameraControlModeController,
        "listener": IMAQdxCameraControlMode.IMAQdxCameraControlModeListener,
    })
    _p_visibility = interface.EnumParameterClass("visibility", {
        "simple": IMAQdxAttributeVisibility.IMAQdxAttributeVisibilitySimple,
        "intermediate": IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityIntermediate,
        "advanced": IMAQdxAttributeVisibility.IMAQdxAttributeVisibilityAdvanced,
    })

    def _get_connection_parameters(self):
        return self.name

    def open(self):
        """Open connection to the camera (does nothing if the connection is already opened)"""
        if self.sid is not None:
            return
        mode = self._p_connection_mode(self.mode)
        self.sid = lib.IMAQdxOpenCamera(self.name, mode)
        # NOTE(review): both setup steps are placed under the close-on-error guard; confirm against upstream indentation
        with self._close_on_error():
            self._update_attributes()
            self.post_open()

    def close(self):
        """Close connection to the camera (does nothing if the connection is already closed)"""
        if self.sid is None:
            return
        try:
            self.clear_acquisition()
        finally:
            # the session is closed and its ID is forgotten even if clearing the acquisition failed,
            # so that the object never keeps a handle to an already closed session
            lib.IMAQdxCloseCamera(self.sid)
            self.sid = None

    def reset(self):
        """Reset connection to the camera: close it, call ``IMAQdxResetCamera``, and open it again"""
        self.close()
        lib.IMAQdxResetCamera(self.name, False)
        self.open()

    def is_opened(self):
        """Check if the device is connected"""
        return self.sid is not None

    def post_open(self):
        """Additional setup after camera opening"""
        att = self.get_attribute("PixelFormat", error_on_missing=False)
        if att and att.writable:
            att.set_value(att.get_value())  # there seems to be occasional de-synchronization between the read and the actual value

    # attributes which are added to the attribute list even if the enumeration does not report them
    _builtin_attrs = [
        "OffsetX",
        "OffsetY",
        "Width",
        "Height",
        "PixelFormat",
        "PayloadSize",
        "StatusInformation::LastBufferNumber",
        "AcquisitionAttributes::BitsPerPixel",
    ]

    def _normalize_attribute_name(self, name):
        return name.replace("::", "/")

    def _list_attributes(self):
        visibility = self._p_visibility(self.visibility)
        attr_names = [a.Name for a in lib.IMAQdxEnumerateAttributes2(self.sid, "", visibility)]
        for builtin in self._builtin_attrs:
            if builtin in attr_names:
                continue
            # probe the attribute with a cheap query; it is added only if the camera recognizes it
            try:
                lib.IMAQdxGetAttributeDisplayName(self.sid, builtin)
            except IMAQdxError:
                continue
            attr_names.append(builtin)
        return [IMAQdxAttribute(self.sid, a) for a in attr_names]

    def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True):  # pylint: disable=arguments-differ
        """
        Get value of an attribute with the given name.

        If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`.
        If `default` is not ``None``, assume that ``error_on_missing==False``.
        If `name` points at a dictionary branch, return a dictionary with all values in this branch.
        If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
        """
        return super().get_attribute_value(name, error_on_missing=error_on_missing, default=default, enum_as_str=enum_as_str)

    def set_attribute_value(self, name, value, truncate=True, error_on_missing=True):  # pylint: disable=arguments-differ
        """
        Set value of an attribute with the given name.

        If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing.
        If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary).
        If ``truncate==True``, truncate value to lie within attribute range.
        """
        return super().set_attribute_value(name, value, truncate=truncate, error_on_missing=error_on_missing)

    def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True):  # pylint: disable=arguments-differ
        """
        Get values of all attributes with the given `root`.

        Only readable attributes are queried.
        If ``ignore_errors==True``, attributes which raise a library error on reading are skipped; otherwise, the error is re-raised.
        """
        values = dictionary.Dictionary()
        for n, att in self.get_attribute(root).as_dict("flat").items():
            if not att.readable:
                continue
            try:
                values[n] = att.get_value(enum_as_str=enum_as_str)
            except IMAQdxLibError:  # sometimes nominally implemented features still raise errors
                if not ignore_errors:
                    raise
        return values

    def set_all_attribute_values(self, settings, root="", truncate=True):  # pylint: disable=arguments-differ
        """
        Set values of all attributes with the given `root`.

        If ``truncate==True``, truncate value to lie within attribute range.
        """
        return super().set_all_attribute_values(settings, root=root, truncate=truncate)

    def get_device_info(self):
        """
        Get camera information.

        Return tuple ``(vendor, model, serial_number, bus_type)``.
        """
        cam_info = self.cav["CameraInformation"]
        # the serial number is stored as two halves, which are merged into a 16-digit hexadecimal string
        serial = "{:016X}".format((cam_info["SerialNumberHigh"] << 32) + cam_info["SerialNumberLow"])
        return TDeviceInfo(cam_info["VendorName"], cam_info["ModelName"], serial, cam_info["BusType"])

    def _get_data_dimensions_rc(self):
        return self.cav["Height"], self.cav["Width"]

    def get_detector_size(self):
        """Get camera detector size (in pixels) as a tuple ``(width, height)``"""
        return self.ca["Width"].max, self.ca["Height"].max

    def get_roi(self):
        """
        Get current ROI as a tuple ``(hstart, hend, vstart, vend)``.

        Missing or unreadable offset attributes are treated as zero.
        """
        ox = self.get_attribute_value("OffsetX", default=0)
        oy = self.get_attribute_value("OffsetY", default=0)
        w = self.cav["Width"]
        h = self.cav["Height"]
        return ox, ox + w, oy, oy + h

    @camera.acqcleared
    def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
        """
        Set current ROI and return the resulting ROI (same format as :meth:`get_roi`).

        If any of the ``Width``, ``Height``, ``OffsetX`` or ``OffsetY`` attributes is missing or not writable, nothing is changed.
        `hend` and `vend` set to ``None`` stand for the detector size along the corresponding axis.
        """
        for a in ["Width", "Height", "OffsetX", "OffsetY"]:
            if a not in self.ca or not self.ca[a].writable:
                return self.get_roi()
        det_size = self.get_detector_size()
        if hend is None:
            hend = det_size[0]
        if vend is None:
            vend = det_size[1]
        with self.pausing_acquisition():
            # the size is shrunk to the minimum first, then the offsets are set, and only then the size is expanded
            self.cav["Width"] = self.ca["Width"].min
            self.cav["Height"] = self.ca["Height"].min
            self.cav["OffsetX"] = hstart
            self.cav["OffsetY"] = vstart
            self.cav["Width"] = max(self.cav["Width"], hend - hstart)
            self.cav["Height"] = max(self.cav["Height"], vend - vstart)
        return self.get_roi()

    def get_roi_limits(self, hbin=1, vbin=1):
        """
        Get the ROI limits as a tuple ``(hlim, vlim)`` of ``TAxisROILimit`` tuples.

        The limits are built from the ``min``, ``max`` and ``inc`` of the ``Width`` / ``Height`` attributes
        and from the ``inc`` of the ``OffsetX`` / ``OffsetY`` attributes; a missing attribute contributes zeros,
        and zero (or missing) minimum and increment values are replaced by the corresponding maximal size.
        `hbin` and `vbin` are not used.
        """
        params = ["Width", "Height", "OffsetX", "OffsetY"]
        minp = tuple((self.ca[p].min if p in self.ca else 0) for p in params)
        maxp = tuple((self.ca[p].max if p in self.ca else 0) for p in params)
        incp = tuple((self.ca[p].inc if p in self.ca else 0) for p in params)
        hlim = camera.TAxisROILimit(minp[0] or maxp[0], maxp[0], incp[2] or maxp[0], incp[0] or maxp[0], 1)
        vlim = camera.TAxisROILimit(minp[1] or maxp[1], maxp[1], incp[3] or maxp[1], incp[1] or maxp[1], 1)
        return hlim, vlim

    class CallbackManager:
        """
        Holder of the frame-done callback and of the data block passed to it.

        The data block is an array of two 32-bit unsigned integers: element 0 is set to 1 by :meth:`start`
        and to 0 by :meth:`stop`; element 1 is zeroed by :meth:`reset` and read by :meth:`get_nbuff`
        (presumably it is incremented by the callback on every finished frame -- TODO confirm against the C extension).
        """
        def __init__(self):
            self.commdata = (ctypes.c_uint32 * 2)()
            self.callback = get_callback()

        def get_callback_ptr(self):
            """Return the ``address`` attribute of the callback object"""
            return self.callback.address

        def register(self, sid):
            """Reset the counter, enable the callback, and register it as the frame-done event of session `sid`"""
            self.reset()
            self.start()
            lib.IMAQdxRegisterFrameDoneEvent(sid, 1, self.callback, ctypes.addressof(self.commdata), wrap=False)

        def reset(self):
            """Zero the buffer counter"""
            self.commdata[1] = 0

        def start(self):
            """Set the enable flag"""
            self.commdata[0] = 1

        def stop(self):
            """Clear the enable flag"""
            self.commdata[0] = 0

        def get_nbuff(self):
            """Return the current value of the buffer counter"""
            return int(self.commdata[1])

    @interface.use_parameters(mode="acq_mode")
    def setup_acquisition(self, mode="sequence", nframes=100):  # pylint: disable=arguments-differ
        """
        Setup acquisition mode.

        `mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition).
        (note that :meth:`.IMAQdxCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired).
        `nframes` sets up number of frame buffers.
        """
        self.clear_acquisition()
        super().setup_acquisition(mode=mode, nframes=nframes)

    def clear_acquisition(self):
        """Stop the acquisition (if it is running) and clear the acquisition setup"""
        self.stop_acquisition()
        super().clear_acquisition()

    def start_acquisition(self, *args, **kwargs):
        """
        Start the acquisition.

        A running acquisition is stopped first; the arguments are forwarded to the base class implementation,
        after which the acquisition is configured using the stored mode and number of frames,
        the frame-done callback is registered, and the acquisition is started.
        """
        self.stop_acquisition()
        super().start_acquisition(*args, **kwargs)
        nframes = self._acq_params["nframes"]
        self._frame_counter.reset(nframes)
        continuous = self._acq_params["mode"] == "sequence"
        lib.IMAQdxConfigureAcquisition(self.sid, continuous, nframes)
        self._cb_manager.register(self.sid)
        lib.IMAQdxStartAcquisition(self.sid)

    def stop_acquisition(self):
        """Stop and unconfigure the acquisition if it is in progress"""
        if self.acquisition_in_progress():
            # the frame counter is updated before the acquisition is stopped
            self._frame_counter.update_acquired_frames(self._get_acquired_frames())
            lib.IMAQdxStopAcquisition(self.sid)
            self._cb_manager.stop()
            lib.IMAQdxUnconfigureAcquisition(self.sid)
        # NOTE(review): the base-class call is made unconditionally here; confirm against upstream indentation
        super().stop_acquisition()

    def acquisition_in_progress(self):
        """Check if acquisition is in progress"""
        return self.cav["StatusInformation/AcqInProgress"]

    def refresh_acquisition(self, delay=0.005):
        """
        Stop and restart the acquisition, waiting `delay` seconds in between.

        The acquisition is cleared, a single-buffer ``"snap"`` acquisition is started and kept running for `delay` seconds,
        and then the acquisition is stopped and cleared again.
        """
        self.stop_acquisition()
        self.clear_acquisition()
        # explicit "snap" mode: only "snap" and "sequence" are documented as valid modes of setup_acquisition
        self.setup_acquisition("snap", 1)
        self.start_acquisition()
        time.sleep(delay)
        self.stop_acquisition()
        self.clear_acquisition()

    def _get_acquired_frames(self):
        # the number of frames is taken from the callback counter
        # rather than from the "StatusInformation/LastBufferNumber" attribute
        return self._cb_manager.get_nbuff()

    def _read_data_raw(self, buffer_num, size_bytes, dtype="<u1", mode=IMAQdxBufferNumberMode.IMAQdxBufferNumberModeBufferNumber):
        """
        Read `size_bytes` bytes of raw image data for the given buffer number.

        Return a 1D numpy array with the given `dtype`.
        Raise :exc:`IMAQdxError` if `size_bytes` is not divisible by the element size of `dtype`.
        """
        dtype = np.dtype(dtype)
        if size_bytes % dtype.itemsize:
            raise IMAQdxError("specified buffer size {} is not divisible by the element size {}".format(size_bytes, dtype.itemsize))
        arr = np.empty(size_bytes // dtype.itemsize, dtype)
        # the library writes the image data directly into the array memory
        lib.IMAQdxGetImageData(self.sid, arr.ctypes.data, size_bytes, mode, buffer_num)
        return arr

    def _parse_data(self, data, shape, pixel_format):
        """
        Convert a raw data array into a frame.

        In the ``"frame"`` raw readout mode the data is returned as is, and in the ``"rows"`` mode it is reshaped
        into ``shape[0]`` rows. Otherwise, the data is reinterpreted according to `pixel_format`
        (8, 16, or 32 bits per pixel) and reshaped to `shape`; unsupported formats raise :exc:`IMAQdxError`.
        """
        if self._raw_readout_format == "frame":
            return data
        if self._raw_readout_format == "rows":
            return data.reshape((shape[0], -1))
        supported_formats = ["Mono8", "Mono10", "Mono12", "Mono16", "Mono32"]
        if pixel_format not in supported_formats:
            sf_string = ", ".join(supported_formats)
            raise IMAQdxError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format, sf_string))
        if pixel_format == "Mono8":
            return data.reshape(shape)
        if pixel_format == "Mono32":
            return data.view("<u4").reshape(shape)
        # the remaining formats (Mono10, Mono12, Mono16) are read as 16-bit values
        return data.view("<u2").reshape(shape)

    def enable_raw_readout(self, enable="rows", bytes_per_pixel=None, bytes_per_image=None):
        """
        Enable raw frame transfer.

        Should be used if the camera uses unsupported pixel format.
        Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array),
        ``"rows"`` (return a 2D array, where each row corresponds to a single image row),
        or ``False`` (convert to image data, or raise an error if the format is not supported; default).
        In addition, for cameras which incorrectly implement ``"PayloadSize"`` parameter, one can explicitly specify the number
        of bytes per pixel (possibly fractional) which will be used to calculate the total byte size of the frame,
        or the total number of bytes per image (if specified, takes priority over `bytes_per_pixel`).
        Both `bytes_per_pixel` and `bytes_per_image` only apply if `enable` is set to ``"frame"`` or ``"rows"``.
        """
        funcargparse.check_parameter_range(enable, "enable", {False, "rows", "frame"})
        self._raw_readout_format = enable
        self._raw_readout_format_bypp = bytes_per_pixel if enable else None
        self._raw_readout_format_bypi = bytes_per_image if enable else None

    def _read_frames(self, rng, return_info=False):  # TODO: add parsing and pixel format
        """
        Read and parse the frames with the buffer numbers in ``range(*rng)``.

        Return a tuple ``(frames, None)``; `return_info` is not used.
        """
        indices = range(*rng)
        shape = self.cav["Height"], self.cav["Width"]
        # frame size in bytes: explicit size per image > explicit size per pixel > camera-reported payload size
        if self._raw_readout_format_bypi is not None:
            size_bytes = self._raw_readout_format_bypi
        elif self._raw_readout_format_bypp is not None:
            size_bytes = int(shape[0] * shape[1] * self._raw_readout_format_bypp)
        else:
            size_bytes = self.cav["PayloadSize"]
        pixel_format = self.cav["PixelFormat"]
        frames = [self._read_data_raw(b, size_bytes) for b in indices]
        frames = [self._parse_data(f, shape, pixel_format) for f in frames]
        if not self._raw_readout_format:
            # indexing conversion is applied only to properly parsed images
            frames = [self._convert_indexing(f, "rct") for f in frames]
        return frames, None
class EthernetIMAQdxCamera(IMAQdxCamera):
    """
    LAN-controlled IMAQdx camera.

    Compared to the standard camera, has an option of automatically switching to a smaller TCP/IP packet size
    (can be useful if the PC network adapter can't handle jumbo packets).

    Args:
        name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``)
        mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading)
        visibility: default attribute visibility when listing attributes;
            can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes).
        small_packet: if ``True``, automatically set small packet size (1500 bytes).
    """
    def __init__(self, name="cam0", mode="controller", visibility="advanced", small_packet=False):
        # the flag is stored before the base constructor runs, since that constructor opens the camera
        # and post_open() reads the flag
        self.small_packet = small_packet
        super().__init__(name=name, mode=mode, visibility=visibility)

    def post_open(self):
        """Additional setup after camera opening: on top of the base class setup, apply the small packet size if requested"""
        super().post_open()
        if not self.small_packet:
            return
        self.set_attribute_value("AcquisitionAttributes/PacketSize", 1500, error_on_missing=False)