from . import fgrab_prototyp_lib, fgrab_define_defs
from .fgrab_prototyp_defs import FgAppletIntProperty, FgAppletStringProperty, FgAppletIteratorSource, FgParamTypes, FgProperty, Fg_Info_Selector, drFg_Info_Selector, Fg_Apc_Flag
from .fgrab_prototyp_defs import MeCameraLinkFormat
from .fgrab_define_defs import FG_STATUS, FG_ACQ, FG_IMGFMT, FG_PARAM
from .fgrab_prototyp_lib import wlib as lib, SiliconSoftwareError, SIFgrabLibError
from ...core.utils import py3, dictionary, general
from ...core.utils.ctypes_tools import funcaddressof
from ...core.devio import interface
from ..interface import camera
from ...core.utils.cext_tools import try_import_cext
with try_import_cext():
from .utils import looper, get_callback # pylint: disable=no-name-in-module
import numpy as np
import collections
import ctypes
import struct
import re
import warnings
import threading
class SiliconSoftwareTimeoutError(SiliconSoftwareError):
"Silicon Software frame timeout error"
# TODO: serial interface
# TODO: trigger
TBoardInfo=collections.namedtuple("TBoardInfo",["name","full_name","serial"])
TFullBoardInfo=collections.namedtuple("TBoardInfo",["name","full_name","serial","system_info"])
[docs]
def get_board_info(board, full_desc=False):
"""Get board info for a given index (starting from 0)"""
bt=lib.Fg_getBoardType(board)
system_info={}
aids=Fg_Info_Selector if full_desc else [Fg_Info_Selector.INFO_BOARDSERIALNO]
for aid in aids:
try:
attr=FGrabAttribute(None,aid.value,system=True,idx=board)
system_info[attr.name]=attr.get_value()
except SiliconSoftwareError:
pass
if full_desc:
return TFullBoardInfo(*[py3.as_str(lib.Fg_getBoardNameByType(bt,short)) for short in [1,0]],serial=system_info.get("INFO_BOARDSERIALNO","").upper(),system_info=system_info)
return TBoardInfo(*[py3.as_str(lib.Fg_getBoardNameByType(bt,short)) for short in [1,0]],serial=system_info.get("INFO_BOARDSERIALNO","").upper())
[docs]
def list_boards(full_desc=False):
"""List all boards available through Silicon Software interface"""
lib.initlib()
boards=[]
i=0
try:
while True:
boards.append(get_board_info(i,full_desc=full_desc))
i+=1
except SIFgrabLibError as err:
if err.code!=FG_STATUS.FG_INVALID_BOARD_NUMBER:
raise
return boards
[docs]
def get_boards_number():
"""List number of connected Silicon Software boards"""
return len(list_boards())
TAppletInfo=collections.namedtuple("TAppletInfo",["name","file"])
_applet_string_props={ "uid":FgAppletStringProperty.FG_AP_STRING_APPLET_UID,
"name":FgAppletStringProperty.FG_AP_STRING_APPLET_NAME,
"desc":FgAppletStringProperty.FG_AP_STRING_DESCRIPTION,
"category":FgAppletStringProperty.FG_AP_STRING_CATEGORY,
"platform":FgAppletStringProperty.FG_AP_STRING_SUPPORTED_PLATFORMS,
"tags":FgAppletStringProperty.FG_AP_STRING_TAGS,
"version":FgAppletStringProperty.FG_AP_STRING_VERSION,
"path":FgAppletStringProperty.FG_AP_STRING_APPLET_PATH,
"file":FgAppletStringProperty.FG_AP_STRING_APPLET_FILE,}
_applet_int_props={ "flags":FgAppletIntProperty.FG_AP_INT_FLAGS,
"info":FgAppletIntProperty.FG_AP_INT_INFO,}
TFullAppletInfo=collections.namedtuple("TFullAppletInfo",["name","uid","desc","category","platform","tags","version","path","file","flags","info"])
def _get_applet_item_info(item, full_desc=False):
props=TFullAppletInfo._fields if full_desc else TAppletInfo._fields
values=[]
for p in props:
if p in _applet_string_props:
values.append(py3.as_str(lib.Fg_getAppletStringProperty(item,_applet_string_props[p])))
else:
values.append(lib.Fg_getAppletIntProperty(item,_applet_int_props[p]))
return TFullAppletInfo(*values) if full_desc else TAppletInfo(*values)
[docs]
def list_applets(board, full_desc=False, valid=True, on_board=False):
"""
List all applets available for this board.
`board` is the board index (starting from 0) given by its position in the list returned by :func:`list_boards`.
If ``full_desc==True``, return full description for each applet; otherwise, return only name and file name.
If ``valid==True``, list only valid and compatible applets; otherwise, list all applets.
If ``on_board==True``, list applets running on board; otherwise, list all applets contained in the system.
"""
lib.initlib()
# 0x157 is all valid and compatible flags: FG_AF_IS_AVAILABLE, FG_AF_IS_CORRECT_PLATFORM, FG_AF_IS_VALID_LICENSE,
# FG_AF_IS_LOADABLE, FG_AF_IS_SUPPORTED_BY_RUNTIME
flag=0x117 if valid else 0
src=FgAppletIteratorSource.FG_AIS_BOARD if on_board else FgAppletIteratorSource.FG_AIS_FILESYSTEM
appn,appiter=lib.Fg_getAppletIterator(board,src,flag)
infos=[]
try:
for i in range(appn):
item=lib.Fg_getAppletIteratorItem(appiter,i)
infos.append(_get_applet_item_info(item,full_desc=full_desc))
finally:
lib.Fg_freeAppletIterator(appiter)
return infos
[docs]
def get_applet_info(board, **kwargs):
"""Return full information for an applet with the given parameters (e.g., name, or full path)"""
applets=list_applets(board,full_desc=True)
for app in applets:
if all([getattr(app,k)==v for k,v in kwargs.items()]):
return app
raise KeyError("could not fine applet with the parameters {}".format(kwargs))
[docs]
class FGrabAttribute:
"""
Object representing an Silicon Software frame grabber parameter.
Allows to query and set values and get additional information.
Usually created automatically by an :class:`` instance, but could be created manually.
Args:
fg: opened frame grabber handle
aid: attribute ID
port: camera port within the frame grabber
system: if ``True``, this is a system attribute; otherwise, it is a camera attribute
idx: if ``system==True`` and `fg` is ``None``, it can specify a board index for a not yet opened grabber
Attributes:
name: attribute name
kind: attribute kind; can be ``"i32"``, ``"i64"``, ``"u32"``, ``"u64"``, ``"f64"``, or ``"str"``
min (float or int): minimal attribute value (if applicable)
max (float or int): maximal attribute value (if applicable)
inc (float or int): minimal attribute increment value (if applicable)
ivalues: list of possible integer values for enum attributes
values: list of possible text values for enum attributes
labels: dict ``{label: index}`` which shows all possible values of an enumerated attribute and their corresponding numerical values
ilabels: dict ``{index: label}`` which shows labels corresponding to numerical values of an enumerated attribute
"""
_attr_types={ FgParamTypes.FG_PARAM_TYPE_INT32_T: "i32",
FgParamTypes.FG_PARAM_TYPE_UINT32_T: "u32",
FgParamTypes.FG_PARAM_TYPE_INT64_T: "i64",
FgParamTypes.FG_PARAM_TYPE_UINT64_T: "u64",
FgParamTypes.FG_PARAM_TYPE_DOUBLE: "f64",
FgParamTypes.FG_PARAM_TYPE_CHAR_PTR: "str",
FgParamTypes.FG_PARAM_TYPE_SIZE_T: "size",
FgParamTypes.FG_PARAM_TYPE_STRUCT_FIELDPARAMACCESS: "fp_acc",
FgParamTypes.FG_PARAM_TYPE_STRUCT_FIELDPARAMINT: "fp_i32",
FgParamTypes.FG_PARAM_TYPE_STRUCT_FIELDPARAMINT64: "fp_i64",
FgParamTypes.FG_PARAM_TYPE_STRUCT_FIELDPARAMDOUBLE: "fp_f64", }
def __init__(self, fg, aid, port=0, system=False, idx=None):
self.fg=fg
self.idx=idx
self.siso_port=port
self.aid=aid
self.system=system
if system:
self.name=drFg_Info_Selector.get(aid,None)
else:
self.name=py3.as_str(lib.Fg_getParameterNameById(fg,aid,self.siso_port))
self._attr_type_n=int(self._get_property(FgProperty.PROP_ID_DATATYPE))
self.kind=self._attr_types[self._attr_type_n]
self.min=None
self.max=None
self.inc=None
self.values=[]
self.ivalues=[]
self.labels={}
self.ilabels={}
if not self.system:
self.ilabels=self._get_enum_values()
self.ivalues=list(self.ilabels)
self.values=[self.ilabels[i] for i in self.ivalues]
self.labels=dict(zip(self.values,self.ivalues))
self.update_limits()
def _get_property(self, prop):
if self.system:
if self.fg is not None:
return lib.Fg_getSystemInformation(self.fg,self.aid,prop,0)
return lib.Fg_getSystemInformation(None,self.aid,prop,self.idx)
return lib.Fg_getParameterPropertyEx(self.fg,self.aid,prop,self.siso_port)
[docs]
def update_limits(self):
"""Update minimal and maximal attribute limits and return tuple ``(min, max, inc)``"""
if self.kind in ["i32","u32","i64","u64"]:
self.min=int(self._get_property(FgProperty.PROP_ID_MIN))
self.max=int(self._get_property(FgProperty.PROP_ID_MAX))
self.inc=int(self._get_property(FgProperty.PROP_ID_STEP))
if self.kind=="f64":
self.min=float(self._get_property(FgProperty.PROP_ID_MIN))
self.max=float(self._get_property(FgProperty.PROP_ID_MAX))
self.inc=float(self._get_property(FgProperty.PROP_ID_STEP))
return (self.min,self.max,self.inc)
def _get_enum_values(self):
try:
self._get_property(FgProperty.PROP_ID_IS_ENUM)
except fgrab_prototyp_lib.SIFgrabLibError as err:
if err.code!=FG_STATUS.FG_INVALID_TYPE:
raise
return {}
vstr=self._get_property(FgProperty.PROP_ID_ENUM_VALUES)
values={}
while vstr:
ival,=struct.unpack("<I",vstr[:4])
if ival==0xFFFFFFFF:
break
vstr=vstr[4:]
p=vstr.find(b"\x00")
sval=py3.as_str(vstr[:p])
values[ival]=sval
vstr=vstr[p+1:]
return values
[docs]
def truncate_value(self, value):
"""Truncate value to lie within attribute limits"""
self.update_limits()
if self.kind in ["i32","u32","i64","u64","f64"] and not self.values:
if value<self.min:
value=self.min
elif value>self.max:
value=self.max
else:
inc=self.inc
if inc>0:
value=((value-self.min)//inc)*inc+self.min
return value
[docs]
def get_value(self, enum_as_str=True):
"""
Get attribute value.
If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
"""
if self.system:
val=self._get_property(FgProperty.PROP_ID_VALUE)
if self.kind in ["i32","i64","u32","u64"]:
if val.startswith(b"0x"):
val=int(val[2:],base=16)
val=int(val)
elif self.kind=="f64":
val=float(val)
else:
val=lib.Fg_getParameterWithType_auto(self.fg,self.aid,self.siso_port,ptype=self._attr_type_n)
if self.kind=="str":
val=py3.as_str(val)
if enum_as_str and self.ilabels:
val=self.ilabels.get(val,val)
return val
[docs]
def set_value(self, value, truncate=True):
"""
Get attribute value.
If ``truncate==True``, automatically truncate value to lie within allowed range.
"""
if self.system:
raise ValueError("system property {} can not be set".format(self.name))
if truncate:
value=self.truncate_value(value)
if self.labels:
value=self.labels.get(value,value)
lib.Fg_setParameterWithType_auto(self.fg,self.aid,value,self.siso_port,ptype=self._attr_type_n)
def __repr__(self):
return "{}(name='{}', kind='{}')".format(self.__class__.__name__,self.name,self.kind)
TDeviceInfo=collections.namedtuple("TDeviceInfo",["applet_info","system_info","software_version"])
TFrameInfo=collections.namedtuple("TFrameInfo",["frame_index","framestamp","timestamp","timestamp_long"])
[docs]
class SiliconSoftwareFrameGrabber(camera.IGrabberAttributeCamera,camera.IROICamera):
"""
Generic Silicon Software frame grabber interface.
Compared to :class:`SiliconSoftwareCamera`, has more permissive initialization arguments,
which simplifies its use as a base class for expanded cameras.
Args:
siso_board: board index, starting from 0; available boards can be learned by :func:`list_boards`
siso_applet: applet name, which can be learned by :func:`list_applets`;
usually, a simple applet like ``"DualLineGray16"`` or ``"MediumLineGray16`` are most appropriate;
can be either an applet name, or a direct path to the applet DLL
siso_port: port number, if several ports are supported by the grabber and the applet
siso_detector_size: if not ``None``, can specify the maximal detector size;
by default, use the maximal available for the frame grabber (usually, 16384x16384)
"""
Error=SiliconSoftwareError
TimeoutError=SiliconSoftwareTimeoutError
_TFrameInfo=TFrameInfo
_adjustable_frameinfo_period=True
def __init__(self, siso_board=0, siso_applet=None, siso_port=0, siso_detector_size=None, do_open=True, **kwargs):
super().__init__(**kwargs)
lib.initlib()
self.siso_board=siso_board
self.siso_applet=siso_applet
self.siso_applet_path=None
self.siso_port=siso_port
self.fg=None
self._buffer_mgr=None
self._last_nacq_t=0
self._frame_merge=1
self._acq_in_progress=False
self._system_info=None
self.siso_detector_size=siso_detector_size
self._camlink_camtype_attr=None
self._add_info_variable("device_info",self.get_device_info)
self._add_info_variable("camlink_pixel_formats",self.get_available_camlink_pixel_formats)
self._add_status_variable("pixel_format",self.get_camlink_pixel_format,ignore_error=SiliconSoftwareError)
if do_open:
self.open()
def _get_applets(self, applet=None):
if applet is None:
return list_applets(self.siso_board,full_desc=True,on_board=True)
applets=list_applets(self.siso_board,full_desc=True)
return [app for app in applets if re.fullmatch(applet,app.name)]
def _try_connect_applets(self, paths):
for p in paths:
if p is None:
continue
try:
self.fg=lib.Fg_Init(p,self.siso_board)
return p
except SiliconSoftwareError:
pass
def _normalize_grabber_attribute_name(self, name):
if name.startswith("FG_"):
return name[3:]
return name
_fixed_parameters=[FG_PARAM.FG_CAMSTATUS,FG_PARAM.FG_IMAGE_TAG,FG_PARAM.FG_TIMEOUT,FG_PARAM.FG_TIMESTAMP,FG_PARAM.FG_TIMESTAMP_LONG,FG_PARAM.FG_TRANSFER_LEN]
def _list_grabber_attributes(self):
pnum=lib.Fg_getNrOfParameter(self.fg)
attrs=[FGrabAttribute(self.fg,lib.Fg_getParameterId(self.fg,i),port=self.siso_port) for i in range(pnum)]
attrs+=[FGrabAttribute(self.fg,aid,port=self.siso_port) for aid in self._fixed_parameters]
return [a for a in attrs if a.kind in ["i32","u32","i64","u64","f64","str"]]
def _get_connection_parameters(self):
return self.siso_board,self.siso_applet,self.siso_port,self.siso_applet_path
[docs]
def open(self):
"""Open connection to the camera"""
super().open()
if self.fg is None:
siso_applets=list(self.siso_applet) if isinstance(self.siso_applet,(list,tuple)) else [self.siso_applet]
p=self._try_connect_applets(siso_applets)
if p is None:
siso_applets=[a.path for p in siso_applets for a in self._get_applets(p)]
p=self._try_connect_applets(siso_applets)
if self.fg is None:
raise SiliconSoftwareError("can not find applets {}".format(self.siso_applet))
try:
self.siso_applet_path=get_applet_info(self.siso_board,name=p).path
except KeyError:
self.siso_applet_path=p
self._update_grabber_attributes()
self._camlink_camtype_attr="CAMERA_LINK_CAMTYP" if "CAMERA_LINK_CAMTYP" in self.grabber_attributes else "CAMERA_LINK_CAMTYPE"
self._buffer_mgr=self.BufferManager(self.fg,self.siso_port)
[docs]
def close(self):
"""Close connection to the camera"""
if self.fg is not None:
self.clear_acquisition()
self._buffer_mgr=None
try:
self.fg=lib.Fg_FreeGrabber(self.fg)
finally:
self.fg=None
super().close()
[docs]
def is_opened(self):
"""Check if the device is connected"""
return self.fg is not None
[docs]
def get_all_grabber_attribute_values(self, root="", **kwargs):
grabber_attributes=self.get_grabber_attribute(root)
values=dictionary.Dictionary()
for n,a in grabber_attributes.items():
try:
values[n]=a.get_value(**kwargs)
except SiliconSoftwareError:
pass
return values
[docs]
def set_all_grabber_attribute_values(self, settings, root="", **kwargs):
grabber_attributes=self.get_grabber_attribute(root)
settings=dictionary.as_dict(settings,style="flat",copy=False)
for k,v in settings.items():
k=self._normalize_grabber_attribute_name(k)
if k in grabber_attributes:
try:
grabber_attributes[k].set_value(v,**kwargs)
except SiliconSoftwareError:
pass
[docs]
def get_system_info(self):
"""Get the dictionary with all system information parameters"""
if self._system_info is None:
self._system_info={}
for aid in Fg_Info_Selector:
try:
attr=FGrabAttribute(self.fg,aid.value,system=True)
self._system_info[attr.name]=attr.get_value()
except SiliconSoftwareError:
pass
return self._system_info
[docs]
def get_genicam_info_xml(self):
"""Get description in Genicam-compatible XML format"""
return py3.as_str(lib.Fg_getParameterInfoXML(self.fg,self.siso_port))
[docs]
def get_device_info(self):
"""
Get camera model data.
Return tuple ``(applet_info, system_info, software_version)`` with the board serial number and an the interface type (e.g., ``"1430"`` for NI PCIe-1430)
"""
system_info=self.get_system_info()
applet_info=get_applet_info(self.siso_board,path=self.siso_applet_path)
software_version=py3.as_str(lib.Fg_getSWVersion())
return TDeviceInfo(applet_info,system_info,software_version)
[docs]
def set_frame_merge(self, frame_merge=1):
if self._frame_merge!=frame_merge:
roi=self.get_grabber_roi()
self.clear_acquisition()
self._frame_merge=frame_merge
self.set_grabber_roi(*roi)
def _get_data_dimensions_rc(self):
return self.gav["HEIGHT"]//self._frame_merge,self.gav["WIDTH"]
[docs]
def get_detector_size(self):
return self.siso_detector_size or (self.get_grabber_attribute("WIDTH").max,self.get_grabber_attribute("HEIGHT").max)
get_grabber_detector_size=get_detector_size
[docs]
def get_roi(self):
w,h=self.gav["WIDTH"],self.gav["HEIGHT"]//self._frame_merge
l,t=self.gav["XOFFSET"],self.gav["YOFFSET"]
return l,l+w,t,t+h
get_grabber_roi=get_roi
[docs]
@camera.acqcleared
def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
hlim,vlim=self.get_grabber_roi_limits()
if hend is None:
hend=hlim.max
if vend is None:
vend=vlim.max
hstart,hend=self._truncate_roi_axis((hstart,hend),hlim)
vstart,vend=self._truncate_roi_axis((vstart,vend),vlim)
if self._frame_merge!=1 and vstart!=0:
raise ValueError("frame merging is only supported with full vertical frame size")
self.gav["XOFFSET"]=0
self.gav["WIDTH"]=hend-hstart
self.gav["XOFFSET"]=hstart
self.gav["YOFFSET"]=0
self.gav["HEIGHT"]=(vend-vstart)*self._frame_merge
self.gav["YOFFSET"]=vstart # non-zero only if self._frame_merge==1
return self.get_grabber_roi()
set_grabber_roi=set_roi
[docs]
def get_roi_limits(self, hbin=1, vbin=1):
w,h=self.get_grabber_attribute("WIDTH"),self.get_grabber_attribute("HEIGHT")
x,y=self.get_grabber_attribute("XOFFSET"),self.get_grabber_attribute("YOFFSET")
detsize=self.get_detector_size()
hlim=camera.TAxisROILimit(w.min,detsize[1],x.inc,w.inc,1)
vlim=camera.TAxisROILimit(h.min,detsize[0],y.inc,h.inc,1)
return hlim,vlim
get_grabber_roi_limits=get_roi_limits
[docs]
class BufferManager:
"""Frame buffer manager which controls and schedules the buffer and the buffer copying loop"""
_fg_nframes_max=2**10
def __init__(self, fg, siso_port):
self.fg=fg
self.siso_port=siso_port
self.head=None
self.use_tmp_buffer=False
self.deallocate()
self.looping=ctypes.c_uint(0)
self.nread=ctypes.c_int64(0)
self.new_nacq=ctypes.c_int64(0)
self.oldest_valid=ctypes.c_int64(0)
self.debug_info=(ctypes.c_int64*3)()
self._buffer_loop_thread=None
[docs]
def allocate(self, nframes, frame_size):
"""Allocate and schedule buffers with the given number and size"""
self.deallocate()
self.use_tmp_buffer=nframes>self._fg_nframes_max
if self.use_tmp_buffer:
self.fg_nframes=self._fg_nframes_max
self.fg_buffer=ctypes.create_string_buffer(self._fg_nframes_max*frame_size)
self.tmp_nframes=nframes
self.tmp_buffer=ctypes.create_string_buffer(nframes*frame_size)
else:
self.fg_nframes=nframes
self.fg_buffer=ctypes.create_string_buffer(nframes*frame_size)
self.nframes=nframes
self.frame_size=frame_size
self.head=lib.Fg_AllocMemHead(self.fg,self.fg_nframes*frame_size,self.fg_nframes)
for i in range(self.fg_nframes):
lib.Fg_AddMem(self.fg,ctypes.addressof(self.fg_buffer)+i*frame_size,frame_size,i,self.head)
[docs]
def deallocate(self):
"""Deallocate and remove the buffers"""
if self.use_tmp_buffer:
self.stop_loop()
if self.head is not None:
for i in range(self.fg_nframes):
lib.Fg_DelMem(self.fg,self.head,i)
lib.Fg_FreeMemHead(self.fg,self.head)
self.head=None
self.fg_buffer=None
self.tmp_buffer=None
self.fg_nframes=None
self.tmp_nframes=None
self.nframes=None
self.frame_size=None
self.run_nframes=0
[docs]
def start_loop(self, run_nframes):
"""Start the copying loop and, optionally, run the acquisition loop with the given number of frames"""
self.stop_loop()
self.new_nacq.value=0
timeout=3600*24*365
lib.Fg_registerApcHandler(self.fg,self.siso_port,0,get_callback(),ctypes.addressof(self.new_nacq),timeout,Fg_Apc_Flag.FG_APC_BATCH_FRAMES)
if not self.use_tmp_buffer:
return False
self._buffer_loop_thread=threading.Thread(target=self._run_schedule_looper,daemon=True)
self.looping.value=1
self.nread.value=0
self.oldest_valid.value=0
self.run_nframes=run_nframes
self._buffer_loop_thread.start()
return True
def _run_schedule_looper(self):
code=looper(self.fg,self.siso_port,self.head,
ctypes.addressof(self.fg_buffer),ctypes.addressof(self.tmp_buffer),self.frame_size,self.fg_nframes,self.tmp_nframes,self.run_nframes,
ctypes.addressof(self.looping),ctypes.addressof(self.new_nacq),ctypes.addressof(self.nread),ctypes.addressof(self.oldest_valid),ctypes.addressof(self.debug_info),
funcaddressof(lib.lib.Fg_getStatusEx),funcaddressof(lib.lib.Fg_AcquireEx))
if code:
raise SIFgrabLibError("Fg_getStatusEx",code)
[docs]
def stop_loop(self):
"""Stop the copying loop"""
if self._buffer_loop_thread is not None:
self.looping.value=0
self._buffer_loop_thread.join()
self._buffer_loop_thread=None
lib.Fg_unregisterApcHandler(self.fg,self.siso_port)
[docs]
def get_status(self):
"""
Get acquisition status.
Return tuple ``(nread, oldest_valid_buffer, nacq, debug_info)``
"""
if self.head is None:
return 0,0,0,[0]*3
if self.use_tmp_buffer:
return self.nread.value,self.oldest_valid.value,self.new_nacq.value,self.debug_info[:]
return self.new_nacq.value,0,self.new_nacq.value,[0]*3
[docs]
def get_frames_data(self, idx, nframes=1):
"""Get buffer chunk addresses for the given number of frames starting from the given index"""
idx%=self.nframes
buff=self.tmp_buffer if self.use_tmp_buffer else self.fg_buffer
if idx+nframes<=self.nframes:
return [(nframes,ctypes.c_char_p(ctypes.addressof(buff)+self.frame_size*idx))]
return [(self.nframes-idx,ctypes.c_char_p(ctypes.addressof(buff)+self.frame_size*idx)),(idx+nframes-self.nframes,buff),]
def _get_acquired_frames(self):
if not self.acquisition_in_progress():
return None
nacq,oldest_valid=self._buffer_mgr.get_status()[:2]
self._frame_counter.set_first_valid_frame(oldest_valid)
return nacq*self._frame_merge
def _setup_buffers(self, nframes):
self._clear_buffers()
nbuff=(nframes-1)//self._frame_merge+1
buffer_size=self._get_buffer_size()
self._buffer_mgr.allocate(nbuff,buffer_size)
def _clear_buffers(self):
if self._buffer_mgr is not None:
self._buffer_mgr.deallocate()
def _ensure_buffers(self):
nbuff=self._buffer_mgr.nframes
buffer_size=self._get_buffer_size()
if buffer_size!=self._buffer_mgr.frame_size:
self._setup_buffers(nbuff)
_camlink_fmts={ ( 8,1):MeCameraLinkFormat.FG_CL_SINGLETAP_8_BIT,
(10,1):MeCameraLinkFormat.FG_CL_SINGLETAP_10_BIT,
(12,1):MeCameraLinkFormat.FG_CL_SINGLETAP_12_BIT,
(14,1):MeCameraLinkFormat.FG_CL_SINGLETAP_14_BIT,
(16,1):MeCameraLinkFormat.FG_CL_SINGLETAP_16_BIT,
( 8,2):MeCameraLinkFormat.FG_CL_DUALTAP_8_BIT,
(10,2):MeCameraLinkFormat.FG_CL_DUALTAP_10_BIT,
(12,2):MeCameraLinkFormat.FG_CL_DUALTAP_12_BIT,}
_camlink_fmts_inv=general.invert_dict(_camlink_fmts)
[docs]
@interface.use_parameters(mode="acq_mode")
def setup_acquisition(self, mode="sequence", nframes=100): # pylint: disable=arguments-differ
"""
Setup acquisition mode.
`mode` can be either ``"snap"`` (single frame or a fixed number of frames) or ``"sequence"`` (continuous acquisition).
(note that :meth:`.IMAQCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired).
`nframes` sets up number of frame buffers.
"""
self.clear_acquisition()
super().setup_acquisition(mode=mode,nframes=nframes)
self._setup_buffers(nframes)
[docs]
def clear_acquisition(self):
"""Clear all acquisition details and free all buffers"""
if self._acq_params:
self.stop_acquisition()
self._clear_buffers()
super().clear_acquisition()
[docs]
def start_acquisition(self, *args, **kwargs):
self.stop_acquisition()
super().start_acquisition(*args,**kwargs)
self._ensure_buffers()
mode=self._acq_params["mode"]
nframes=self._acq_params["nframes"] if mode=="snap" else -1
self._frame_counter.reset(self._buffer_mgr.nframes*self._frame_merge)
if not self._buffer_mgr.start_loop(run_nframes=nframes):
lib.Fg_AcquireEx(self.fg,self.siso_port,nframes,FG_ACQ.ACQ_STANDARD,self._buffer_mgr.head)
self._acq_in_progress=True
[docs]
def stop_acquisition(self):
if self.acquisition_in_progress():
self._buffer_mgr.stop_loop()
self._frame_counter.update_acquired_frames(self._get_acquired_frames())
lib.Fg_stopAcquireEx(self.fg,self.siso_port,self._buffer_mgr.head,FG_ACQ.ACQ_STANDARD)
self._acq_in_progress=False
[docs]
def acquisition_in_progress(self):
return self._acq_in_progress
def _get_buffer_bpp(self):
bpp=self.gav["PIXELDEPTH"]
return (bpp-1)//8+1
def _get_buffer_dtype(self):
return "<u{}".format(self._get_buffer_bpp())
def _get_buffer_size(self):
bpp=self._get_buffer_bpp()
roi=self.get_grabber_roi()
w,h=roi[1]-roi[0],roi[3]-roi[2]
return w*h*bpp*self._frame_merge
def _parse_buffer(self, buffer, nframes=1):
r,c=self._get_data_dimensions_rc()
dt=self._get_buffer_dtype()
cdt=ctypes.POINTER(np.ctypeslib.as_ctypes_type(dt))
data=np.ctypeslib.as_array(ctypes.cast(buffer,cdt),shape=((nframes*self._frame_merge,r,c)))
return data.copy()
def _trim_images_range(self, rng):
acquired_frames=self._get_acquired_frames()
if acquired_frames is None:
return None
self._frame_counter.update_acquired_frames(acquired_frames)
if rng is None:
rng,skipped=self._frame_counter.get_new_frames_range(),0
rng=(rng[0]//self._frame_merge)*self._frame_merge,((rng[1]-1)//self._frame_merge+1)*self._frame_merge
else:
rng=(rng[0]//self._frame_merge)*self._frame_merge,((rng[1]-1)//self._frame_merge+1)*self._frame_merge
rng,skipped=self._frame_counter.trim_frames_range(rng)
return rng,skipped
_support_chunks=True
def _read_frames(self, rng, return_info=False):
raw_frames=self._buffer_mgr.get_frames_data(rng[0]//self._frame_merge,rng[1]//self._frame_merge-rng[0]//self._frame_merge) if rng[1]>rng[0] else []
if return_info:
params=[FG_PARAM.FG_IMAGE_TAG,FG_PARAM.FG_TIMESTAMP,FG_PARAM.FG_TIMESTAMP_LONG]
frame_info=[]
chn=[n for n,_ in raw_frames]
chidx=np.cumsum([rng[0]]+chn[:-1])
frng=range(rng[0]//self._frame_merge,rng[1]//self._frame_merge)
frame_info=[np.array(frng)*self._frame_merge]
for p in params:
try:
frame_info.append([lib.Fg_getParameterEx_auto(self.fg,p,self.siso_port,self._buffer_mgr.head,i+1) if i%self._frameinfo_period==0 else 0 for i in frng])
except SIFgrabLibError:
frame_info.append([0]*len(frng))
frame_info=np.array(frame_info)
frame_info[1:,frame_info[0]%self._frameinfo_period!=0]=-1
frame_info=[frame_info[:,i:i+n].T for i,n in zip(chidx-chidx[0],chn)]
else:
frame_info=None
parsed_frames=[self._convert_indexing(self._parse_buffer(b,nframes=n),"rct",axes=(-2,-1)) for n,b in raw_frames]
return parsed_frames,frame_info
[docs]
class SiliconSoftwareCamera(SiliconSoftwareFrameGrabber):
"""
Generic Silicon Software frame grabber interface.
Args:
board: board index, starting from 0; available boards can be learned by :func:`list_boards`
applet: applet name, which can be learned by :func:`list_applets`;
usually, a simple applet like ``"DualLineGray16"`` or ``"MediumLineGray16`` are most appropriate;
can be either an applet name, or a direct path to the applet DLL
port: port number, if several ports are supported by the camera and the applet
detector_size: if not ``None``, can specify the maximal detector size;
by default, use the maximal available for the frame grabber (usually, 16384x16384)
"""
def __init__(self, board, applet=None, port=0, detector_size=None):
super().__init__(siso_board=board,siso_applet=applet,siso_port=port,siso_detector_size=detector_size)