from ...core.utils import files, general, library_parameters
import platform
import ctypes
import sys
import os
import threading
import contextlib
import collections
_store_loaded_dlls=False
_loaded_dlls=[]
[docs]
def get_os_lib_folder():
"""Get default Windows DLL folder (``System32`` or ``SysWOW64``, depending on Python and Windows bitness)"""
if sys.platform!="win32":
return ""
arch=platform.architecture()[0]
winarch="64bit" if platform.machine().endswith("64") else "32bit"
if winarch==arch:
return os.path.join(os.environ.get("WINDIR","C:\\Windows"),"System32")
else: # 32 bit Python on 64 but OS (the reverse is impossible)
return os.path.join(os.environ.get("WINDIR","C:\\Windows"),"SysWOW64")
os_lib_folder=get_os_lib_folder()
[docs]
def get_program_files_folder(subfolder="", arch=None):
"""
Get default Windows Program Files folder or a subfolder within it.
If `arch` is ``None``, use the current Python architecture to determine the folder;
otherwise, it specifies the architecture (``"32bit"`` for ``Program Files (x86)``, ``"64bit"`` for ``Program Files``)
"""
if sys.platform!="win32":
return ""
if subfolder:
return os.path.join(get_program_files_folder(arch=arch),subfolder)
if arch is None:
arch=platform.architecture()[0]
elif arch not in ["32bit","64bit"]:
raise ValueError("unrecognized architecture: {}".format(arch))
winarch="64bit" if platform.machine().endswith("64") else "32bit"
if arch=="32bit" and winarch=="64bit":
return os.environ.get("PROGRAMFILES(X86)",r"C:\Program Files (x86)")
else:
return os.environ.get("PROGRAMFILES",r"C:\Program Files")
program_files_folder=get_program_files_folder()
[docs]
def get_appdata_folder(subfolder="", kind="roaming"):
"""
Get user AppData folder (used to install software only for specific users).
`kind` can be ``"roaming"`` (return ``Roaming`` AppData folder) or ``"local"`` (return ``Local`` AppData folder).
"""
if subfolder:
folder=get_appdata_folder(kind=kind)
return os.path.join(folder,subfolder) if folder is not None else None
if kind=="roaming":
return os.environ.get("APPDATA")
if kind=="local":
return os.environ.get("LOCALAPPDATA")
raise ValueError("unrecognized appdata folder kind: {}".format(kind))
[docs]
def get_environ_folder(var, subfolder="", error_missing=False):
"""
Get subfolder of a folder based on the environment variable.
If the environment variable is missing and ``error_missing==True``, raise an error; otherwise, return ``None``.
"""
folder=os.environ.get(var,None)
if folder is None:
if error_missing:
raise ValueError("could not find environment variable {}".format(var))
return None
return os.path.join(folder,subfolder)
_load_lock=threading.RLock()
par_error_message="If you already have it, specify its path as pylablib.par['devices/dlls/{}']='path/to/dll/'"
def _load_dll(path, kind, add_environ_paths=True):
"""Load dll using PATH environment variable in Python 3.8+"""
if add_environ_paths and hasattr(os,"add_dll_directory"):
try:
return _load_dll(path,kind,add_environ_paths=False)
except OSError:
pass
add_paths=[os.path.abspath(p) for p in os.environ.get("PATH","").split(os.pathsep) if p]
add_paths+=[os.path.abspath(".")]
added_dirs=[]
try:
for p in add_paths:
try:
added_dirs.append(os.add_dll_directory(p)) # pylint: disable=no-member
except OSError: # missing folder
pass
return ctypes.cdll.LoadLibrary(path) if kind=="cdecl" else ctypes.windll.LoadLibrary(path)
finally:
for d in added_dirs:
d.close()
else:
return ctypes.cdll.LoadLibrary(path) if kind=="cdecl" else ctypes.windll.LoadLibrary(path)
[docs]
def load_lib(name, locations=("global",), call_conv="cdecl", locally=False, depends=None, depends_required=True, error_message=None, check_order="location", return_location=False):
"""
Load DLL.
Args:
name: name or path of the library (can also be a list or a tuple with several names, which are tried in that order).
locations: list or tuple of locations to search for a library; the function tries locations in order and returns the first successfully loaded library
a location is a string which can be a path to the containing folder,
``"parameter/*"`` (the remaining part is a subpath inside ``"devices/dlls"`` library parameters; if this parameter is defined, it names folder or file for the dll),
or ``"global"`` (load path as is; also searches in the standard OS specified locations determined by ``PATH`` variable, e.g., ``System32`` folder).
depends: if specified, it is a list of dependency libraries which need to be loaded first before the main DLL; they are assumed to be in the same location as the main file
depends_required: if ``False``, ignore errors during dependency loads
locally(bool): if ``True``, prepend path to the DLL containing folder to the environment ``PATH`` folders;
this is usually required, if the loaded DLL imports other DLLs in the same folder
call_conv(str): DLL call convention; can be either ``"cdecl"`` (corresponds to ``ctypes.cdll``) or ``"stdcall"`` (corresponds to ``ctypes.windll``)
error_message(str): error message to add in addition to the default error message shown when the DLL is not found
check_order(str): determines the order in which possible combinations of names and locations are looped over;
can be ``"location"`` (loop over locations, and for each location loop over names), ``"name"`` (loop over names, and for each name loop over locations),
or a list of tuples ``[(loc,name)]`` specifying order of checking
(in the latter case, `name` and `location` arguments are ignored, except for generating error message).
return_location(bool): if ``True``, return a tuple ``(dll, location, folder)`` instead of a single dll.
"""
if platform.system()!="Windows":
raise OSError("DLLs are not available on non-Windows platform")
if not isinstance(name,(list,tuple)):
name=[name]
if not isinstance(locations,(list,tuple)):
locations=[locations]
if check_order=="location":
check_order=[(loc,n) for loc in locations for n in name]
elif check_order=="name":
check_order=[(loc,n) for n in name for loc in locations]
for loc,n in check_order:
if loc=="global":
folder=""
else:
if loc.startswith("parameter/"):
par_name=loc[len("parameter/"):]
if ("devices/dlls",par_name) in library_parameters.library_parameters:
loc=library_parameters.library_parameters["devices/dlls",par_name]
else:
continue
if loc.lower().endswith(".dll"):
folder,n=os.path.split(loc)
else:
folder=loc
if folder.startswith("."):
folder=os.path.abspath(folder)
path=os.path.join(folder,n)
lock=_load_lock if locally else general.DummyResource()
with lock:
if locally:
loc_folder,loc_name=os.path.split(path)
old_env_path=os.environ.get("PATH",None)
env_paths=old_env_path.split(os.pathsep) if old_env_path else []
if not any([files.paths_equal(loc_folder,ep) for ep in env_paths if ep]):
os.environ["PATH"]=files.normalize_path(loc_folder)+(os.pathsep+old_env_path if old_env_path else "")
path=loc_name
folder=loc_folder
depends=depends or []
paths=[os.path.join(folder,dn) for dn in depends]+[path]
try:
dlls=[]
add_environ_paths=library_parameters.library_parameters.get("devices/dlls/add_environ_paths",True)
for p in paths:
if call_conv in ["cdecl","stdcall"]:
try:
dlls.append(_load_dll(p,call_conv,add_environ_paths=add_environ_paths))
if _store_loaded_dlls:
_loaded_dlls.append((p,dlls[-1]))
except OSError:
if depends_required or p==paths[-1]:
raise
else:
raise ValueError("unrecognized call convention: {}".format(call_conv))
return (dlls[-1],loc,paths[-1]) if return_location else dlls[-1]
except OSError:
if locally:
if old_env_path is None:
del os.environ["PATH"]
else:
os.environ["PATH"]=old_env_path
error_message="\n"+error_message if error_message else ""
raise OSError("can't import library {}".format(" or ".join(name))+error_message)
TLibraryOpenResult=collections.namedtuple("TLibraryOpenResult",["init_result","open_result","opid"])
TLibraryCloseResult=collections.namedtuple("TLibraryCloseResult",["close_result","uninit_result"])
[docs]
class LibraryController:
"""
Simple wrapper to control libraries which require initialization when a new device is opened
or shutdown when all devices are closed.
Args:
lib: controlled library
"""
def __init__(self, lib):
self.open_devices=0
self.lib=lib
self.lock=threading.RLock()
self.initialized=False
self._counter=general.UIDGenerator()
self.opened=set()
self.closed=set()
def _do_preinit(self):
"""
Perform pre-initialization.
Called only once on the first controller call.
"""
self.lib.initlib()
def _do_init(self):
"""
Perform initialization.
Called whenever the first device is opened (including the instances when after all devices have been previously closed).
"""
def _do_uninit(self):
"""
Perform shutdown.
Called whenever the last device is closed.
"""
def _do_open(self):
"""
Perform opening-related procedures.
Called whenever any devices is opened.
"""
def _do_close(self):
"""
Perform closing-related procedures.
Called whenever any devices is closed.
"""
[docs]
def preinit(self):
"""Pre-initialize the library, if it hasn't been done already"""
with self.lock:
if not self.initialized:
self._do_preinit()
self.initialized=True
[docs]
def open(self):
"""
Mark device opening.
Return tuple ``(init_result, open_result, opid)`` with the results of the initialization and the opening,
and the opening ID which should afterwards be used for closing.
If library is already initialized, set ``init_result=None``
"""
with self.lock:
self.preinit()
init_result=None
if self.open_devices==0:
init_result=self._do_init() # pylint: disable=assignment-from-no-return
open_result=self._do_open() # pylint: disable=assignment-from-no-return
self.open_devices+=1
opid=self._counter()
self.opened.add(opid)
return TLibraryOpenResult(init_result,open_result,opid)
[docs]
def close(self, opid):
"""
Mark device closing.
Return tuple ``(close_result, uninit_result)`` with the results of the closing and the shutdown.
If library does not need to be shut down yet, set ``uninit_result=None``
"""
if opid is None:
return
with self.lock:
if opid in self.opened:
self.opened.remove(opid)
self.closed.add(opid)
elif opid in self.closed:
return None,None
else:
raise ValueError("supplied opid {} has never been issued".format(opid))
self.preinit()
self.open_devices-=1
close_result=self._do_close() # pylint: disable=assignment-from-no-return
uninit_result=None
if self.open_devices==0:
uninit_result=self._do_uninit() # pylint: disable=assignment-from-no-return
return TLibraryCloseResult(close_result,uninit_result)
[docs]
@contextlib.contextmanager
def temp_open(self):
"""Context for temporarily opening a new device connection"""
opid=None
try:
init_result,open_result,opid=self.open()
yield init_result,open_result
finally:
self.close(opid)
[docs]
def shutdown(self):
"""Close all opened connections and shutdown the library"""
for opid in list(self.opened):
self.close(opid)
[docs]
def get_opened_num(self):
"""Get number of opened devices"""
with self.lock:
return self.open_devices