from pylablib.core.utils import funcargparse
from .SpinnakerC_defs import spinNodeType, spinVisibility, spinAccessMode, spinRepresentation # pylint: disable=unused-import
from .SpinnakerC_lib import wlib as lib, TeledyneSpinnakerError, TeledyneSpinnakerLibError # pylint: disable=unused-import
from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera
from ..utils import load_lib
from ...core.utils.ctypes_tools import funcaddressof, as_ctypes_array
from ...core.utils.cext_tools import try_import_cext
with try_import_cext():
from .utils import get_callback # pylint: disable=no-name-in-module
import numpy as np
import collections
import ctypes
class TeledyneSpinnakerTimeoutError(TeledyneSpinnakerError):
"Teledyne Spinnaker frame timeout error"
[docs]
class LibraryController(load_lib.LibraryController):
def __init__(self, lib): # pylint: disable=redefined-outer-name
super().__init__(lib)
self.h=None
def _do_init(self):
self.h=lib.spinSystemGetInstance()
def _do_uninit(self):
if self.h is not None:
lib.spinSystemReleaseInstance(self.h)
self.h=None
libctl=LibraryController(lib)
[docs]
def restart_lib():
libctl.shutdown()
[docs]
def get_SDK_version():
"""Get version of Teledyne Spinnaker SDK"""
with libctl.temp_open():
return ".".join([str(v) for v in lib.spinSystemGetLibraryVersion(libctl.h)])
_camera_info_alias={"DeviceDisplayName":"name", "DeviceModelName":"model", "DeviceSerialNumber":"serial", "DeviceType":"devclass", "DeviceVersion":"devversion",
"DeviceVendorName":"vendor","DeviceUserID":"user_name"}
TCameraInfo=collections.namedtuple("TCameraInfo",["name","model","serial","devclass","devversion","vendor","user_name"])
def _get_device_info(cam):
"""Get Spinnaker camera info for a camera with the given index"""
nmap=None
cam_info={v:None for v in _camera_info_alias.values()}
try:
nmap=lib.spinCameraGetTLDeviceNodeMap(cam)
for n,a in _camera_info_alias.items():
try:
node=lib.spinNodeMapGetNode(nmap,n)
cam_info[a]=py3.as_str(lib.spinNodeToString(node))
lib.spinNodeMapReleaseNode(nmap,node)
except TeledyneSpinnakerError:
pass
except TeledyneSpinnakerError:
pass
return TCameraInfo(**cam_info)
def _parse_camera_list(camlst):
ndev=lib.spinCameraListGetSize(camlst)
infos=[]
for i in range(ndev):
cam=None
try:
cam=lib.spinCameraListGet(camlst,i)
infos.append(_get_device_info(cam))
finally:
if cam is not None:
lib.spinCameraRelease(cam)
return infos
[docs]
def list_cameras(desc=True):
"""
List all cameras available through Teledyne Spinnaker interface
If ``desc==True``, return complete camera descriptions; otherwise, simply return the names.
"""
with libctl.temp_open():
camlst=lib.spinCameraListCreateEmpty()
lib.spinSystemGetCameras(libctl.h,camlst)
infos=_parse_camera_list(camlst)
lib.spinCameraListClear(camlst)
lib.spinCameraListDestroy(camlst)
return infos if desc else [inf.name for inf in infos]
def _find_camera(cameras, **kwargs):
for i,c in enumerate(cameras):
match=True
for k,v in kwargs.items():
if getattr(c,k)!=v:
match=False
break
if match:
return i
return None
[docs]
def get_cameras_number():
"""Get number of connected Teledyne Spinnaker cameras"""
with libctl.temp_open():
camlst=lib.spinCameraListCreateEmpty()
lib.spinSystemGetCameras(libctl.h,camlst)
ndev=lib.spinCameraListGetSize(camlst)
lib.spinCameraListClear(camlst)
lib.spinCameraListDestroy(camlst)
return ndev
[docs]
class TeledyneSpinnakerAttribute:
"""
Object representing an Teledyne Spinnaker GenAPI attribute.
Allows to query and set values and get additional information.
Usually created automatically by an :class:`SpinnakerCamera` instance.
Args:
node: Spinnaker GenApi node handler
full_name: if supplied, attribute's full name, including the tree structure
Attributes:
name: attribute name
kind: attribute kind; can be ``"int"``, ``"float"``, ``"bool"``, ``"enum"``, ``"str"``,
``"command"``, ``"category"``, or ``"unknown"``
display_name: attribute display name (short description name)
tooltip: longer attribute description
description: full attribute description (usually, same as `tooltip`)
visibility: attribute visibility; can be ``"simple"``, ``"intermediate"``, ``"advanced"``, ``"invisible"``, or ``"unknown"``
access: attribute access level; can be ``"read_only"``, ``"write_only"``, ``"read_write"``,
``"na"`` (not applicable, e.g., command), or ``"not_implemented"``
readable (bool): whether attribute is readable
writable (bool): whether attribute is writable
implemented (bool): whether the attribute is implemented in the given camera (normally always ``True``)
available (bool): whether the attribute can be changed or called
min (float or int): minimal attribute value (if applicable)
max (float or int): maximal attribute value (if applicable)
inc (float or int): minimal attribute increment value (if applicable)
units: attribute units (if applicable)
repr: shows what a numerical unit represents; can be ``"lin"``, ``"log"``, ``"bool"``, ``"pure"``, ``"hex"``, or ``"unknown"``
ivalues: list of possible integer values for enum attributes
values: list of possible text values for enum attributes
labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values
ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute
"""
_attr_types={ spinNodeType.IntegerNode:"int",
spinNodeType.FloatNode:"float",
spinNodeType.BooleanNode:"bool",
spinNodeType.EnumerationNode:"enum",
spinNodeType.StringNode:"str",
spinNodeType.CommandNode:"command",
spinNodeType.CategoryNode:"category",
spinNodeType.UnknownNode:"unknown"}
_vis_types={ spinVisibility.Beginner:"simple",
spinVisibility.Expert:"intermediate",
spinVisibility.Guru:"advanced",
spinVisibility.Invisible:"invisible",
spinVisibility._UndefinedVisibility:"unknown"}
_acc_types={ spinAccessMode.RO:"read_only",
spinAccessMode.WO:"write_only",
spinAccessMode.RW:"read_write",
spinAccessMode.NA:"na",
spinAccessMode.NI:"not_implemented",
spinAccessMode._UndefinedAccesMode:"unknown"}
_repr_type={ spinRepresentation.Linear:"lin",
spinRepresentation.Logarithmic:"log",
spinRepresentation.Boolean:"bool",
spinRepresentation.PureNumber:"pure",
spinRepresentation.HexNumber:"hex",
spinRepresentation.IPV4Address:"ipv4",
spinRepresentation.MACAddress:"mac",
spinRepresentation._UndefinedRepresentation:"unknown"}
def __init__(self, node, full_name=None):
self.node=node
self.name=py3.as_str(lib.spinNodeGetName(node))
self.full_name=full_name or self.name
self.kind=self._attr_types.get(lib.spinNodeGetType(node),"unknown")
self.display_name=py3.as_str(lib.spinNodeGetDisplayName(node))
self.tooltip=py3.as_str(lib.spinNodeGetToolTip(node))
self.description=py3.as_str(lib.spinNodeGetDescription(node))
self.visibility=self._vis_types[lib.spinNodeGetVisibility(node)]
self.access=self._acc_types[lib.spinNodeGetAccessMode(node)]
self.implemented=bool(lib.spinNodeIsImplemented(node))
self.available=bool(lib.spinNodeIsAvailable(node))
self.readable=bool(lib.spinNodeIsReadable(node))
self.writable=bool(lib.spinNodeIsWritable(node))
self.units=None
self.min=None
self.max=None
self.inc=None
self.repr=None
self._value_nodes=None
self.values=[]
self.ivalues=[]
self.labels={}
self.ilabels={}
self._fill_info()
def _fill_info(self):
if self.kind=="int":
self.repr=self._repr_type[lib.spinIntegerGetRepresentation(self.node)]
elif self.kind=="float":
self.repr=self._repr_type[lib.spinFloatGetRepresentation(self.node)]
self.units=py3.as_str(lib.spinFloatGetUnit(self.node))
elif self.kind=="enum":
nenum=lib.spinEnumerationGetNumEntries(self.node)
self._value_nodes=[lib.spinEnumerationGetEntryByIndex(self.node,i) for i in range(nenum)]
self.update_limits()
def _update_attributes(self):
self.access=self._acc_types[lib.spinNodeGetAccessMode(self.node)]
self.available=bool(lib.spinNodeIsAvailable(self.node))
self.readable=bool(lib.spinNodeIsReadable(self.node))
self.writable=bool(lib.spinNodeIsWritable(self.node))
[docs]
def update_limits(self):
"""Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``"""
self._update_attributes()
if self.kind=="int":
try:
self.min=lib.spinIntegerGetMin(self.node)
except TeledyneSpinnakerLibError:
self.min=None
try:
self.max=lib.spinIntegerGetMax(self.node)
except TeledyneSpinnakerLibError:
self.max=None
try:
self.inc=lib.spinIntegerGetInc(self.node)
except TeledyneSpinnakerLibError:
self.inc=None
return (self.min,self.max,self.inc)
elif self.kind=="float":
try:
self.min=lib.spinFloatGetMin(self.node)
except TeledyneSpinnakerLibError:
self.min=None
try:
self.max=lib.spinFloatGetMax(self.node)
except TeledyneSpinnakerLibError:
self.max=None
return (self.min,self.max,self.inc)
elif self.kind=="enum":
self.values=[py3.as_str(lib.spinEnumerationEntryGetSymbolic(n)) for n in self._value_nodes]
self.ivalues=[lib.spinEnumerationEntryGetIntValue(n) for n in self._value_nodes]
self.labels=dict(zip(self.values,self.ivalues))
self.ilabels=dict(zip(self.ivalues,self.values))
[docs]
def truncate_value(self, value):
"""Truncate value to lie within attribute limits"""
self.update_limits()
if self.kind in ["int","float"]:
if self.min is not None and value<self.min:
value=self.min
elif self.max is not None and value>self.max:
value=self.max
else:
if self.min is not None and self.inc and self.inc>0:
value=((value-self.min)//self.inc)*self.inc+self.min
return value
[docs]
def get_value(self, enum_as_str=True):
"""
Get attribute value.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
if self.kind=="command":
return bool(lib.spinCommandIsDone(self.node))
self._update_attributes()
if not self.readable:
raise TeledyneSpinnakerError("attribute {} is not readable".format(self.name))
if self.kind=="int":
return lib.spinIntegerGetValue(self.node)
if self.kind=="float":
return lib.spinFloatGetValue(self.node)
if self.kind=="bool":
return lib.spinBooleanGetValue(self.node)
if self.kind=="str":
return py3.as_str(lib.spinNodeToString(self.node))
if self.kind=="enum":
value=py3.as_str(lib.spinNodeToString(self.node))
if not enum_as_str:
value=self.labels[value]
return value
if self.kind=="unknown":
return None
raise TeledyneSpinnakerError("attribute {} of kind {} can not be read".format(self.name,self.kind))
[docs]
def set_value(self, value, truncate=True):
"""
Set attribute value.
If ``truncate==True``, automatically truncate value to lie within allowed range.
"""
self._update_attributes()
if not self.writable:
raise TeledyneSpinnakerError("attribute {} is not writable".format(self.name))
if truncate:
value=self.truncate_value(value)
if self.kind=="int":
lib.spinIntegerSetValue(self.node,int(value))
elif self.kind=="float":
lib.spinFloatSetValue(self.node,float(value))
elif self.kind=="bool":
lib.spinBooleanSetValue(self.node,bool(value))
elif self.kind=="str":
lib.spinNodeFromString(self.node,str(value))
elif self.kind=="enum":
value=self.ilabels.get(value,value)
lib.spinNodeFromString(self.node,str(value))
elif self.kind!="unknown":
raise TeledyneSpinnakerError("attribute {} of kind {} can not be set".format(self.name,self.kind))
[docs]
def call_command(self):
"""Execute the given command"""
if self.kind=="command":
if not self.implemented:
raise TeledyneSpinnakerError("command is not implemented: {}".format(self.name))
lib.spinCommandExecute(self.node)
else:
raise TeledyneSpinnakerError("attribute {} is not a command".format(self.name))
def __repr__(self):
return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",TCameraInfo._fields)
TMissedFramesStatus=collections.namedtuple("TMissedFramesStatus",["stream_dropped","queue_overflows","unprocessed"])
TFrameInfo=collections.namedtuple("TFrameInfo",["frame_index","framestamp","timestamp"])
[docs]
class TeledyneSpinnakerCamera(camera.IROICamera, camera.IAttributeCamera, camera.IExposureCamera):
"""
Generic Teledyne Spinnaker camera interface.
Args:
idx: camera index among the cameras listed using :func:`list_cameras`
name: camera name; if specified, then `idx` is ignored and the camera is determined based on the provided name
serial: camera serial number; if specified, then `idx` and `name` is ignored and the camera is determined based on the provided serial number
"""
Error=TeledyneSpinnakerError
TimeoutError=TeledyneSpinnakerTimeoutError
_TFrameInfo=TFrameInfo
def __init__(self, idx=0, name=None, serial=None):
super().__init__()
lib.initlib()
self.idx=idx
self.name=name
self.serial=serial
self.hdev=None
self._camlst=None
self._cb_mgr=self.CallbackManager()
self._buffer_mgr=None
self._start_queue_overflows=0
self.open()
self._raw_readout_format=False
self._add_info_variable("device_info",self.get_device_info)
self._add_settings_variable("exposure",self.get_exposure,self.set_exposure,ignore_error=TeledyneSpinnakerError)
self._update_device_variable_order("exposure")
self._add_settings_variable("frame_period",self.get_frame_period,self.set_frame_period,ignore_error=TeledyneSpinnakerError)
self._add_status_variable("missed_frames",self.get_missed_frames_status)
self._update_queue_overflows()
def _get_connection_parameters(self):
return (self.idx,self.name,self.serial)
[docs]
def open(self):
"""Open connection to the camera"""
if self.hdev is not None:
return
with libctl.temp_open():
camlst=lib.spinCameraListCreateEmpty()
lib.spinSystemGetCameras(libctl.h,camlst)
self._camlst=camlst
try:
cams=_parse_camera_list(camlst)
if self.name is not None or self.serial is not None:
use_serial=self.serial is not None
spar={"serial":self.serial} if use_serial else {"name":self.name}
idx=_find_camera(cams,**spar)
if idx is None:
kind,par=list(spar.items())[0]
camsstr=", ".join(["'{}'".format(c.serial if use_serial else c.cam) for c in cams])
raise TeledyneSpinnakerError("could not find camera with {} {}; available cameras are {}".format(kind,par,camsstr))
else:
idx=self.idx
if idx>=len(cams):
raise TeledyneSpinnakerError("camera index {} is not available ({} cameras exist)".format(idx,len(cams)))
with self._close_on_error():
self.hdev=lib.spinCameraListGet(camlst,idx)
lib.spinCameraInit(self.hdev)
self._opid=libctl.open().opid
self._update_attributes()
self.post_open()
except TeledyneSpinnakerError:
self._close_camlst()
raise
def _close_camlst(self):
try:
lib.spinCameraListClear(self._camlst)
lib.spinCameraListDestroy(self._camlst)
finally:
self._camlst=None
[docs]
def close(self):
"""Close connection to the camera"""
if self.hdev is not None:
try:
self.clear_acquisition()
self._cb_mgr.destroy()
finally:
try:
lib.spinCameraDeInit(self.hdev)
lib.spinCameraRelease(self.hdev)
self._close_camlst()
finally:
self.hdev=None
libctl.close(self._opid)
self._opid=None
[docs]
def is_opened(self):
return (self.hdev is not None) and bool(lib.spinCameraIsInitialized(self.hdev))
[docs]
def post_open(self):
"""Additional setup after camera opening"""
_builtin_attrs=["OffsetX","OffsetY","Width","Height","PixelFormat","PayloadSize"]
def _get_attribute_path(self, a):
return a.full_name
def _get_map_nodes(self, nmap, rn="Root", full_name=False, pfx=""):
root=lib.spinNodeMapGetNode(nmap,rn)
nodes=lib.collect_nodes(root,add_branch=True)
nodes={(n[5:] if n.startswith(rn+"/") else n):v for n,v in nodes.items()}
return [TeledyneSpinnakerAttribute(node,full_name=n if full_name else pfx+py3.as_str(lib.spinNodeGetName(node))) for n,node in nodes.items()]
def _list_attributes(self):
camnmap=lib.spinCameraGetNodeMap(self.hdev)
attrs=self._get_map_nodes(camnmap)
attrs+=self._get_map_nodes(lib.spinCameraGetTLStreamNodeMap(self.hdev),pfx="TLStream/")
for n in self._builtin_attrs:
node=lib.spinNodeMapGetNode(camnmap,n)
if node:
attrs.append(TeledyneSpinnakerAttribute(node,full_name=n))
attrs=[a for a in attrs if a.kind in ["int","float","bool","enum","str","command"]]
return attrs
[docs]
def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True): # pylint: disable=arguments-differ
"""
Get value of an attribute with the given name.
If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`.
If `default` is not ``None``, assume that ``error_on_missing==False``.
If `name` points at a dictionary branch, return a dictionary with all values in this branch.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
return super().get_attribute_value(name,error_on_missing=error_on_missing,default=default,enum_as_str=enum_as_str)
[docs]
def set_attribute_value(self, name, value, truncate=True, error_on_missing=True): # pylint: disable=arguments-differ, arguments-renamed
"""
Set value of an attribute with the given name.
If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing.
If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary).
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_attribute_value(name,value,truncate=truncate,error_on_missing=error_on_missing)
[docs]
def call_command(self, name):
"""Execute the given command"""
self.ca[name].call_command()
[docs]
def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True): # pylint: disable=arguments-differ
"""Get values of all attributes with the given `root`"""
values=dictionary.Dictionary()
for n,att in self.get_attribute(root).as_dict("flat").items():
if att.readable:
try:
values[n]=att.get_value(enum_as_str=enum_as_str)
except TeledyneSpinnakerError: # sometimes nominally implemented features still raise errors
if not ignore_errors:
raise
return values
[docs]
def set_all_attribute_values(self, settings, root="", truncate=True): # pylint: disable=arguments-differ
"""
Set values of all attributes with the given `root`.
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_all_attribute_values(settings,root=root,truncate=truncate)
[docs]
def get_device_info(self):
"""
Get camera information.
Return tuple ``(name, model, serial, devclass, devversion, vendor, friendly_name, user_name, props)``.
"""
return TCameraInfo(*_get_device_info(self.hdev))
def _get_data_dimensions_rc(self):
return self.cav["Height"],self.cav["Width"]
[docs]
def get_detector_size(self):
return self.ca["Width"].max,self.ca["Height"].max
[docs]
def get_roi(self):
ox=self.get_attribute_value("OffsetX",default=0)
oy=self.get_attribute_value("OffsetY",default=0)
w=self.cav["Width"]
h=self.cav["Height"]
return ox,ox+w,oy,oy+h
[docs]
@camera.acqcleared
def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
for a in ["Width","Height","OffsetX","OffsetY"]:
if a in self.ca:
self.ca[a].update_limits()
if self.ca[a].writable:
continue
return self.get_roi()
det_size=self.get_detector_size()
if hend is None:
hend=det_size[0]
if vend is None:
vend=det_size[1]
self.cav["Width"]=self.ca["Width"].min
self.cav["Height"]=self.ca["Height"].min
self.cav["OffsetX"]=hstart
self.cav["OffsetY"]=vstart
self.cav["Width"]=max(self.cav["Width"],hend-hstart)
self.cav["Height"]=max(self.cav["Height"],vend-vstart)
return self.get_roi()
[docs]
def get_roi_limits(self, hbin=1, vbin=1):
params=["Width","Height","OffsetX","OffsetY"]
minp=tuple([(self.ca[p].min if p in self.ca else 0) for p in params])
maxp=tuple([(self.ca[p].max if p in self.ca else 0) for p in params])
incp=tuple([(self.ca[p].inc if p in self.ca else 0) for p in params])
hlim=camera.TAxisROILimit(minp[0] or maxp[0],maxp[0],incp[2] or maxp[0],incp[0] or maxp[0],1)
vlim=camera.TAxisROILimit(minp[1] or maxp[1],maxp[1],incp[3] or maxp[1],incp[1] or maxp[1],1)
return hlim,vlim
_exposure_time_properties=["ExposureTimeAbs","ExposureTime"]
[docs]
def get_exposure(self):
for p in self._exposure_time_properties:
exp=self.get_attribute_value(p,error_on_missing=False)
if exp is not None:
return exp/1E6 # in us by default
bexp=self.get_attribute_value("ExposureTimeBaseAbs",error_on_missing=False)
rexp=self.get_attribute_value("ExposureTimeRaw",error_on_missing=False)
if bexp is not None and rexp is not None:
return bexp*rexp/1E6
raise TeledyneSpinnakerError("camera does not support exposure")
[docs]
def set_exposure(self, exposure):
for p in self._exposure_time_properties:
if p in self.attributes:
self.cav[p]=exposure*1E6
return self.get_exposure()
if "ExposureTimeBaseAbs" in self.attributes and "ExposureTimeRaw" in self.attributes:
self.cav["ExposureTimeRaw"]=(exposure/self.cav["ExposureTimeBaseAbs"])*1E6
else:
raise TeledyneSpinnakerError("camera does not support exposure")
return self.get_exposure()
[docs]
def get_frame_period(self):
fps=self.get_attribute_value("AcquisitionResultingFrameRate",error_on_missing=False)
if fps is not None:
period=1./fps
try:
exposure=self.get_exposure()
return max(exposure,period)
except TeledyneSpinnakerError:
return period
try:
return self.get_exposure()
except TeledyneSpinnakerError:
raise TeledyneSpinnakerError("camera does not support frame period")
[docs]
def set_frame_period(self, frame_period):
"""Set frame period (time between two consecutive frames in the internal trigger mode)"""
if "AcquisitionFrameRate" in self.attributes:
self.cav["AcquisitionFrameRate"]=1./frame_period if frame_period>0 else self.attributes["AcquisitionFrameRate"].max
if "AcquisitionFrameRateEnable" in self.attributes:
self.cav["AcquisitionFrameRateEnable"]=frame_period>0
else:
raise TeledyneSpinnakerError("camera does not support frame period")
return self.get_frame_period()
[docs]
def get_frame_timings(self):
return self._TAcqTimings(self.get_exposure(),self.get_frame_period())
[docs]
class CallbackManager:
def __init__(self):
self._cfuncs=(ctypes.c_size_t*7)()
self._cfuncs[0]=funcaddressof(lib.lib.spinImageIsIncomplete)
self._cfuncs[1]=funcaddressof(lib.lib.spinImageGetSize)
self._cfuncs[2]=funcaddressof(lib.lib.spinImageGetWidth)
self._cfuncs[3]=funcaddressof(lib.lib.spinImageGetHeight)
self._cfuncs[4]=funcaddressof(lib.lib.spinImageGetData)
self._cfuncs[5]=funcaddressof(lib.lib.spinImageGetFrameID)
self._cfuncs[6]=funcaddressof(lib.lib.spinImageGetTimeStamp)
self._cctl=(ctypes.c_size_t*3)(0)
self._cstat=(ctypes.c_size_t*3)(0)
self._cbuff=(ctypes.c_size_t*4)(0)
self.commdata=as_ctypes_array([ctypes.addressof(c) for c in [self._cfuncs,self._cctl,self._cbuff,self._cstat]],ctypes.c_size_t)
self.callback=get_callback()
self.hcb=None
self._reg_hdev=None
[docs]
def get_callback_ptr(self):
return self.callback.address
[docs]
def create(self, force=False):
if force:
self.destroy()
if self.hcb is None:
self.hcb=lib.spinImageEventHandlerCreate(self.callback,ctypes.addressof(self.commdata))
[docs]
def destroy(self):
if self.hcb is not None:
self.unregister()
try:
lib.spinImageEventHandlerDestroy(self.hcb)
finally:
self.hcb=None
[docs]
def register(self, hdev, buff):
self.reset()
self._cbuff[0]=buff.nbuff
self._cbuff[1]=buff.size
self._cbuff[2]=buff.buff_ptr
self._cbuff[3]=buff.fi_buff_ptr
self.start()
if self._reg_hdev is None:
self.create()
lib.spinCameraRegisterImageEventHandler(hdev,self.hcb)
self._reg_hdev=hdev
[docs]
def unregister(self):
if self._reg_hdev is not None:
try:
lib.spinCameraUnregisterImageEventHandler(self._reg_hdev,self.hcb)
finally:
self._reg_hdev=None
self._cbuff[:]=[0]*len(self._cbuff)
[docs]
def reset(self):
self._cstat[:]=[0]*len(self._cstat)
self._cctl[2]=0
[docs]
def start(self, acquire_frameinfo=True):
self.reset()
self._cctl[1]=1 if acquire_frameinfo else 0
self._cctl[0]=1
[docs]
def stop(self):
self._cctl[0]=0
[docs]
def get_stat(self):
return tuple(int(v) for v in self._cstat)
[docs]
class BufferManager:
"""Buffer manager, which deals with buffer memory allocation, registering and deregistering, and retrieving the result and the leftovers"""
def __init__(self, size, nbuff):
self.size=size
self.fi_size=2*8
self.nbuff=nbuff
self._full_buffer=ctypes.create_string_buffer(self.size*nbuff)
self.buff_ptr=ctypes.addressof(self._full_buffer)
self._full_fi_buffer=ctypes.create_string_buffer(self.fi_size*nbuff)
self.fi_buff_ptr=ctypes.addressof(self._full_fi_buffer)
[docs]
def get_buffer(self, fidx):
"""Get buffer corresponding to the given frame index"""
return self.buff_ptr+(fidx%self.nbuff)*self.size
[docs]
def get_frameinfo_buffer(self, fidx):
"""Get frame info buffer corresponding to the given frame index"""
return self.fi_buff_ptr+(fidx%self.nbuff)*self.fi_size
def _allocate_buffers(self, nbuff):
self._deallocate_buffers()
size=self.cav["PayloadSize"]
# nbuff=min(nbuff,2**30//size)
self._buffer_mgr=self.BufferManager(size,nbuff)
return nbuff
def _deallocate_buffers(self):
if self._buffer_mgr is not None:
self._buffer_mgr=None
[docs]
@interface.use_parameters(mode="acq_mode")
def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ
"""
Setup acquisition mode.
`mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition).
`nframes` sets up number of frame buffers.
"""
self.clear_acquisition()
self.set_attribute_value("AcquisitionMode","Continuous",error_on_missing=False)
nframes=self._allocate_buffers(nbuff=nframes)
self._cb_mgr.register(self.hdev,self._buffer_mgr)
super().setup_acquisition(mode=mode,nframes=nframes)
[docs]
def clear_acquisition(self):
self.stop_acquisition()
if self._buffer_mgr is not None:
self._cb_mgr.unregister()
self._deallocate_buffers()
super().clear_acquisition()
[docs]
def start_acquisition(self, *args, **kwargs):
self.stop_acquisition()
self._update_queue_overflows()
super().start_acquisition(*args,**kwargs)
self._frame_counter.reset(self._acq_params["nframes"])
self._cb_mgr.start()
lib.spinCameraBeginAcquisition(self.hdev)
[docs]
def stop_acquisition(self):
if self.acquisition_in_progress():
lib.spinCameraEndAcquisition(self.hdev)
self._cb_mgr.stop()
self._frame_counter.update_acquired_frames(self._get_acquired_frames())
super().stop_acquisition()
[docs]
def acquisition_in_progress(self):
return bool(lib.spinCameraIsStreaming(self.hdev))
def _get_acquired_frames(self):
return self._cb_mgr.get_stat()[0]
def _update_queue_overflows(self):
self._start_queue_overflows=self.get_attribute_value("TransferQueueOverflowCount",error_on_missing=False,default=0)
[docs]
def get_missed_frames_status(self):
"""
Get missed frames status.
Return tuple ``(stream_dropped, queue_overflows, unprocessed)`` with the number of frames dropped in the stream, the number of frames
lost due to the PC queue overflow (not processed fast enough) and the number of frames which could not be processed (incomplete or wrong size).
"""
stream_dropped=self.get_attribute_value("TLStream/StreamDroppedFrameCount",error_on_missing=False,default=0)
queue_overflows=self.get_attribute_value("TransferQueueOverflowCount",error_on_missing=False,default=0)-self._start_queue_overflows
fstat=self._cb_mgr.get_stat()
unprocessed=fstat[1]+fstat[2]
return TMissedFramesStatus(stream_dropped,queue_overflows,unprocessed)
[docs]
def enable_raw_readout(self, enable="rows"):
"""
Enable raw frame transfer.
Should be used if the camera uses unsupported pixel format.
Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array),
``"rows"`` (return a 2D array, where each row corresponds to a single image row),
or ``False`` (convert to image data, or raise an error if the format is not supported; default)
"""
funcargparse.check_parameter_range(enable,"enable",{False,"rows","frame"})
self._raw_readout_format=enable
def _parse_buffer(self, buff, size, shape, pixel_format, n=1):
data=np.ctypeslib.as_array(ctypes.cast(buff,ctypes.POINTER(ctypes.c_ubyte)),shape=(n,size))
if self._raw_readout_format=="frame":
return data[:,None,:]
if self._raw_readout_format=="rows":
return data.reshape((n,shape[0],-1))
supported_formats=["Mono8","Mono10","Mono12","Mono16","Mono32"]
if pixel_format not in supported_formats:
sf_string=", ".join(supported_formats)
raise TeledyneSpinnakerError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format,sf_string))
if pixel_format=="Mono8":
return data.reshape((n,)+shape)
elif pixel_format in ["Mono10","Mono12","Mono16"]:
return data.view("<u2").reshape((n,)+shape)
else:
return data.view("<u4").reshape((n,)+shape)
def _parse_frame_info(self, idx, buff, n=1):
camfi=np.ctypeslib.as_array(ctypes.cast(buff,ctypes.POINTER(ctypes.c_uint64)),shape=(n,2))
return np.column_stack((np.arange(idx,idx+n,dtype=camfi.dtype),camfi))
_support_chunks=True
def _read_frames(self, rng, return_info=False):
size=self._buffer_mgr.size
shape=self.cav["Height"],self.cav["Width"]
pixel_format=self.cav["PixelFormat"]
nbuff=self._buffer_mgr.nbuff
i0,i1=rng
if (i1-1)//nbuff==i0//nbuff:
chunks=[(i0,i1-i0)]
else:
cut=(i1//nbuff)*nbuff
chunks=[(i0,cut-i0),(cut,i1-cut)]
frames=[self._parse_buffer(self._buffer_mgr.get_buffer(b),size,shape,pixel_format,n=n) for b,n in chunks]
if not self._raw_readout_format:
frames=[self._convert_indexing(f,"rct",axes=(-2,-1)) for f in frames]
frame_info=None
if return_info:
frame_info=[self._parse_frame_info(b,self._buffer_mgr.get_frameinfo_buffer(b),n=n) for b,n in chunks]
return frames,frame_info