from pylablib.core.utils import funcargparse
from .VmbC_defs import VmbFeatureDataType, VmbFeatureVisibilityType, VmbAccessModeType
from .VmbC_lib import wlib as lib, AlliedVisionVimbaXError, AlliedVisionVimbaXLibError # pylint: disable=unused-import
from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera
from ..utils import load_lib, pixel_format as pf
from ...core.utils.ctypes_tools import funcaddressof, as_ctypes_array
from ...core.utils.cext_tools import try_import_cext
with try_import_cext():
from .utils import get_callback # pylint: disable=no-name-in-module
import numpy as np
import collections
import ctypes
import functools
import time
import warnings
class AlliedVisionVimbaXTimeoutError(AlliedVisionVimbaXError):
"""Allied Vision Vimba X frame timeout error"""
[docs]
class LibraryController(load_lib.LibraryController):
def _do_init(self):
lib.VmbStartup(None)
def _do_uninit(self):
lib.VmbShutdown()
libctl=LibraryController(lib)
[docs]
def restart_lib():
libctl.shutdown()
[docs]
def get_SDK_version():
"""Get version of Allied Vision VimbaX SDK"""
with libctl.temp_open():
return ".".join([str(v) for v in lib.VmbVersionQuery()])
_camera_info_alias={"cameraIdString":"name", "modelName":"model", "cameraName":"model_full", "serialString":"serial", "cameraIdExtended":"name_full"}
TCameraInfo=collections.namedtuple("TCameraInfo",["name","model","model_full","serial","name_full"])
def _parse_cam_info(cam_info):
"""Get Allied Vision VimbaX camera info for a camera with the given index"""
values={alias:py3.as_str(getattr(cam_info,name)) for name,alias in _camera_info_alias.items()}
return TCameraInfo(**values)
[docs]
def list_cameras(desc=True):
"""
List all cameras available through Allied Vision VimbaX interface
If ``desc==True``, return complete camera descriptions; otherwise, simply return the names.
"""
with libctl.temp_open():
camlst=lib.VmbCamerasList()
cams=[_parse_cam_info(ci) for ci in camlst]
return cams if desc else [c.name for c in cams]
def _find_camera(cameras, **kwargs):
for i,c in enumerate(cameras):
match=True
for k,v in kwargs.items():
if getattr(c,k)!=v:
match=False
break
if match:
return i
return None
[docs]
def get_cameras_number():
"""Get number of connected Allied Vision VimbaX cameras"""
with libctl.temp_open():
return len(lib.VmbCamerasList())
def _retry_on_fail(name, nrep=3, delay=0.1):
def wrapper(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
for i in range(nrep):
try:
return f(*args,**kwargs)
except AlliedVisionVimbaXError as err:
if i==nrep-1:
raise
warnings.warn("functon {} with arguments *{}, **{} raised an error {}".format(name,args,kwargs,err))
time.sleep(delay)
return wrapped
return wrapper
[docs]
class AlliedVisionVimbaXAttribute:
"""
Object representing an Allied Vision VimbaX GenAPI attribute.
Allows to query and set values and get additional information.
Usually created automatically by an :class:`AlliedVisionVimbaXCamera` instance.
Args:
cam: camera handle
name: attribute name
full_name: if supplied, attribute's full name, including the tree structure
Attributes:
name: attribute name
kind: attribute kind; can be ``"int"``, ``"float"``, ``"bool"``, ``"enum"``, ``"str"``,
``"command"``, ``"raw"``, ``"none"``, or ``"unknown"``
display_name: attribute display name (short description name)
tooltip: longer attribute description
description: full attribute description (usually, same as `tooltip`)
visibility: attribute visibility; can be ``"simple"``, ``"intermediate"``, ``"advanced"``, ``"invisible"``, or ``"unknown"``
category: attribute category (parent path within the parameter tree)
namespace: attribute namespace (e.g., ``"Standard"`` or ``"Custom"``)
representation: numerical attribute representation (e.g., ``"PureNumber"`` or ``"Logarithmic"``)
readable (bool): whether attribute is readable
writable (bool): whether attribute is writable
min (float or int): minimal attribute value (if applicable)
max (float or int): maximal attribute value (if applicable)
inc (float or int): minimal attribute increment value (if applicable)
units: attribute units (if applicable)
ivalues: list of possible integer values for enum attributes
values: list of possible text values for enum attributes
labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values
ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute
"""
_attr_types={ VmbFeatureDataType.VmbFeatureDataInt:"int",
VmbFeatureDataType.VmbFeatureDataFloat:"float",
VmbFeatureDataType.VmbFeatureDataBool:"bool",
VmbFeatureDataType.VmbFeatureDataEnum:"enum",
VmbFeatureDataType.VmbFeatureDataString:"str",
VmbFeatureDataType.VmbFeatureDataCommand:"command",
VmbFeatureDataType.VmbFeatureDataRaw:"raw",
VmbFeatureDataType.VmbFeatureDataNone:"none",
VmbFeatureDataType.VmbFeatureDataUnknown:"unknown"}
_vis_types={ VmbFeatureVisibilityType.VmbFeatureVisibilityBeginner:"simple",
VmbFeatureVisibilityType.VmbFeatureVisibilityExpert:"intermediate",
VmbFeatureVisibilityType.VmbFeatureVisibilityGuru:"advanced",
VmbFeatureVisibilityType.VmbFeatureVisibilityInvisible:"invisible",
VmbFeatureVisibilityType.VmbFeatureVisibilityUnknown:"unknown"}
def __init__(self, cam, name, full_name=None):
self.cam=cam
self.name=py3.as_str(name)
info=lib.VmbFeatureInfoQuery(cam,name)
self.kind=self._attr_types.get(info.featureDataType,"unknown")
self.display_name=py3.as_str(info.displayName)
self.tooltip=py3.as_str(info.tooltip)
self.description=py3.as_str(info.description)
self.category=py3.as_str(info.category)
self.full_name=full_name or self.category.lstrip("/")+"/"+self.name
self.namespace=py3.as_str(info.sfncNamespace)
self.representation=py3.as_str(info.representation)
self.visibility=self._vis_types[info.visibility]
access=lib.VmbFeatureAccessQuery(cam,name)
self.readable=bool(access[0])
self.writable=bool(access[1])
self.units=py3.as_str(info.unit)
self.min=None
self.max=None
self.inc=None
self.repr=None
self._value_nodes=None
self.values=[]
self.ivalues=[]
self.labels={}
self.ilabels={}
self.enum_expanded={}
self._fill_info()
def _fill_info(self):
self.update_limits()
def _update_attributes(self):
access=lib.VmbFeatureAccessQuery(self.cam,self.name)
self.readable=bool(access[0])
self.writable=bool(access[1])
[docs]
def update_limits(self):
"""Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``"""
self._update_attributes()
if self.kind=="int":
try:
self.min,self.max=lib.VmbFeatureIntRangeQuery(self.cam,self.name)
except AlliedVisionVimbaXError:
self.min,self.max=None,None
try:
self.inc=lib.VmbFeatureIntIncrementQuery(self.cam,self.name)
except AlliedVisionVimbaXError:
self.inc=None
return (self.min,self.max,self.inc)
elif self.kind=="float":
try:
self.min,self.max=lib.VmbFeatureFloatRangeQuery(self.cam,self.name)
except AlliedVisionVimbaXError:
self.min,self.max=None,None
try:
ena,inc=lib.VmbFeatureFloatIncrementQuery(self.cam,self.name)
self.inc=inc if ena else None
except AlliedVisionVimbaXError:
self.inc=None
return (self.min,self.max,self.inc)
elif self.kind=="enum":
self.values=[py3.as_str(v) for v in lib.VmbFeatureEnumRangeQuery(self.cam,self.name)]
self.ivalues=[lib.VmbFeatureEnumAsInt(self.cam,self.name,n) for n in self.values]
self.labels=dict(zip(self.values,self.ivalues))
self.ilabels=dict(zip(self.ivalues,self.values))
enum_entries=[lib.VmbFeatureEnumEntryGet(self.cam,self.name,v) for v in self.values]
self.enum_expanded={v:(py3.as_str(ee.displayName),py3.as_str(ee.tooltip),py3.as_str(ee.description)) for v,ee in zip(self.values,enum_entries)}
[docs]
def truncate_value(self, value):
"""Truncate value to lie within attribute limits"""
self.update_limits()
if self.kind in ["int","float"]:
if self.min is not None and value<self.min:
value=self.min
elif self.max is not None and value>self.max:
value=self.max
else:
if self.min is not None and self.inc and self.inc>0:
value=((value-self.min)//self.inc)*self.inc+self.min
return value
[docs]
@_retry_on_fail("get_value")
def get_value(self, enum_as_str=True):
"""
Get attribute value.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
if self.kind=="command":
return bool(lib.VmbFeatureCommandIsDone(self.cam,self.name))
self._update_attributes()
if not self.readable:
raise AlliedVisionVimbaXError("attribute {} is not readable".format(self.name))
if self.kind=="int":
return lib.VmbFeatureIntGet(self.cam,self.name)
if self.kind=="float":
return lib.VmbFeatureFloatGet(self.cam,self.name)
if self.kind=="bool":
return bool(lib.VmbFeatureBoolGet(self.cam,self.name))
if self.kind=="str":
return py3.as_str(lib.VmbFeatureStringGet(self.cam,self.name))
if self.kind=="enum":
value=py3.as_str(lib.VmbFeatureEnumGet(self.cam,self.name))
if not enum_as_str:
value=self.labels[value]
return value
if self.kind=="unknown":
return None
raise AlliedVisionVimbaXError("attribute {} of kind {} can not be read".format(self.name,self.kind))
[docs]
@_retry_on_fail("set_value")
def set_value(self, value, truncate=True):
"""
Set attribute value.
If ``truncate==True``, automatically truncate value to lie within allowed range.
"""
self._update_attributes()
if not self.writable:
raise AlliedVisionVimbaXError("attribute {} is not writable".format(self.name))
if truncate:
value=self.truncate_value(value)
if self.kind=="int":
lib.VmbFeatureIntSet(self.cam,self.name,int(value))
elif self.kind=="float":
lib.VmbFeatureFloatSet(self.cam,self.name,float(value))
elif self.kind=="bool":
lib.VmbFeatureBoolSet(self.cam,self.name,bool(value))
elif self.kind=="str":
lib.VmbFeatureStringSet(self.cam,self.name,str(value))
elif self.kind=="enum":
value=self.ilabels.get(value,value)
lib.VmbFeatureEnumSet(self.cam,self.name,str(value))
elif self.kind!="unknown":
raise AlliedVisionVimbaXError("attribute {} of kind {} can not be set".format(self.name,self.kind))
[docs]
def call_command(self):
"""Execute the given command"""
if self.kind=="command":
lib.VmbFeatureCommandRun(self.cam,self.name)
else:
raise AlliedVisionVimbaXError("attribute {} is not a command".format(self.name))
def __repr__(self):
return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",TCameraInfo._fields)
TFrameInfo=collections.namedtuple("TFrameInfo",["frame_index","framestamp","timestamp","offset_x","offset_y"])
[docs]
class AlliedVisionVimbaXCamera(camera.IROICamera, camera.IAttributeCamera, camera.IExposureCamera):
"""
Generic Allied Vision VimbaX camera interface.
Args:
idx: camera index among the cameras listed using :func:`list_cameras`
name: camera name; if specified, then `idx` is ignored and the camera is determined based on the provided name
serial: camera serial number; if specified, then `idx` and `name` is ignored and the camera is determined based on the provided serial number
"""
Error=AlliedVisionVimbaXError
TimeoutError=AlliedVisionVimbaXTimeoutError
_TFrameInfo=TFrameInfo
def __init__(self, idx=0, name=None, serial=None):
super().__init__()
lib.initlib()
self.idx=idx
self.name=name
self.serial=serial
self.handle=None
self._cb_mgr=None
self._buffer_mgr=None
# self._start_queue_overflows=0
self.open()
self._raw_readout_format=False
self._add_info_variable("device_info",self.get_device_info)
self._add_settings_variable("exposure",self.get_exposure,self.set_exposure,ignore_error=AlliedVisionVimbaXError)
self._update_device_variable_order("exposure")
self._add_settings_variable("frame_period",self.get_frame_period,self.set_frame_period,ignore_error=AlliedVisionVimbaXError)
# self._add_status_variable("missed_frames",self.get_missed_frames_status)
# self._update_queue_overflows()
def _get_connection_parameters(self):
return (self.idx,self.name,self.serial)
[docs]
def open(self):
"""Open connection to the camera"""
if self.handle is not None:
return
with libctl.temp_open():
cams=list_cameras()
if self.name is not None or self.serial is not None:
use_serial=self.serial is not None
spar={"serial":self.serial} if use_serial else {"name":self.name}
idx=_find_camera(cams,**spar)
if idx is None:
kind,par=list(spar.items())[0]
camsstr=", ".join(["'{}'".format(c.serial if use_serial else c.name) for c in cams])
raise AlliedVisionVimbaXError("could not find camera with {} {}; available cameras are {}".format(kind,par,camsstr))
else:
idx=self.idx
if idx>=len(cams):
raise AlliedVisionVimbaXError("camera index {} is not available ({} cameras exist)".format(idx,len(cams)))
name=cams[idx].name
with self._close_on_error():
self.handle=lib.VmbCameraOpen(name,VmbAccessModeType.VmbAccessModeFull)
self._opid=libctl.open().opid
self._update_attributes()
self._cb_mgr=self.CallbackManager(self.handle)
self.post_open()
[docs]
def close(self):
"""Close connection to the camera"""
if self.handle is not None:
try:
self.clear_acquisition()
finally:
try:
lib.VmbCameraClose(self.handle)
finally:
self._cb_mgr=None
self.handle=None
libctl.close(self._opid)
self._opid=None
[docs]
def is_opened(self):
return self.handle is not None
[docs]
def post_open(self):
"""Additional setup after camera opening"""
_builtin_attrs=["OffsetX","OffsetY","Width","Height","SensorWidth","SensorHeight","PixelFormat","PayloadSize",
"AcquisitionStart","AcquisitionStop","AcquisitionStatus",
"AcquisitionFrameRate","AcquisitionFrameRateEnable"]
def _get_attribute_path(self, a):
return a.full_name
_use_full_attribute_names=False
def _list_attributes(self):
anames=[py3.as_str(a.name) for a in lib.VmbFeaturesList(self.handle)]
attrs=[AlliedVisionVimbaXAttribute(self.handle,n,full_name=None if self._use_full_attribute_names else n) for n in anames]
if self._use_full_attribute_names:
attrs+=[AlliedVisionVimbaXAttribute(self.handle,n,full_name=n) for n in self._builtin_attrs if n in anames]
attrs=[a for a in attrs if a.kind in ["int","float","bool","enum","str","command"]]
return attrs
[docs]
def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True): # pylint: disable=arguments-differ
"""
Get value of an attribute with the given name.
If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`.
If `default` is not ``None``, assume that ``error_on_missing==False``.
If `name` points at a dictionary branch, return a dictionary with all values in this branch.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
return super().get_attribute_value(name,error_on_missing=error_on_missing,default=default,enum_as_str=enum_as_str)
[docs]
def set_attribute_value(self, name, value, truncate=True, error_on_missing=True): # pylint: disable=arguments-differ, arguments-renamed
"""
Set value of an attribute with the given name.
If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing.
If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary).
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_attribute_value(name,value,truncate=truncate,error_on_missing=error_on_missing)
[docs]
def call_command(self, name):
"""Execute the given command"""
self.ca[name].call_command()
[docs]
def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True): # pylint: disable=arguments-differ
"""Get values of all attributes with the given `root`"""
values=dictionary.Dictionary()
for n,att in self.get_attribute(root).as_dict("flat").items():
if att.readable:
try:
values[n]=att.get_value(enum_as_str=enum_as_str)
except AlliedVisionVimbaXError: # sometimes nominally implemented features still raise errors
if not ignore_errors:
raise
return values
[docs]
def set_all_attribute_values(self, settings, root="", truncate=True): # pylint: disable=arguments-differ
"""
Set values of all attributes with the given `root`.
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_all_attribute_values(settings,root=root,truncate=truncate)
[docs]
def get_device_info(self):
"""
Get camera information.
Return tuple ``(name, model, serial, devclass, devversion, vendor, friendly_name, user_name, props)``.
"""
caminfo=lib.VmbCameraInfoQueryByHandle(self.handle)
return TDeviceInfo(*_parse_cam_info(caminfo))
def _get_data_dimensions_rc(self):
return self.cav["Height"],self.cav["Width"]
[docs]
def get_detector_size(self):
return self.cav["SensorWidth"],self.cav["SensorHeight"]
[docs]
def get_roi(self):
ox=self.get_attribute_value("OffsetX",default=0)
oy=self.get_attribute_value("OffsetY",default=0)
w=self.cav["Width"]
h=self.cav["Height"]
return ox,ox+w,oy,oy+h
def _set_minimal_value(self, attr):
v=self.ca[attr].min
vm=self.ca[attr].max
i=self.ca[attr].inc or 1
while True:
try:
self.cav[attr]=v
return v
except AlliedVisionVimbaXLibError:
if v>=vm:
raise
v+=i
[docs]
@camera.acqcleared
def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
for a in ["Width","Height","OffsetX","OffsetY"]:
if a in self.ca:
self.ca[a].update_limits()
if not self.ca[a].writable:
return self.get_roi()
det_size=self.get_detector_size()
if hend is None:
hend=det_size[0]
if vend is None:
vend=det_size[1]
self._set_minimal_value("Width")
self._set_minimal_value("Height")
self.cav["OffsetX"]=hstart
self.cav["OffsetY"]=vstart
self.cav["Width"]=max(self.cav["Width"],hend-hstart)
self.cav["Height"]=max(self.cav["Height"],vend-vstart)
return self.get_roi()
[docs]
def get_roi_limits(self, hbin=1, vbin=1):
params=["Width","Height","OffsetX","OffsetY"]
valp=tuple([(self.ca[p].get_value() if p in self.ca else 0) for p in params])
minp=tuple([(self.ca[p].min if p in self.ca else 0) for p in params])
maxp=tuple([(self.ca[p].max if p in self.ca else 0) for p in params])
incp=tuple([(self.ca[p].inc if p in self.ca else 0) for p in params])
hlim=camera.TAxisROILimit(minp[0] or maxp[0],maxp[0]+valp[2],incp[2] or maxp[0],incp[0] or maxp[0],1)
vlim=camera.TAxisROILimit(minp[1],maxp[1],incp[3] or maxp[1],incp[1] or maxp[1],1)
return hlim,vlim
_exposure_time_properties=["ExposureTimeAbs","ExposureTime"]
[docs]
def get_exposure(self):
for p in self._exposure_time_properties:
exp=self.get_attribute_value(p,error_on_missing=False)
if exp is not None:
return exp/1E6 # in us by default
bexp=self.get_attribute_value("ExposureTimeBaseAbs",error_on_missing=False)
rexp=self.get_attribute_value("ExposureTimeRaw",error_on_missing=False)
if bexp is not None and rexp is not None:
return bexp*rexp/1E6
raise AlliedVisionVimbaXError("camera does not support exposure")
[docs]
def set_exposure(self, exposure):
for p in self._exposure_time_properties:
if p in self.attributes:
self.cav[p]=exposure*1E6
return self.get_exposure()
if "ExposureTimeBaseAbs" in self.attributes and "ExposureTimeRaw" in self.attributes:
self.cav["ExposureTimeRaw"]=(exposure/self.cav["ExposureTimeBaseAbs"])*1E6
else:
raise AlliedVisionVimbaXError("camera does not support exposure")
return self.get_exposure()
[docs]
def get_frame_period(self):
fps=self.get_attribute_value("AcquisitionFrameRate",error_on_missing=False)
if fps is not None:
period=1./fps
try:
exposure=self.get_exposure()
return max(exposure,period)
except AlliedVisionVimbaXError:
return period
try:
return self.get_exposure()
except AlliedVisionVimbaXError:
raise AlliedVisionVimbaXError("camera does not support frame period")
[docs]
def set_frame_period(self, frame_period):
"""Set frame period (time between two consecutive frames in the internal trigger mode)"""
enable_period=frame_period>0
if "AcquisitionFrameRate" in self.attributes:
if "AcquisitionFrameRateEnable" in self.attributes:
self.cav["AcquisitionFrameRateEnable"]=enable_period
if enable_period:
self.cav["AcquisitionFrameRate"]=1./frame_period if frame_period>0 else self.attributes["AcquisitionFrameRate"].max
else:
raise AlliedVisionVimbaXError("camera does not support frame period")
return self.get_frame_period()
[docs]
def get_frame_timings(self):
return self._TAcqTimings(self.get_exposure(),self.get_frame_period())
[docs]
class CallbackManager:
def __init__(self, handle):
self.handle=handle
self.callback=get_callback()
self._cfuncs=(ctypes.c_size_t*2)()
self._cfuncs[0]=funcaddressof(lib.lib.VmbCaptureFrameQueue)
self._cfuncs[1]=self.get_callback_ptr()
self._cctl=(ctypes.c_size_t*3)(0)
self._cstat=(ctypes.c_size_t*3)(0)
self._cbuff=(ctypes.c_size_t*5)(0)
self.frames=None
self.commdata=as_ctypes_array([ctypes.addressof(c) for c in [self._cfuncs,self._cctl,self._cbuff,self._cstat]],ctypes.c_size_t)
[docs]
def get_callback_ptr(self):
return self.callback
[docs]
def announce(self, buff):
self.revoke()
context=ctypes.addressof(self.commdata)
self.frames=[lib.VmbFrameAnnounce(self.handle,buff.get_buffer(i),buff.size,context) for i in range(buff.nbuff)]
self._cbuff[0]=buff.nbuff
self._cbuff[1]=buff.size
self._cbuff[2]=buff.fi_size
self._cbuff[3]=buff.buff_ptr
self._cbuff[4]=buff.fi_buff_ptr
[docs]
def revoke(self):
if self.frames is None:
return
self.stop()
lib.VmbFrameRevokeAll(self.handle)
self.frames=[]
[docs]
def reset(self):
self._cstat[:]=[0]*len(self._cstat)
self._cctl[2]=0
[docs]
def start(self, acquire_frameinfo=True):
self.stop()
self.reset()
for f in self.frames:
lib.VmbCaptureFrameQueue(self.handle,ctypes.pointer(f),self.get_callback_ptr())
self._cctl[1]=1 if acquire_frameinfo else 0
self._cctl[0]=1
[docs]
def stop(self):
self._cctl[0]=0
lib.VmbCaptureQueueFlush(self.handle)
[docs]
def is_running(self):
return bool(self._cctl[0])
[docs]
def get_stat(self):
return tuple(int(v) for v in self._cstat)
[docs]
class BufferManager:
"""Buffer manager, which deals with buffer memory allocation, registering and deregistering, and retrieving the result and the leftovers"""
_frame_info_size=6
def __init__(self, size, nbuff):
self.size=size
self.fi_size=self._frame_info_size*8
self.nbuff=nbuff
self._full_buffer=ctypes.create_string_buffer(self.size*nbuff)
self.buff_ptr=ctypes.addressof(self._full_buffer)
self._full_fi_buffer=ctypes.create_string_buffer(self.fi_size*nbuff)
self.fi_buff_ptr=ctypes.addressof(self._full_fi_buffer)
[docs]
def get_buffer(self, fidx):
"""Get buffer corresponding to the given frame index"""
return self.buff_ptr+(fidx%self.nbuff)*self.size
[docs]
def get_frameinfo_buffer(self, fidx):
"""Get frame info buffer corresponding to the given frame index"""
return self.fi_buff_ptr+(fidx%self.nbuff)*self.fi_size
def _allocate_buffers(self, nbuff):
self._deallocate_buffers()
size=lib.VmbPayloadSizeGet(self.handle)
# nbuff=min(nbuff,2**30//size)
self._buffer_mgr=self.BufferManager(size,nbuff)
return nbuff
def _deallocate_buffers(self):
if self._buffer_mgr is not None:
self._buffer_mgr=None
[docs]
@interface.use_parameters(mode="acq_mode")
def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ
"""
Setup acquisition mode.
`mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition).
`nframes` sets up number of frame buffers.
"""
self.clear_acquisition()
# self.set_attribute_value("AcquisitionMode","Continuous",error_on_missing=False)
nframes=self._allocate_buffers(nbuff=nframes)
self._cb_mgr.announce(self._buffer_mgr)
super().setup_acquisition(mode=mode,nframes=nframes)
[docs]
def clear_acquisition(self):
self.stop_acquisition()
if self._buffer_mgr is not None:
self._cb_mgr.revoke()
self._deallocate_buffers()
super().clear_acquisition()
[docs]
def start_acquisition(self, *args, **kwargs):
self.stop_acquisition()
if self._buffer_mgr is not None and lib.VmbPayloadSizeGet(self.handle)!=self._buffer_mgr.size:
self.clear_acquisition()
super().start_acquisition(*args,**kwargs)
self._frame_counter.reset(self._acq_params["nframes"])
self._cb_mgr.start()
lib.VmbCaptureStart(self.handle)
self.call_command("AcquisitionStart")
[docs]
def stop_acquisition(self):
if self.acquisition_in_progress():
self.call_command("AcquisitionStop")
lib.VmbCaptureEnd(self.handle)
self._cb_mgr.stop()
self._frame_counter.update_acquired_frames(self._get_acquired_frames())
super().stop_acquisition()
[docs]
def acquisition_in_progress(self):
# return self._cb_mgr.is_running() and self.get_attribute_value("AcquisitionStatus",False)
return self._cb_mgr.is_running()
def _get_acquired_frames(self):
return self._cb_mgr.get_stat()[0]
[docs]
def enable_raw_readout(self, enable="rows"):
"""
Enable raw frame transfer.
Should be used if the camera uses unsupported pixel format.
Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array),
``"rows"`` (return a 2D array, where each row corresponds to a single image row),
or ``False`` (convert to image data, or raise an error if the format is not supported; default)
"""
funcargparse.check_parameter_range(enable,"enable",{False,"rows","frame"})
self._raw_readout_format=enable
def _parse_buffer(self, buff, size, shape, pixel_format, n=1):
data=np.ctypeslib.as_array(ctypes.cast(buff,ctypes.POINTER(ctypes.c_ubyte)),shape=(n,size))
if self._raw_readout_format=="frame":
return data[:,None,:]
if self._raw_readout_format=="rows":
return data.reshape((n,shape[0],-1))
supported_formats=["Mono8","Mono10","Mono12","Mono12p","Mono16","Mono32"]
if pixel_format not in supported_formats:
sf_string=", ".join(supported_formats)
raise AlliedVisionVimbaXError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format,sf_string))
if pixel_format=="Mono8":
return data.reshape((n,)+shape)
elif pixel_format in ["Mono10","Mono12","Mono16"]:
return data.view("<u2").reshape((n,)+shape)
elif pixel_format =="Mono12p":
return pf.convert_uint12(data.reshape(n,shape[0],-1),mode="little_endian",width=shape[1])
else:
return data.view("<u4").reshape((n,)+shape)
def _parse_frame_info(self, idx, buff, n=1):
camfi=np.ctypeslib.as_array(ctypes.cast(buff,ctypes.POINTER(ctypes.c_uint64)),shape=(n,self.BufferManager._frame_info_size))[:,[0,1,4,5]]
return np.column_stack((np.arange(idx,idx+n,dtype=camfi.dtype),camfi))
_support_chunks=True
def _read_frames(self, rng, return_info=False):
size=self._buffer_mgr.size
shape=self.cav["Height"],self.cav["Width"]
pixel_format=self.cav["PixelFormat"]
nbuff=self._buffer_mgr.nbuff
i0,i1=rng
if (i1-1)//nbuff==i0//nbuff:
chunks=[(i0,i1-i0)]
else:
cut=(i1//nbuff)*nbuff
chunks=[(i0,cut-i0),(cut,i1-cut)]
frames=[self._parse_buffer(self._buffer_mgr.get_buffer(b),size,shape,pixel_format,n=n) for b,n in chunks]
if not self._raw_readout_format:
frames=[self._convert_indexing(f,"rct",axes=(-2,-1)) for f in frames]
frame_info=None
if return_info:
frame_info=[self._parse_frame_info(b,self._buffer_mgr.get_frameinfo_buffer(b),n=n) for b,n in chunks]
return frames,frame_info