from pylablib.core.utils import funcargparse
from . import pylonC_lib
from .pylonC_lib import EGenApiNodeType, EGenApiAccessMode, EGenApiRepresentation, EGenApiVisibility, EGenApiNameSpace, PYLONC_ACCESS # pylint: disable=unused-import
from .pylonC_lib import wlib as lib, BaslerError, BaslerLibError # pylint: disable=unused-import
from ...core.utils import py3, dictionary
from ...core.devio import interface
from ..interface import camera
from ..utils import load_lib
from ...core.utils.ctypes_tools import funcaddressof, as_ctypes_array
from ...core.utils.cext_tools import try_import_cext
with try_import_cext():
from .utils import looper # pylint: disable=no-name-in-module
import numpy as np
import collections
import ctypes
import threading
class BaslerTimeoutError(BaslerError):
"Basler frame timeout error"
[docs]
class LibraryController(load_lib.LibraryController):
def _do_init(self):
lib.PylonInitialize()
def _do_uninit(self):
lib.PylonTerminate()
libctl=LibraryController(lib)
[docs]
def restart_lib():
libctl.shutdown()
_camera_info_alias={"FullName":"name", "ModelName":"model", "SerialNumber":"serial", "DeviceClass":"devclass", "DeviceVersion":"devversion",
"VendorName":"vendor","FriendlyName":"friendly_name", "UserDefinedName":"user_name"}
TCameraInfo=collections.namedtuple("TCameraInfo",["name","model","serial","devclass","devversion","vendor","friendly_name","user_name","props"])
def _parse_device_info(info, info_handle=None):
cam_info={"props":{}}
for n,a in _camera_info_alias.items():
cam_info[a]=py3.as_str(getattr(info,n))
if info_handle is not None:
nprop=lib.PylonDeviceInfoGetNumProperties(info_handle)
for i in range(nprop):
pname=py3.as_str(lib.PylonDeviceInfoGetPropertyName(info_handle,i))
pval=py3.as_str(lib.PylonDeviceInfoGetPropertyValueByIndex(info_handle,i))
if pname in _camera_info_alias:
iname=_camera_info_alias[pname]
if not cam_info[iname]:
cam_info[iname]=pval
else:
cam_info["props"][pname]=pval
return TCameraInfo(**cam_info)
[docs]
def get_device_info(index):
"""Get Pylon camera info for a camera with the given index"""
info=lib.PylonGetDeviceInfo(index)
info_handle=lib.PylonGetDeviceInfoHandle(index)
return _parse_device_info(info,info_handle=info_handle)
[docs]
def list_cameras(desc=True):
"""
List all cameras available through Basler Pylon interface
If ``desc==True``, return complete camera descriptions; otherwise, simply return the names.
"""
with libctl.temp_open():
ndev=lib.PylonEnumerateDevices()
infos=[get_device_info(i) for i in range(ndev)]
return infos if desc else [inf.name for inf in infos]
def _find_camera(cameras, **kwargs):
for i,c in enumerate(cameras):
match=True
for k,v in kwargs.items():
if getattr(c,k)!=v:
match=False
break
if match:
return i
return None
[docs]
def get_cameras_number():
"""Get number of connected Basler Pylon cameras"""
return len(list_cameras(desc=False))
[docs]
class BaslerPylonAttribute:
"""
Object representing an Pylon GenAPI attribute.
Allows to query and set values and get additional information.
Usually created automatically by an :class:`BaslerPylonCamera` instance.
Args:
node: pylon GenApi node handler
full_name: if supplied, attribute's full name, including the tree structure
Attributes:
name: attribute name
kind: attribute kind; can be ``"int"``, ``"float"``, ``"bool"``, ``"enum"``, ``"str"``,
``"command"``, ``"category"``, or ``"unknown"``
display_name: attribute display name (short description name)
tooltip: longer attribute description
description: full attribute description (usually, same as `tooltip`)
visibility: attribute visibility; can be ``"simple"``, ``"intermediate"``, ``"advanced"``, ``"invisible"``, or ``"unknown"``
access: attribute access level; can be ``"read_only"``, ``"write_only"``, ``"read_write"``,
``"na"`` (not applicable, e.g., command), or ``"not_implemented"``
readable (bool): whether attribute is readable
writable (bool): whether attribute is writable
implemented (bool): whether the attribute is implemented in the given camera (normally always ``True``)
available (bool): whether the attribute can be changed or called
min (float or int): minimal attribute value (if applicable)
max (float or int): maximal attribute value (if applicable)
inc (float or int): minimal attribute increment value (if applicable)
units: attribute units (if applicable)
repr: shows what a numerical unit represents; can be ``"lin"``, ``"log"``, ``"bool"``, ``"pure"``, ``"hex"``, or ``"unknown"``
ivalues: list of possible integer values for enum attributes
values: list of possible text values for enum attributes
labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values
ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute
"""
_attr_types={ EGenApiNodeType.IntegerNode:"int",
EGenApiNodeType.FloatNode:"float",
EGenApiNodeType.BooleanNode:"bool",
EGenApiNodeType.EnumerationNode:"enum",
EGenApiNodeType.StringNode:"str",
EGenApiNodeType.CommandNode:"command",
EGenApiNodeType.Category:"category",
EGenApiNodeType._UnknownNodeType:"unknown"}
_vis_types={ EGenApiVisibility.Beginner:"simple",
EGenApiVisibility.Expert:"intermediate",
EGenApiVisibility.Guru:"advanced",
EGenApiVisibility.Invisible:"invisible",
EGenApiVisibility._UndefinedVisibility:"unknown"}
_acc_types={ EGenApiAccessMode.RO:"read_only",
EGenApiAccessMode.WO:"write_only",
EGenApiAccessMode.RW:"read_write",
EGenApiAccessMode.NA:"na",
EGenApiAccessMode.NI:"not_implemented",
EGenApiAccessMode._UndefinedAccesMode:"unknown"}
_repr_type={ EGenApiRepresentation.Linear:"lin",
EGenApiRepresentation.Logarithmic:"log",
EGenApiRepresentation.Boolean:"bool",
EGenApiRepresentation.PureNumber:"pure",
EGenApiRepresentation.HexNumber:"hex",
EGenApiRepresentation._UndefinedRepresentation:"unknown"}
def __init__(self, node, full_name=None):
self.node=node
self.name=py3.as_str(lib.GenApiNodeGetName(node))
self.full_name=full_name or self.name
self.kind=self._attr_types[lib.GenApiNodeGetType(node)]
self.display_name=py3.as_str(lib.GenApiNodeGetDisplayName(node))
self.tooltip=py3.as_str(lib.GenApiNodeGetToolTip(node))
self.description=py3.as_str(lib.GenApiNodeGetDescription(node))
self.visibility=self._vis_types[lib.GenApiNodeGetVisibility(node)]
self.access=self._acc_types[lib.GenApiNodeGetAccessMode(node)]
self.implemented=bool(lib.GenApiNodeIsImplemented(node))
self.available=bool(lib.GenApiNodeIsAvailable(node))
self.readable=bool(lib.GenApiNodeIsReadable(node))
self.writable=bool(lib.GenApiNodeIsWritable(node))
self.units=None
self.min=None
self.max=None
self.inc=None
self.repr=None
self._value_nodes=None
self.values=[]
self.ivalues=[]
self.labels={}
self.ilabels={}
self._fill_info()
def _fill_info(self):
if self.kind=="int":
self.repr=self._repr_type[lib.GenApiIntegerGetRepresentation(self.node)]
elif self.kind=="float":
self.repr=self._repr_type[lib.GenApiFloatGetRepresentation(self.node)]
self.units=py3.as_str(lib.GenApiFloatGetUnit(self.node))
elif self.kind=="enum":
nenum=lib.GenApiEnumerationGetNumEntries(self.node)
self._value_nodes=[lib.GenApiEnumerationGetEntryByIndex(self.node,i) for i in range(nenum)]
self.update_limits()
[docs]
def update_limits(self):
"""Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``"""
if self.kind=="int":
self.min=lib.GenApiIntegerGetMin(self.node)
self.max=lib.GenApiIntegerGetMax(self.node)
self.inc=lib.GenApiIntegerGetInc(self.node)
return (self.min,self.max,self.inc)
elif self.kind=="float":
self.min=lib.GenApiFloatGetMin(self.node)
self.max=lib.GenApiFloatGetMax(self.node)
return (self.min,self.max,self.inc)
elif self.kind=="enum":
self.values=[py3.as_str(lib.GenApiEnumerationEntryGetSymbolic(n)) for n in self._value_nodes]
self.ivalues=[lib.GenApiEnumerationEntryGetValue(n) for n in self._value_nodes]
self.labels=dict(zip(self.values,self.ivalues))
self.ilabels=dict(zip(self.ivalues,self.values))
[docs]
def truncate_value(self, value):
"""Truncate value to lie within attribute limits"""
self.update_limits()
if self.kind in ["int","float"]:
if value<self.min:
value=self.min
elif value>self.max:
value=self.max
else:
if self.inc and self.inc>0:
value=((value-self.min)//self.inc)*self.inc+self.min
return value
[docs]
def get_value(self, enum_as_str=True):
"""
Get attribute value.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
if self.kind=="command":
return bool(lib.GenApiCommandIsDone(self.node))
if not self.readable:
raise BaslerError("attribute {} is not readable".format(self.name))
if self.kind=="int":
return lib.GenApiIntegerGetValue(self.node)
if self.kind=="float":
return lib.GenApiFloatGetValue(self.node)
if self.kind=="bool":
return lib.GenApiBooleanGetValue(self.node)
if self.kind=="str":
return py3.as_str(lib.GenApiNodeToString(self.node))
if self.kind=="enum":
value=py3.as_str(lib.GenApiNodeToString(self.node))
if not enum_as_str:
value=self.labels[value]
return value
if self.kind=="unknown":
return None
raise BaslerError("attribute {} of kind {} can not be read".format(self.name,self.kind))
[docs]
def set_value(self, value, truncate=True):
"""
Set attribute value.
If ``truncate==True``, automatically truncate value to lie within allowed range.
"""
if not self.writable:
raise BaslerError("attribute {} is not writable".format(self.name))
if truncate:
value=self.truncate_value(value)
if self.kind=="int":
lib.GenApiIntegerSetValue(self.node,int(value))
elif self.kind=="float":
lib.GenApiFloatSetValue(self.node,float(value))
elif self.kind=="bool":
lib.GenApiBooleanSetValue(self.node,bool(value))
elif self.kind=="str":
lib.GenApiNodeFromString(self.node,str(value))
elif self.kind=="enum":
value=self.ilabels.get(value,value)
lib.GenApiNodeFromString(self.node,str(value))
elif self.kind!="unknown":
raise BaslerError("attribute {} of kind {} can not be set".format(self.name,self.kind))
[docs]
def call_command(self):
"""Execute the given command"""
if self.kind=="command":
if not self.implemented:
raise BaslerError("command is not implemented: {}".format(self.name))
lib.GenApiCommandExecute(self.node)
else:
raise BaslerError("attribute {} is not a command".format(self.name))
def __repr__(self):
return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",TCameraInfo._fields)
[docs]
class BaslerPylonCamera(camera.IROICamera, camera.IAttributeCamera, camera.IExposureCamera):
"""
Generic Basler pylon camera interface.
Args:
idx: camera index among the cameras listed using :func:`list_cameras`
name: camera name; if specified, then `idx` is ignored and the camera is determined based on the provided name
"""
Error=BaslerError
TimeoutError=BaslerTimeoutError
def __init__(self, idx=0, name=None):
super().__init__()
lib.initlib()
self.idx=idx
self.name=name
self.hdev=None
self.strm=None
self._buffer_mgr=None
self._looper=self.ScheduleLooper()
self.open()
self._raw_readout_format=False
self._add_info_variable("device_info",self.get_device_info)
self._add_settings_variable("exposure",self.get_exposure,self.set_exposure,ignore_error=BaslerError)
self._update_device_variable_order("exposure")
self._add_settings_variable("frame_period",self.get_frame_period,self.set_frame_period,ignore_error=BaslerError)
def _get_connection_parameters(self):
return (self.idx,self.name)
[docs]
def open(self):
"""Open connection to the camera"""
if self.hdev is not None:
return
with libctl.temp_open():
cams=list_cameras()
if self.name is not None:
idx=_find_camera(cams,name=self.name)
if idx is None:
names=", ".join(["'{}'".format(c.name) for c in cams])
raise BaslerError("could not find camera with name {}; available cameras are {}".format(self.name,names))
else:
idx=self.idx
if idx>=len(cams):
raise BaslerError("camera index {} is not available ({} cameras exist)".format(idx,len(cams)))
with self._close_on_error():
self.hdev=lib.PylonCreateDeviceByIndex(idx)
lib.PylonDeviceOpen(self.hdev,PYLONC_ACCESS.PYLONC_ACCESS_MODE_MONITOR|PYLONC_ACCESS.PYLONC_ACCESS_MODE_CONTROL|PYLONC_ACCESS.PYLONC_ACCESS_MODE_STREAM)
self.strm=lib.PylonDeviceGetStreamGrabber(self.hdev,0)
self._opid=libctl.open().opid
self._update_attributes()
self.post_open()
[docs]
def close(self):
"""Close connection to the camera"""
if self.hdev is not None:
try:
self.clear_acquisition()
finally:
try:
lib.PylonDeviceClose(self.hdev)
lib.PylonDestroyDevice(self.hdev)
finally:
self.hdev=None
libctl.close(self._opid)
self._opid=None
[docs]
def is_opened(self):
return (self.hdev is not None) and bool(lib.PylonDeviceIsOpen(self.hdev))
[docs]
def post_open(self):
"""Additional setup after camera opening"""
_builtin_attrs=["OffsetX","OffsetY","Width","Height","PixelFormat","PayloadSize","AcquisitionStart","AcquisitionStop"]
def _list_attributes(self):
nmap=lib.PylonDeviceGetNodeMap(self.hdev)
root=lib.GenApiNodeMapGetNode(nmap,"Root")
nodes=lib.collect_nodes(root,add_branch=False)
nodes={(n[5:] if n.startswith("Root/") else n):v for n,v in nodes.items()}
attrs=[BaslerPylonAttribute(node,full_name=n) for n,node in nodes.items()]
for n in self._builtin_attrs:
node=lib.GenApiNodeMapGetNode(nmap,n)
if node:
attrs.append(BaslerPylonAttribute(node,full_name=n))
return attrs
[docs]
def get_attribute_value(self, name, error_on_missing=True, default=None, enum_as_str=True): # pylint: disable=arguments-differ
"""
Get value of an attribute with the given name.
If the value doesn't exist or can not be read and ``error_on_missing==True``, raise error; otherwise, return `default`.
If `default` is not ``None``, assume that ``error_on_missing==False``.
If `name` points at a dictionary branch, return a dictionary with all values in this branch.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
return super().get_attribute_value(name,error_on_missing=error_on_missing,default=default,enum_as_str=enum_as_str)
[docs]
def set_attribute_value(self, name, value, truncate=True, error_on_missing=True): # pylint: disable=arguments-differ
"""
Set value of an attribute with the given name.
If the value doesn't exist or can not be written and ``error_on_missing==True``, raise error; otherwise, do nothing.
If `name` points at a dictionary branch, set all values in this branch (in this case `value` must be a dictionary).
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_attribute_value(name,value,truncate=truncate,error_on_missing=error_on_missing)
[docs]
def call_command(self, name):
"""Execute the given command"""
self.ca[name].call_command()
[docs]
def get_all_attribute_values(self, root="", enum_as_str=True, ignore_errors=True): # pylint: disable=arguments-differ
"""Get values of all attributes with the given `root`"""
values=dictionary.Dictionary()
for n,att in self.get_attribute(root).as_dict("flat").items():
if att.readable:
try:
values[n]=att.get_value(enum_as_str=enum_as_str)
except BaslerError: # sometimes nominally implemented features still raise errors
if not ignore_errors:
raise
return values
[docs]
def set_all_attribute_values(self, settings, root="", truncate=True): # pylint: disable=arguments-differ
"""
Set values of all attributes with the given `root`.
If ``truncate==True``, truncate value to lie within attribute range.
"""
return super().set_all_attribute_values(settings,root=root,truncate=truncate)
[docs]
def get_device_info(self):
"""
Get camera information.
Return tuple ``(name, model, serial, devclass, devversion, vendor, friendly_name, user_name, props)``.
"""
info=lib.PylonDeviceGetDeviceInfo(self.hdev)
info_handle=lib.PylonDeviceGetDeviceInfoHandle(self.hdev)
return TCameraInfo(*_parse_device_info(info,info_handle))
def _get_data_dimensions_rc(self):
return self.cav["Height"],self.cav["Width"]
[docs]
def get_detector_size(self):
return self.ca["Width"].max,self.ca["Height"].max
[docs]
def get_roi(self):
ox=self.get_attribute_value("OffsetX",default=0)
oy=self.get_attribute_value("OffsetY",default=0)
w=self.cav["Width"]
h=self.cav["Height"]
return ox,ox+w,oy,oy+h
[docs]
@camera.acqcleared
def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
for a in ["Width","Height","OffsetX","OffsetY"]:
if a not in self.ca or not self.ca[a].writable:
return self.get_roi()
det_size=self.get_detector_size()
if hend is None:
hend=det_size[0]
if vend is None:
vend=det_size[1]
with self.pausing_acquisition():
self.cav["Width"]=self.ca["Width"].min
self.cav["Height"]=self.ca["Height"].min
self.cav["OffsetX"]=hstart
self.cav["OffsetY"]=vstart
self.cav["Width"]=max(self.cav["Width"],hend-hstart)
self.cav["Height"]=max(self.cav["Height"],vend-vstart)
return self.get_roi()
[docs]
def get_roi_limits(self, hbin=1, vbin=1):
params=["Width","Height","OffsetX","OffsetY"]
minp=tuple([(self.ca[p].min if p in self.ca else 0) for p in params])
maxp=tuple([(self.ca[p].max if p in self.ca else 0) for p in params])
incp=tuple([(self.ca[p].inc if p in self.ca else 0) for p in params])
hlim=camera.TAxisROILimit(minp[0] or maxp[0],maxp[0],incp[2] or maxp[0],incp[0] or maxp[0],1)
vlim=camera.TAxisROILimit(minp[1] or maxp[1],maxp[1],incp[3] or maxp[1],incp[1] or maxp[1],1)
return hlim,vlim
[docs]
def get_exposure(self):
exp=self.get_attribute_value("ExposureTimeAbs",error_on_missing=False)
if exp is not None:
return exp/1E6 # in us by default
bexp=self.get_attribute_value("ExposureTimeBaseAbs",error_on_missing=False)
rexp=self.get_attribute_value("ExposureTimeRaw",error_on_missing=False)
if bexp is not None and rexp is not None:
return bexp*rexp/1E6
raise BaslerError("camera does not support exposure")
[docs]
def set_exposure(self, exposure):
if "ExposureTimeAbs" in self.attributes:
self.cav["ExposureTimeAbs"]=exposure*1E6
elif "ExposureTimeBaseAbs" in self.attributes and "ExposureTimeRaw" in self.attributes:
self.cav["ExposureTimeRaw"]=(exposure/self.cav["ExposureTimeBaseAbs"])*1E6
else:
raise BaslerError("camera does not support exposure")
return self.get_exposure()
[docs]
def get_frame_period(self):
fps_ena=self.get_attribute_value("AcquisitionFrameRateEnable",error_on_missing=False)
fps=self.get_attribute_value("AcquisitionFrameRateAbs",error_on_missing=False)
if fps is not None:
period=1./fps if (fps_ena or fps_ena is None) else 0
try:
exposure=self.get_exposure()
return max(exposure,period)
except BaslerError:
return period
try:
return self.get_exposure()
except BaslerError:
raise BaslerError("camera does not support frame period")
[docs]
def set_frame_period(self, frame_period):
"""Set frame period (time between two consecutive frames in the internal trigger mode)"""
if "AcquisitionFrameRateAbs" in self.attributes:
self.cav["AcquisitionFrameRateAbs"]=1./frame_period if frame_period>0 else self.attributes["AcquisitionFrameRateAbs"].max
if "AcquisitionFrameRateEnable" in self.attributes:
self.cav["AcquisitionFrameRateEnable"]=frame_period>0
else:
raise BaslerError("camera does not support frame period")
return self.get_frame_period()
[docs]
def get_frame_timings(self):
return self._TAcqTimings(self.get_exposure(),self.get_frame_period())
[docs]
class BufferManager:
"""Buffer manager, which deals with buffer memory allocation, registering and deregistering, and retrieving the result and the leftovers"""
def __init__(self, strm, size, nbuff):
self.strm=strm
self.size=size
self.nbuff=nbuff
self._full_buffer=ctypes.create_string_buffer(self.size*nbuff)
buff_ptr=ctypes.addressof(self._full_buffer)
self._buffers=[buff_ptr+self.size*i for i in range(nbuff)]
self._handles=None
self._ret_rdy=ctypes.c_ubyte()
self._ret_val=pylonC_lib.PylonGrabResult_t()
[docs]
def register(self):
"""Register buffers"""
self.deregister()
self._handles=[lib.PylonStreamGrabberRegisterBuffer(self.strm,b,self.size) for b in self._buffers]
[docs]
def deregister(self):
"""Deregister buffers"""
if self._handles:
for h in self._handles:
lib.PylonStreamGrabberDeregisterBuffer(self.strm,h)
self._handles=None
[docs]
def get_buffer(self, fidx):
"""Get buffer corresponding to the given frame index"""
return self._buffers[fidx%self.nbuff]
[docs]
def get_handle(self, fidx):
"""Get buffer handle corresponding to the given frame index"""
return self._handles[fidx%self.nbuff]
[docs]
def get_all_handles(self):
"""Get all buffer handles as a ctypes array"""
return as_ctypes_array(self._handles,ctypes.c_void_p)
[docs]
def queue(self, fidx=None):
"""Queue a buffer with the given index or all buffers"""
if fidx is None:
for h in self._handles:
lib.PylonStreamGrabberQueueBuffer(self.strm,h,None)
else:
lib.PylonStreamGrabberQueueBuffer(self.strm,self._handles[fidx%self.nbuff],None)
[docs]
def retrieve(self):
"""Retrieve the next buffer and return its info and whether it is ready"""
return lib.PylonStreamGrabberRetrieveResult(self.strm)
[docs]
def flush(self):
"""Retrieve all leftover buffers"""
while True:
_,rdy=self.retrieve()
if not rdy:
break
[docs]
class ScheduleLooper:
"""
Cython-based schedule loop manager.
Runs the loop function and provides callback storage.
"""
def __init__(self):
self.evt=threading.Event()
self._thread=None
self.looping=ctypes.c_ulong(0)
self.nread=ctypes.c_ulong(0)
self._buff_mgr=None
[docs]
def start_loop(self, buff_mgr):
"""Start loop serving the given buffer manager"""
self.stop_loop()
self.evt.clear()
self.looping.value=1
self.nread.value=0
self._buff_mgr=buff_mgr
self._thread=threading.Thread(target=self._loop,daemon=True)
self._thread.start()
self.evt.wait()
[docs]
def stop_loop(self):
"""Stop the loop thread"""
if self._thread is not None:
self.looping.value=0
self._thread.join()
self._thread=None
self._buff_mgr=None
def _loop(self):
self.evt.set()
hbuffers=self._buff_mgr.get_all_handles()
nbuff=len(hbuffers)
strm=self._buff_mgr.strm
wtobj=lib.PylonStreamGrabberGetWaitObject(strm)
looper(strm.value,wtobj.value,nbuff,ctypes.addressof(hbuffers),
ctypes.addressof(self.looping),ctypes.addressof(self.nread),
funcaddressof(lib.lib.PylonStreamGrabberRetrieveResult),funcaddressof(lib.lib.PylonStreamGrabberQueueBuffer),funcaddressof(lib.lib.PylonWaitObjectWait))
[docs]
def is_looping(self):
"""Check if the loop is running"""
return self.looping.value
[docs]
def get_status(self):
"""Get the current loop status, which is the tuple ``(acquired,)``"""
return (self.nread.value,)
def _allocate_buffers(self, nbuff):
self._deallocate_buffers()
size=lib.PylonStreamGrabberGetPayloadSize(self.hdev,self.strm)
nbuff=min(nbuff,2**30//size)
self._buffer_mgr=self.BufferManager(self.strm,size,nbuff)
lib.PylonStreamGrabberSetMaxBufferSize(self.strm,size)
lib.PylonStreamGrabberSetMaxNumBuffer(self.strm,nbuff)
return nbuff
def _deallocate_buffers(self):
if self._buffer_mgr is not None:
self._buffer_mgr.deregister()
self._buffer_mgr=None
[docs]
@interface.use_parameters(mode="acq_mode")
def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ
"""
Setup acquisition mode.
`mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition).
`nframes` sets up number of frame buffers.
"""
self.clear_acquisition()
self.set_attribute_value("AcquisitionMode","Continuous",error_on_missing=False)
lib.PylonStreamGrabberOpen(self.strm)
nframes=self._allocate_buffers(nbuff=nframes)
lib.PylonStreamGrabberPrepareGrab(self.strm)
self._buffer_mgr.register()
super().setup_acquisition(mode=mode,nframes=nframes)
[docs]
def clear_acquisition(self):
self.stop_acquisition()
if self._buffer_mgr is not None:
self._deallocate_buffers()
lib.PylonStreamGrabberFinishGrab(self.strm)
lib.PylonStreamGrabberClose(self.strm)
super().clear_acquisition()
[docs]
def start_acquisition(self, *args, **kwargs):
self.stop_acquisition()
super().start_acquisition(*args,**kwargs)
self._frame_counter.reset(self._acq_params["nframes"])
self._buffer_mgr.queue()
lib.PylonStreamGrabberStartStreamingIfMandatory(self.strm)
self._looper.start_loop(self._buffer_mgr)
self.call_command("AcquisitionStart")
[docs]
def stop_acquisition(self):
if self.acquisition_in_progress():
self.call_command("AcquisitionStop")
self._looper.stop_loop()
self._frame_counter.update_acquired_frames(self._get_acquired_frames())
lib.PylonStreamGrabberStopStreamingIfMandatory(self.strm)
lib.PylonStreamGrabberFlushBuffersToOutput(self.strm)
self._buffer_mgr.flush()
super().stop_acquisition()
[docs]
def acquisition_in_progress(self):
return self._looper.is_looping()
def _get_acquired_frames(self):
return self._looper.get_status()[0]
[docs]
def enable_raw_readout(self, enable="rows"):
"""
Enable raw frame transfer.
Should be used if the camera uses unsupported pixel format.
Can be ``"frame"`` (return the whole frame as a 1D ``"u1"`` numpy array),
``"rows"`` (return a 2D array, where each row corresponds to a single image row),
or ``False`` (convert to image data, or raise an error if the format is not supported; default)
"""
funcargparse.check_parameter_range(enable,"enable",{False,"rows","frame"})
self._raw_readout_format=enable
def _parse_buffer(self, buff, size, shape, pixel_format, n=1):
data=np.ctypeslib.as_array(ctypes.cast(buff,ctypes.POINTER(ctypes.c_ubyte)),shape=(n,size))
if self._raw_readout_format=="frame":
return data[:,None,:]
if self._raw_readout_format=="rows":
return data.reshape((n,shape[0],-1))
supported_formats=["Mono8","Mono10","Mono12","Mono16","Mono32"]
if pixel_format not in supported_formats:
sf_string=", ".join(supported_formats)
raise BaslerError("pixel format {} is not supported, only [{}] are supported; raw data readout can be enabled via enable_raw_readout method".format(pixel_format,sf_string))
if pixel_format=="Mono8":
return data.reshape((n,)+shape)
elif pixel_format in ["Mono10","Mono12","Mono16"]:
return data.view("<u2").reshape((n,)+shape)
else:
return data.view("<u4").reshape((n,)+shape)
_support_chunks=True
def _read_frames(self, rng, return_info=False):
size=self._buffer_mgr.size
shape=self.cav["Height"],self.cav["Width"]
pixel_format=self.cav["PixelFormat"]
nbuff=self._buffer_mgr.nbuff
i0,i1=rng
if (i1-1)//nbuff==i0//nbuff:
chunks=[(i0,i1-i0)]
else:
cut=(i1//nbuff)*nbuff
chunks=[(i0,cut-i0),(cut,i1-cut)]
frames=[self._parse_buffer(self._buffer_mgr.get_buffer(b),size,shape,pixel_format,n=n) for b,n in chunks]
if not self._raw_readout_format:
frames=[self._convert_indexing(f,"rct",axes=(-2,-1)) for f in frames]
return frames,None