"""
Routines for defining a unified interface across multiple backends.
"""
from ..utils import funcargparse, general, net, py3, module, functions as func_utils
from . import interface
from . import backend_logger
from .base import DeviceError
import time
import re
import contextlib
import warnings
import functools
### Generic backend interface ###
[docs]
class DeviceBackendError(DeviceError):
"""Generic exception relaying a backend error"""
def __init__(self, exc):
msg="backend exception: {} ('{}')".format(repr(exc),str(exc))
super().__init__(msg)
self.backend_exc=exc
[docs]
def reraise(func):
"""Wrapper for a backend method which intercepts backend exceptions and re-emits them as a subclass of :exc:`DeviceBackendError` defined in the class"""
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
if self.Error is None or self.BackendError is None:
return func(self,*args,**kwargs)
try:
return func(self,*args,**kwargs)
except self.BackendError as exc:
ReraiseError=getattr(self,"ReraiseError",self.Error)
raise ReraiseError(exc) from exc
return wrapped
[docs]
def logerror(func):
"""Wrapper for a backend method which logs if any errors escaped"""
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
return func(self,*args,**kwargs)
except self.Error as exc:
self._log("error",str(exc))
raise
return wrapped
logger=None
[docs]
class IDeviceCommBackend:
"""
An abstract class for a device communication backend.
Connection is automatically opened on creation.
Args:
conn: Connection parameters (depend on the backend).
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations.
term_read (str): Line terminator for reading operations.
datatype (str): Type of the returned data; can be ``"bytes"`` (return ``bytes`` object), ``"str"`` (return ``str`` object),
or ``"auto"`` (default Python result: ``str`` in Python 2 and ``bytes`` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
Error=DeviceBackendError
BackendError=None
"""Base class for the errors raised by the backend operations"""
_default_operation_cooldown={"default":0.}
def __init__(self, conn, timeout=None, term_write=None, term_read=None, datatype="auto", reraise_error=None): # pylint: disable=unused-argument
funcargparse.check_parameter_range(datatype,"datatype",{"auto","str","bytes"})
self.datatype=datatype
self.conn=conn
self.term_write=term_write
self.term_read=term_read
self._operation_cooldown=dict(self._default_operation_cooldown)
if reraise_error is not None:
self.Error=reraise_error
_conn_params=["addr"]
_default_conn=[None]
@classmethod
def _conn_to_dict(cls, conn):
"""Turn connection parameters (tuple or dict) into a full dictionary using class-specific parameter names"""
if isinstance(conn, dict):
return conn
if isinstance(conn, (tuple,list)):
return dict(zip(cls._conn_params,conn))
return {cls._conn_params[0]:conn}
[docs]
@classmethod
def combine_conn(cls, conn1, conn2):
"""Combined two connection parameters into a single dictionary (`conn1` overrides `conn2`)"""
conn=cls._conn_to_dict(conn2).copy()
conn.update(cls._conn_to_dict(conn1))
return conn
[docs]
@classmethod
def get_backend_name(cls):
"""Get string representation of the backend (e.g., ``"serial"``, ``"visa"``, or ``"network"``)"""
return getattr(cls,"_backend",None)
def _to_datatype(self, data):
if self.datatype=="auto":
return data
if self.datatype=="str":
return py3.as_str(data)
return py3.as_bytes(data)
[docs]
def open(self):
"""Open the connection"""
pass
[docs]
def close(self):
"""Close the connection"""
pass
[docs]
def is_opened(self):
"""Check if the device is connected"""
return True
def __bool__(self):
return self.is_opened()
__nonzero__=__bool__ # Python 2 compatibility
def _log(self, operation, value):
"""Log the operation (used for testing and debugging)"""
if logger:
logger.log(operation,value)
[docs]
def lock(self, timeout=None):
"""Lock the access to the device from other threads/processes (isn't necessarily implemented)"""
pass
[docs]
def unlock(self):
"""Unlock the access to the device from other threads/processes (isn't necessarily implemented)"""
pass
[docs]
@contextlib.contextmanager
def locking(self, timeout=None): # pylint: disable=unused-argument
"""Context manager for lock & unlock"""
yield
[docs]
def setup_cooldown(self, **kwargs):
"""
Setup cooldown times for various operations.
The arguments are of the form ``kind=value``, where ``value`` is the cooldown time (in seconds),
and ``kind`` is the operation kind (common kinds are ``open``, ``close``, ``read``, ``write``, ``timeout``, and ``flush``).
``kind`` can also be ``default`` (default value for all kind), or ``all`` (reset all cooldown values to this value).
The cooldowns of the given kinds are usually called after the corresponding operation (it is necessary for some devices, otherwise the communication can freeze or crush).
Default cooldown values are specified by ``_default_operation_cooldown`` class attribute dictionary.
"""
if "all" in kwargs:
self._operation_cooldown={"default":kwargs.pop("all")}
self._operation_cooldown.update(kwargs)
[docs]
def cooldown(self, kind="default"):
"""
Cooldown between the operations.
``kind`` specifies the operation kind (common kinds are ``open``, ``close``, ``read``, ``write``, ``timeout``, and ``flush``);
``"default"`` corresponds to the default cooldown (usually, specified as 0).
Called automatically by various backend operations, so usually there is no need to call explicitly.
"""
cooldown=self._operation_cooldown.get(kind,self._operation_cooldown.get("default",0))
if cooldown>0:
time.sleep(cooldown)
[docs]
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
pass
[docs]
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return None
[docs]
@contextlib.contextmanager
def using_timeout(self, timeout=None):
"""Context manager for usage of a different timeout inside a block"""
if timeout is not None:
to=self.get_timeout() # pylint: disable=assignment-from-none
if to!=timeout:
self.set_timeout(timeout)
try:
yield
finally:
if timeout is not None and to!=timeout:
self.set_timeout(to)
[docs]
def readline(self, remove_term=True, timeout=None, skip_empty=True):
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
"""
raise NotImplementedError("IDeviceCommBackend.readline")
[docs]
def readlines(self, lines_num, remove_term=True, timeout=None, skip_empty=True):
"""
Read multiple lines from the device.
Parameters are the same as in :func:`readline`.
"""
return [self.readline(remove_term=remove_term,timeout=timeout,skip_empty=skip_empty) for _ in range(lines_num)]
[docs]
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (the standard timeout applies); otherwise, read all available data (return immediately).
"""
raise NotImplementedError("IDeviceCommBackend.read")
[docs]
def flush_read(self):
"""Flush the device output (read all the available data; return the number of bytes read)"""
return len(self.read())
[docs]
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``flush==True``, flush the write buffer.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
"""
raise NotImplementedError("IDeviceCommBackend.write")
[docs]
def ask(self, query, delay=0., read_all=False):
"""
Perform a write followed by a read, with `delay` in between.
If ``read_all==True``, read all the available data; otherwise, read a single line.
"""
self.write(query)
if delay:
time.sleep(delay)
if read_all:
return self.read()
else:
return self.readline()
[docs]
@staticmethod
def list_resources(desc=False): # pylint: disable=unused-argument
"""
List all available resources for this backend.
If ``desc==False``, return list of connections (usually strings or tuples), which can be used to connect to the device.
Otherwise, return a list of descriptions, which have more info, but can be backend-dependent.
Might not be implemented (depending on the backend), in which case returns ``None``.
"""
return None
### Helper functions ###
[docs]
def remove_longest_term(msg, terms):
"""
Remove the longest terminator among `terms` from the end of the message.
"""
tcs=0
for t in terms:
if msg.endswith(py3.as_builtin_bytes(t)):
tcs=max(tcs,len(t))
return msg[:-tcs]
### Specific backends ###
_backends={}
_backend_errors={}
_backend_install_message=("{name:} package is missing. You can install it via PyPi as 'pip install {pkg:}'. "
"If it is installed, check if it imports correctly by running 'import {mod:}'")
try:
try:
import pyvisa as visa
except ImportError:
import visa
[docs]
class DeviceVisaError(DeviceBackendError):
"""Visa backend operation error"""
[docs]
class VisaDeviceBackend(IDeviceCommBackend):
"""
NIVisa backend (via pyVISA).
Connection is automatically opened on creation.
Args:
conn (str): Connection string.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): Line terminator for reading operations (specifies when :func:`readline` stops).
do_lock (bool): If ``True``, employ locking operations; otherwise, locking function does nothing.
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
_backend="visa"
BackendError=visa.VisaIOError
"""Base class for the errors raised by the backend operations"""
Error=DeviceVisaError
if module.cmp_versions(visa.__version__,"1.6")=="<": # older pyvisa versions have a slightly different interface
@reraise
def _set_timeout(self, timeout):
self.instr.timeout=timeout
@reraise
def _get_timeout(self):
return self.instr.timeout
@reraise
def _open_resource(self, conn):
if not self.term_write.endswith(self.term_read):
raise NotImplementedError("PyVisa version <1.6 doesn't support different terminators for reading and writing; update to a newer version by running 'pip install --upgrade pyvisa'")
instr=visa.instrument(conn) # pylint: disable=no-member
instr.term_chars=self.term_read
self.term_write=self.term_write[:len(self.term_write)-len(self.term_read)]
return instr
_lock_default=False
def _lock(self, timeout=None):
raise NotImplementedError("PyVisa version <1.6 doesn't support locking; update to a newer version by running 'pip install --upgrade pyvisa'")
def _unlock(self):
raise NotImplementedError("PyVisa version <1.6 doesn't support locking; update to a newer version by running 'pip install --upgrade pyvisa'")
def _lock_context(self, timeout=None):
raise NotImplementedError("PyVisa version <1.6 doesn't support locking; update to a newer version by running 'pip install --upgrade pyvisa'")
@reraise
def _read_term(self):
return py3.as_builtin_bytes(self.instr.term_chars)
else:
@reraise
def _set_timeout(self, timeout):
self.instr.timeout=timeout*1000. # in newer versions timeout is in ms
@reraise
def _get_timeout(self):
return self.instr.timeout/1000. # in newer versions timeout is in ms
@reraise
def _open_resource(self, conn):
instr=visa.ResourceManager().open_resource(conn)
instr.read_termination=self.term_read
instr.write_termination=self.term_write
self.term_read=self.term_write=""
return instr
_lock_default=False
@reraise
def _lock(self, timeout=None):
self.instr.lock(timeout=timeout*1000. if timeout is not None else None)
@reraise
def _unlock(self):
self.instr.unlock()
@reraise
def _lock_context(self, timeout=None):
return self.instr.lock_context(timeout=timeout*1000. if timeout is not None else None)
@reraise
def _read_term(self):
return py3.as_builtin_bytes(self.instr.read_termination)
[docs]
@staticmethod
def list_resources(desc=False):
try:
return visa.ResourceManager().list_resources_info() if desc else visa.ResourceManager().list_resources()
except VisaDeviceBackend.BackendError as e:
raise VisaDeviceBackend.Error(e) from e
if module.cmp_versions(visa.__version__,"1.9")=="<": # older pyvisa versions have a slightly different interface
@reraise
def _read_raw(self, size):
chunk_size=self.instr.chunk_size
data=bytearray()
with self.instr.ignore_warning(visa.constants.VI_SUCCESS_DEV_NPRESENT,visa.constants.VI_SUCCESS_MAX_CNT):
while len(data)<size:
to_read=min(chunk_size,size-len(data))
chunk,_=self.instr.visalib.read(self.instr.session,to_read)
data.extend(chunk)
return bytes(data)
else:
@reraise
def _read_raw(self, size):
return self.instr.read_bytes(size)
@reraise
def _read_all(self):
data=bytearray()
with self.using_timeout(0):
while True:
try:
chunk=self.instr.read_raw()
data.extend(chunk)
except visa.VisaIOError as err:
if err.abbreviation=="VI_ERROR_TMO":
return bytes(data)
else:
raise
def __init__(self, conn, timeout=10., term_write=None, term_read=None, do_lock=None, datatype="auto", reraise_error=None):
if term_write is None:
term_write=b"\r\n"
if term_read is None:
term_read=b"\n"
super().__init__(conn,term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
self.instr=None
try:
self.instr=self._open_resource(self.conn)
self.opened=True
self._do_lock=do_lock if do_lock is not None else self._lock_default
self.cooldown("open")
self.set_timeout(timeout)
except self.BackendError as e:
raise self.Error(e) from e
[docs]
@reraise
def open(self):
"""Open the connection"""
self.instr.open()
self.opened=True
self.cooldown("open")
[docs]
@reraise
def close(self):
"""Close the connection"""
self.instr.close()
self.opened=False
self.cooldown("close")
[docs]
def is_opened(self):
return self.opened
[docs]
def lock(self, timeout=None):
"""Lock the access to the device from other threads/processes"""
if self._do_lock:
self.lock(timeout=timeout)
[docs]
def unlock(self):
"""Unlock the access to the device from other threads/processes"""
if self._do_lock:
self.unlock()
[docs]
def locking(self, timeout=None):
"""Context manager for lock & unlock"""
if self._do_lock:
return self._lock_context(timeout=timeout)
else:
return general.DummyResource()
[docs]
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
self._set_timeout(timeout)
self.cooldown("timeout")
[docs]
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self._get_timeout()
[docs]
@logerror
@reraise
def readline(self, remove_term=True, timeout=None, skip_empty=True):
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
"""
with self.using_timeout(timeout):
while True:
result=self.instr.read_raw()
if remove_term:
term=self._read_term()
if term and result.endswith(term):
result=result[:-len(term)]
if (not skip_empty) or result:
break
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (the standard timeout applies); otherwise, read all available data (return immediately).
"""
result=self._read_all() if size is None else self._read_raw(size=size)
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
`flush` parameter is ignored.
"""
self._log("write",data)
data=py3.as_builtin_bytes(data)
if self.term_write:
data=data+py3.as_builtin_bytes(self.term_write)
self.instr.write_raw(data)
self.cooldown("write")
if read_echo_delay>0.:
time.sleep(read_echo_delay)
if read_echo:
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "VisaDeviceBackend("+self.instr.__repr__()+")"
_backends["visa"]=VisaDeviceBackend
except ImportError:
pass
_backend_errors["visa"]=_backend_install_message.format(name="PyVISA",pkg="pyvisa",mod="pyvisa")
try:
import serial
try:
import serial.tools.list_ports as serial_list_ports
except ImportError:
serial_list_ports=None
[docs]
class DeviceSerialError(DeviceBackendError):
"""Serial backend operation error"""
[docs]
class SerialDeviceBackend(IDeviceCommBackend):
"""
Serial backend (via pySerial).
Connection is automatically opened on creation.
Args:
conn: Connection parameters. Can be either a string (for a port),
or a list/tuple ``(port, baudrate, bytesize, parity, stopbits, xonxoff, rtscts, dsrdtr)`` supplied to the serial connection
(default is ``('COM1',19200,8,'N',1,0,0,0)``),
or a dict with the same parameters.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
connect_on_operation (bool): If ``True``, the connection is normally closed, and is opened only on the operations
(normally two processes can't be simultaneously connected to the same device).
open_retry_times (int): Number of times the connection is attempted before giving up.
no_dtrrts (bool): If ``True``, turn off DTR and RTS status lines before opening (e.g., turns off reset-on-connection for Arduino controllers).
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
_backend="serial"
BackendError=serial.SerialException
"""Base class for the errors raised by the backend operations"""
Error=DeviceSerialError
_conn_params=["port","baudrate","bytesize","parity","stopbits","xonxoff","rtscts","dsrdtr"]
_default_conn=["COM1",19200,8,"N",1,0,0,0]
def __init__(self, conn, timeout=10., term_write=None, term_read=None, connect_on_operation=False, open_retry_times=3, no_dtrrts=False, datatype="auto", reraise_error=None):
conn_dict=self.combine_conn(conn,self._default_conn)
if term_write is None:
term_write=b"\r\n"
if term_read is None:
term_read=b"\n"
if isinstance(term_read,py3.anystring):
term_read=[term_read]
super().__init__(conn_dict.copy(),term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
port=conn_dict.pop("port")
self.instr=None
try:
self.instr=serial.serial_for_url(port,do_not_open=True,**conn_dict)
self.opened=True
if no_dtrrts:
try:
self.instr.setDTR(0)
self.instr.setRTS(0)
except self.BackendError:
warnings.warn("Cannot set DTR for an unconnected device")
if not connect_on_operation:
self.instr.open()
self._connect_on_operation=connect_on_operation
self._opened_stack=0
self._open_retry_times=open_retry_times
self.cooldown("open")
self.set_timeout(timeout)
except self.BackendError as e:
raise self.Error(e) from e
@reraise
def _do_open(self):
general.retry_wait(self.instr.open, self._open_retry_times, 0.3)
@reraise
def _do_close(self):
general.retry_wait(self.instr.close, self._open_retry_times, 0.3)
[docs]
def open(self):
"""Open the connection"""
if not self._connect_on_operation and not self.opened:
self._do_open()
self.opened=True
[docs]
def close(self):
"""Close the connection"""
if not self._connect_on_operation and self.opened:
self._do_close()
self.opened=False
[docs]
def is_opened(self):
return self.opened
def _op_open(self):
if self._connect_on_operation:
if not self._opened_stack:
self._do_open()
self._opened_stack=self._opened_stack+1
def _op_close(self):
if self._connect_on_operation:
self._opened_stack=self._opened_stack-1
if not self._opened_stack:
self._do_close()
[docs]
@contextlib.contextmanager
def single_op(self):
"""
Context manager for a single operation.
If ``connect_on_operation==True`` during creation, wrapping several command in `single_op`
prevents the connection from being closed and reopened between the operations (only opened in the beginning and closed in the end).
"""
self._op_open()
try:
yield
finally:
self._op_close()
[docs]
@reraise
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
self.instr.timeout=timeout
self.cooldown("timeout")
[docs]
@reraise
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self.instr.timeout
@reraise
def _read_terms(self, terms=(), read_block_size=8, timeout=None, error_on_timeout=True):
result=b""
singlechar_terms=all(len(t)==1 for t in terms)
terms=[py3.as_builtin_bytes(t) for t in terms]
with self.single_op():
with self.using_timeout(timeout):
while True:
c=self.instr.read(1 if terms else read_block_size)
result=result+c
if c==b"":
if error_on_timeout and terms:
raise self.Error("timeout during read")
return result
if singlechar_terms:
if c in terms:
return result
else:
for t in terms:
if result.endswith(t):
return result
[docs]
@logerror
def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True): # pylint: disable=arguments-differ
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
while True:
result=self._read_terms(self.term_read or [],timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and self.term_read:
result=remove_longest_term(result,self.term_read)
if not (skip_empty and remove_term and (not result)):
break
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
with self.single_op():
if size is None:
result=self._read_terms(timeout=0,error_on_timeout=False)
else:
result=self.instr.read(size=size)
if len(result)!=size:
raise self.Error("read returned less than expected: {} instead of {}".format(len(result),size))
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
"""
Read a single line with multiple possible terminators.
Args:
term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
if isinstance(term,py3.anystring):
term=[term]
result=self._read_terms(term,timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and term:
result=remove_longest_term(result,term)
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``flush==True``, flush the write buffer.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
"""
self._log("write",data)
with self.single_op():
data=py3.as_builtin_bytes(data)
if self.term_write:
data=data+py3.as_builtin_bytes(self.term_write)
self.instr.write(data)
self.cooldown("write")
if flush:
self.instr.flush()
self.cooldown("flush")
if read_echo_delay>0.:
time.sleep(read_echo_delay)
if read_echo:
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "SerialDeviceBackend("+self.instr.__repr__()+")"
[docs]
@staticmethod
def list_resources(desc=False):
if serial_list_ports is not None:
try:
return [(p if desc else p[0]) for p in serial_list_ports.comports()]
except SerialDeviceBackend.BackendError as e:
raise SerialDeviceBackend.Error(e) from e
_backends["serial"]=SerialDeviceBackend
except (ImportError, AttributeError):
pass
_backend_errors["serial"]=_backend_install_message.format(name="PySerial",pkg="pyserial",mod="serial")
try:
import ft232
[docs]
class DeviceFT232Error(DeviceBackendError):
"""FT232 backend operation error"""
[docs]
class FT232DeviceBackend(IDeviceCommBackend):
"""
FT232 backend (via pyft232).
Connection is automatically opened on creation.
Args:
conn: Connection parameters. Can be either a string (for a port),
or a list/tuple ``(port, baudrate, bytesize, parity, stopbits, xonxoff, rtscts)`` supplied to the serial connection
(default is ``('COM1',19200,8,'N',1,0,0,0)``),
or a dict with the same parameters.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
open_retry_times (int): Number of times the connection is attempted before giving up.
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
_backend="ft232"
BackendError=ft232.Ft232Exception
"""Base class for the errors raised by the backend operations"""
Error=DeviceFT232Error
_conn_params=["port","baudrate","bytesize","parity","stopbits","xonxoff","rtscts"]
_default_conn=[None,9600,8,"N",1,0,0]
def __init__(self, conn, timeout=10., term_write=None, term_read=None, open_retry_times=3, datatype="auto", reraise_error=None):
conn_dict=self.combine_conn(conn,self._default_conn)
if term_write is None:
term_write=b"\r\n"
if term_read is None:
term_read=b"\n"
if isinstance(term_read,py3.anystring):
term_read=[term_read]
conn_dict=conn_dict.copy()
conn_dict["port"]=str(conn_dict["port"])
super().__init__(conn_dict.copy(),term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
port=conn_dict.pop("port")
self.opened=False
self.instr=None
try:
self.instr=self._open_instr(port,conn_dict)
self.opened=True
self._open_retry_times=open_retry_times
self.cooldown("open")
self.set_timeout(timeout)
self._conn_params=(port,conn_dict,timeout)
except self.BackendError as e:
raise self.Error(e) from e
def _open_instr(self, port, params):
sig=func_utils.funcsig(ft232.Ft232)
if "serial_number" in sig.arg_names: # pyft232 v0.11 signature change
return ft232.Ft232(serial_number=port,**params)
else:
return ft232.Ft232(port,**params)
@reraise
def _do_open(self):
if self.is_opened():
return
def reopen():
self.instr=self._open_instr(self._conn_params[0],self._conn_params[1])
self.set_timeout(self._conn_params[2])
self.opened=True
general.retry_wait(reopen, self._open_retry_times, 0.3)
@reraise
def _do_close(self):
if self.is_opened():
general.retry_wait(self.instr.close, self._open_retry_times, 0.3)
self.opened=False
[docs]
def open(self):
"""Open the connection"""
self._do_open()
[docs]
def close(self):
"""Close the connection"""
self._do_close()
[docs]
def is_opened(self):
return self.opened
[docs]
@contextlib.contextmanager
def single_op(self):
"""
Context manager for a single operation.
Does nothing.
"""
yield
[docs]
@reraise
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
if timeout<1E-3:
timeout=1E-3 # 0 is infinite timeout
self.instr.timeout=timeout
self.cooldown("timeout")
[docs]
@reraise
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self.instr.timeout
@reraise
def _read_terms(self, terms=(), read_block_size=8, timeout=None, error_on_timeout=True):
result=b""
singlechar_terms=all(len(t)==1 for t in terms)
terms=[py3.as_builtin_bytes(t) for t in terms]
with self.single_op():
with self.using_timeout(timeout):
while True:
c=self.instr.read(1 if terms else read_block_size)
result=result+c
if c==b"":
if error_on_timeout and terms:
raise self.Error("timeout during read")
return result
if singlechar_terms:
if c in terms:
return result
else:
for t in terms:
if result.endswith(t):
return result
[docs]
@logerror
def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True): # pylint: disable=arguments-differ
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
while True:
result=self._read_terms(self.term_read or [],timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and self.term_read:
result=remove_longest_term(result,self.term_read)
if not (skip_empty and remove_term and (not result)):
break
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
with self.single_op():
if size is None:
result=self._read_terms(timeout=0,error_on_timeout=False)
else:
result=self.instr.read(size=size)
if len(result)!=size:
raise self.Error("read returned less data than expected")
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
"""
Read a single line with multiple possible terminators.
Args:
term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
if isinstance(term,py3.anystring):
term=[term]
result=self._read_terms(term,timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and term:
result=remove_longest_term(result,term)
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``flush==True``, flush the write buffer.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
"""
self._log("write",data)
with self.single_op():
data=py3.as_builtin_bytes(data)
if self.term_write:
data=data+py3.as_builtin_bytes(self.term_write)
self.instr.write(data)
self.cooldown("write")
if flush:
self.instr.flush()
self.cooldown("flush")
if read_echo_delay>0.:
time.sleep(read_echo_delay)
if read_echo:
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "FT232DeviceBackend("+self.instr.__repr__()+")"
@staticmethod
def _as_str(v):
if isinstance(v,py3.anystring):
return py3.as_str(v)
if isinstance(v,(tuple,list)):
return type(v)([FT232DeviceBackend._as_str(e) for e in v])
return s
[docs]
@staticmethod
def list_resources(desc=False):
try:
devices=[FT232DeviceBackend._as_str(d) for d in ft232.list_devices()]
return [d if desc else d[0] for d in devices]
except FT232DeviceBackend.BackendError as e:
raise FT232DeviceBackend.Error(e) from e
_backends["ft232"]=FT232DeviceBackend
except (ImportError,NameError,OSError):
pass
_backend_errors["ft232"]=_backend_install_message.format(name="pyft232",pkg="pyft232",mod="ft232")
[docs]
class DeviceNetworkError(DeviceBackendError):
"""Network backend operation error"""
[docs]
class NetworkDeviceBackend(IDeviceCommBackend):
"""
Serial backend (via pySerial).
Connection is automatically opened on creation.
Args:
conn: Connection parameters. Can be either a string ``"IP:port"`` (e.g., ``"127.0.0.1:80"``), or a tuple ``(IP,port)``, where `IP` is a string and `port` is a number.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
Note:
If `term_read` is a string, its behavior is different from the VISA backend:
instead of being a multi-char terminator it is assumed to be a set of single-char terminators.
If multi-char terminator is required, `term_read` should be a single-element list instead of a string.
"""
_backend="network"
BackendError=net.socket.error
"""Base class for the errors raised by the backend operations"""
Error=DeviceNetworkError
def __init__(self, conn, timeout=10., term_write=None, term_read=None, datatype="auto", reraise_error=None):
if term_write is None:
term_write="\r\n"
if term_read is None:
term_read="\r\n"
if isinstance(term_read,py3.anystring):
term_read=[term_read]
conn=self._conn_to_dict(conn)
self._split_addr(conn)
super().__init__(conn,term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
try:
self.socket=None
self.open()
self.cooldown("open")
self.set_timeout(timeout)
except self.BackendError as e:
raise self.Error(e) from e
_conn_params=["addr","port"]
_default_conn=["127.0.0.1",80]
@classmethod
def _split_addr(cls, conn):
addr=conn["addr"]
addr_split=addr.split(":")
if len(addr_split)==2:
conn["addr"],conn["port"]=addr_split[0],int(addr_split[1])
elif len(addr_split)>2:
raise ValueError("invalid device address: {}".format(conn))
[docs]
@reraise
def open(self):
"""Open the connection"""
self.close()
self.socket=net.ClientSocket(send_method="fixedlen",recv_method="fixedlen")
self.socket.connect(self.conn["addr"],self.conn["port"])
[docs]
@reraise
def close(self):
"""Close the connection"""
if self.socket is not None:
self.socket.close()
self.socket=None
[docs]
@reraise
def is_opened(self):
return bool(self.socket)
[docs]
@reraise
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
self.socket.set_timeout(timeout)
[docs]
@reraise
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self.socket.get_timeout()
[docs]
@logerror
@reraise
def readline(self, remove_term=True, timeout=None, skip_empty=True):
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
"""
while True:
with self.using_timeout(timeout):
result=self.socket.recv_delimiter(self.term_read,strict=True)
self.cooldown("read")
if remove_term and self.term_read:
result=remove_longest_term(result,self.term_read)
if not (skip_empty and remove_term and (not result)):
break
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
if size is None:
return self.socket.recv_all()
else:
result=self.socket.recv_fixedlen(size)
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read_multichar_term(self, term, remove_term=True, timeout=None):
"""
Read a single line with multiple possible terminators.
Args:
term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
"""
if isinstance(term,py3.anystring):
term=[term]
with self.socket.using_timeout(timeout):
result=self.socket.recv_delimiter(term,strict=True)
self.cooldown("read")
if remove_term and term:
result=remove_longest_term(result,term)
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
`flush` parameter is ignored.
"""
self._log("write",data)
self.socket.send_delimiter(data,self.term_write)
self.cooldown("write")
if read_echo_delay>0.:
time.sleep(read_echo_delay)
if read_echo:
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "NetworkDeviceBackend("+self.socket.__repr__()+")"
_backends["network"]=NetworkDeviceBackend
try:
import usb
import usb.backend.libusb0
import usb.backend.libusb1
import usb.backend.openusb
[docs]
class DeviceUSBError(DeviceBackendError):
"""USB backend operation error"""
[docs]
class PyUSBDeviceBackend(IDeviceCommBackend):
"""
USB backend (via PyUSB package).
Connection is automatically opened on creation.
Args:
conn: Connection parameters. Can be either a string (for a port),
or a list/tuple ``(vendorID, productID, index, endpoint_read, endpoint_write, backend)`` supplied to the connection
(default is ``(0x0000,0x0000,0,0x00,0x01,'libusb1')``, which is invalid for most devices),
or a dict with the same parameters.
``vendorID`` and ``productID`` specify device kind, ``index`` is an integer index (starting from zero) of the device
among several identical (i.e., with the same ids) ones, and ``endpoint_read`` and ``endpoint_write`` specify connection endpoints for the specific device.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
_backend="pyusb"
BackendError=usb.USBError
"""Base class for the errors raised by the backend operations"""
Error=DeviceUSBError
_conn_params=["vendorID","productID","index","endpoint_read","endpoint_write","backend"]
_default_conn=[0x0000,0x0000,0,0x00,0x01,"libusb1"]
_usb_backends={"libusb0":usb.backend.libusb0, "libusb1":usb.backend.libusb1, "openusb":usb.backend.openusb}
def __init__(self, conn, timeout=10., term_write=None, term_read=None, check_read_size=True, datatype="auto", reraise_error=None):
conn_dict=self.combine_conn(conn,self._default_conn)
funcargparse.check_parameter_range(conn_dict["backend"],"usb_backend",self._usb_backends)
if isinstance(term_read,py3.anystring):
term_read=[term_read]
super().__init__(conn_dict.copy(),term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
self.timeout=timeout
self.check_read_size=check_read_size
try:
self.open()
except self.BackendError as e:
raise self.Error(e) from e
self.cooldown("open")
[docs]
@reraise
def open(self):
"""Open the connection"""
idx=self.conn["index"]
backend=self._usb_backends[self.conn["backend"]].get_backend()
all_devs=list(usb.core.find(idVendor=self.conn["vendorID"],idProduct=self.conn["productID"],backend=backend,find_all=True))
if len(all_devs)<idx+1:
raise self.Error("can't find device with VID=0x{:04x} PID=0x{:04x} index {}; {} devices found".format(
self.conn["vendorID"],self.conn["productID"],idx,len(all_devs)))
self.instr=all_devs[idx]
self.ep_read=self.conn["endpoint_read"]
self.ep_write=self.conn["endpoint_write"]
self.cooldown("open")
self.opened=True
[docs]
@reraise
def close(self):
"""Close the connection"""
self.instr.finalize()
self.opened=False
[docs]
def is_opened(self):
return self.opened
[docs]
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
self.timeout=timeout
[docs]
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self.timeout
def _timeout(self, timeout=None):
timeout=self.timeout if timeout is None else timeout
return None if timeout is None else int(timeout*1000)
@reraise
def _read_terms(self, terms=(), read_block_size=65536, timeout=None, error_on_timeout=True):
result=b""
singlechar_terms=all(len(t)==1 for t in terms)
terms=[py3.as_builtin_bytes(t) for t in terms]
while True:
try:
c=self.instr.read(self.ep_read,1 if terms else read_block_size,timeout=self._timeout(timeout)).tobytes()
except self.BackendError:
c=b""
result=result+c
if c==b"":
if error_on_timeout and terms:
raise self.Error("timeout during read")
return result
if not terms:
return result
if singlechar_terms:
if c in terms:
return result
else:
for t in terms:
if result.endswith(t):
return result
[docs]
@logerror
def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True): # pylint: disable=arguments-differ
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
while True:
result=self._read_terms(self.term_read or [],timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and self.term_read:
result=remove_longest_term(result,self.term_read)
if not (skip_empty and remove_term and (not result)):
break
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read(self, size=None, max_read_size=65536): # pylint: disable=arguments-differ
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
if size is None:
result=self._read_terms(read_block_size=max_read_size,timeout=1E-3,error_on_timeout=False)
else:
result=self.instr.read(self.ep_read,size,timeout=self._timeout()).tobytes()
if len(result)!=size and self.check_read_size:
raise self.Error("read returned less than expected {} instead of {}".format(len(result),size))
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
"""
Read a single line with multiple possible terminators.
Args:
term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
if isinstance(term,py3.anystring):
term=[term]
result=self._read_terms(term,timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and term:
result=remove_longest_term(result,term)
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
`flush` parameter is ignored.
"""
self._log("write",data)
data=py3.as_builtin_bytes(data)
if self.term_write:
data=data+py3.as_builtin_bytes(self.term_write)
self.instr.write(self.ep_write,data,timeout=self._timeout())
self.cooldown("write")
if read_echo:
if read_echo_delay>0.:
time.sleep(read_echo_delay)
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "PyUSBDeviceBackend("+self.instr.__repr__()+")"
[docs]
@staticmethod
def list_resources(desc=False, **kwargs):
try:
devs=list(usb.core.find(find_all=True,**kwargs))
except PyUSBDeviceBackend.BackendError as e:
raise PyUSBDeviceBackend.Error from e
if desc:
return devs
indices={}
res=[]
for d in devs:
vid,pid=d.idVendor,d.idProduct
i=indices.get((vid,pid),0)
indices[(vid,pid)]=i+1
res.append((vid,pid,i))
return res
_backends["pyusb"]=PyUSBDeviceBackend
except ImportError:
pass
_backend_errors["pyusb"]=_backend_install_message.format(name="PyUSB",pkg="pyusb",mod="usb")
try:
from . import hid
[docs]
class DeviceHIDError(DeviceBackendError):
"""HID backend operation error"""
[docs]
class HIDeviceBackend(IDeviceCommBackend):
"""
HID backend (via Windows DLLs).
Connection is automatically opened on creation.
Args:
conn: Connection parameters. Can be either a string (for a port),
or a list/tuple ``(vendorID, productID, index, endpoint_read, endpoint_write, backend)`` supplied to the connection
(default is ``(0x0000,0x0000,0,0x00,0x01,'libusb1')``, which is invalid for most devices),
or a dict with the same parameters.
``vendorID`` and ``productID`` specify device kind, ``index`` is an integer index (starting from zero) of the device
among several identical (i.e., with the same ids) ones, and ``endpoint_read`` and ``endpoint_write`` specify connection endpoints for the specific device.
timeout (float): Default timeout (in seconds).
term_write (str): Line terminator for writing operations; appended to the data
term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
_backend="hid"
BackendError=hid.HIDError
"""Base class for the errors raised by the backend operations"""
Error=DeviceHIDError
_conn_params=["path","rep_fmt"]
_default_conn=[None,"lenpfx"]
def __init__(self, conn, timeout=10., term_write=None, term_read=None, datatype="auto", reraise_error=None):
conn_dict=self.combine_conn(conn,self._default_conn)
if isinstance(term_read,py3.anystring):
term_read=[term_read]
super().__init__(conn_dict.copy(),term_write=term_write,term_read=term_read,datatype=datatype,reraise_error=reraise_error)
self.instr=None
try:
self.instr=hid.HIDevice(path=self.conn["path"],timeout=timeout,rep_fmt=self.conn["rep_fmt"])
self.open()
except self.BackendError as e:
raise self.Error(e) from e
self.cooldown("open")
[docs]
@reraise
def open(self):
"""Open the connection"""
self.instr.open()
self.cooldown("open")
[docs]
@reraise
def close(self):
"""Close the connection"""
self.instr.close()
self.cooldown("close")
[docs]
def is_opened(self):
return self.instr.is_opened()
[docs]
def set_timeout(self, timeout):
"""Set operations timeout (in seconds)"""
if timeout is not None:
self.instr.set_timeout(timeout)
[docs]
def get_timeout(self):
"""Get operations timeout (in seconds)"""
return self.instr.get_timeout()
@reraise
def _read_terms(self, terms=(), read_block_size=8, timeout=None, error_on_timeout=True):
result=b""
singlechar_terms=all(len(t)==1 for t in terms)
terms=[py3.as_builtin_bytes(t) for t in terms]
while True:
try:
c=self.instr.read(1 if terms else read_block_size,timeout=timeout)
except self.BackendError:
c=b""
result=result+c
if c==b"":
if error_on_timeout and terms:
raise self.Error("timeout during read")
return result
if not terms:
return result
if singlechar_terms:
if c in terms:
return result
else:
for t in terms:
if result.endswith(t):
return result
[docs]
@logerror
def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True): # pylint: disable=arguments-differ
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
while True:
result=self._read_terms(self.term_read or [],timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and self.term_read:
result=remove_longest_term(result,self.term_read)
if not (skip_empty and remove_term and (not result)):
break
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def read(self, size=None): # pylint: disable=arguments-differ
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
result=self.instr.read(size)
if size is not None and len(result)!=size:
raise self.Error("read returned less than expected {} instead of {}".format(len(result),size))
self.cooldown("read")
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
"""
Read a single line with multiple possible terminators.
Args:
term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
"""
if isinstance(term,py3.anystring):
term=[term]
result=self._read_terms(term,timeout=timeout,error_on_timeout=error_on_timeout)
self.cooldown("read")
if remove_term and term:
result=remove_longest_term(result,term)
self._log("read",result)
return self._to_datatype(result)
[docs]
@logerror
@reraise
def get_pending(self):
"""Get the number of bytes in the read buffer"""
return self.instr.get_pending()
[docs]
@logerror
@reraise
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
`flush` parameter is ignored.
"""
self._log("write",data)
data=py3.as_builtin_bytes(data)
if self.term_write:
data=data+py3.as_builtin_bytes(self.term_write)
self.instr.write(data)
self.cooldown("write")
if read_echo:
if read_echo_delay>0.:
time.sleep(read_echo_delay)
for _ in range(read_echo_lines):
self.readline()
@reraise
def __repr__(self):
return "HIDeviceBackend("+self.instr.__repr__()+")"
[docs]
@staticmethod
def list_resources(desc=False, **kwargs):
try:
devs=list(hid.list_devices(**kwargs))
except PyUSBDeviceBackend.BackendError as e:
raise PyUSBDeviceBackend.Error from e
if desc:
return devs
return [d.path for d in devs]
_backends["hid"]=HIDeviceBackend
except ImportError:
pass
_backend_errors["hid"]="HID backend is only available on Windows"
[docs]
class DeviceRecordedError(DeviceBackendError):
"""Recorded backend operation error"""
[docs]
class RecordedDeviceBackend(IDeviceCommBackend):
"""
Recorded backend.
Connection is automatically opened on creation.
Args:
conn: connection parameters (recorded log path)
datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
reraise_error: if not ``None``, specifies an error to be re-raised on any backend exception (by default, use backend-specific error);
should be a subclass of :exc:`DeviceBackendError`.
"""
BackendError=IOError
_backend="recorded"
Error=DeviceRecordedError
_conn_params=["path"]
_default_conn=[None]
def __init__(self, conn, datatype="auto", reraise_error=None):
conn_dict=self.combine_conn(conn,self._default_conn)
super().__init__(conn_dict.copy(),datatype=datatype,reraise_error=reraise_error)
self.log=None
self.log_section=None
self.log_pos=0
try:
self.open()
except IOError as e:
raise self.Error(e) from e
[docs]
@reraise
def open(self):
"""Open the connection"""
if self.log is None:
self.log=dict(backend_logger.load_logfile(self.conn["path"]))
[docs]
def close(self):
"""Close the connection"""
self.log=None
[docs]
def is_opened(self):
return self.log is not None
[docs]
def start(self, header):
"""Start recorded section"""
if header not in self.log:
raise self.Error(IOError("header {} is missing from the record".format(header)))
self.log_section=header
self.log_pos=0
[docs]
def stop(self):
"""Stop logging section"""
self.log_section=None
self.log_pos=0
[docs]
@contextlib.contextmanager
def section(self, header):
self.start(header)
try:
yield
finally:
self.stop()
def _get_value(self, operation):
if self.log is None:
raise self.Error(IOError("device is not opened"))
if self.log_section is None:
raise self.Error(IOError("log section is not selected"))
section=self.log[self.log_section]
if len(section)<=self.log_pos:
raise self.Error(IOError("section is over"))
op,val=section[self.log_pos]
self.log_pos+=1
if operation[0]!=op:
raise self.Error(IOError("requested operation {}, recorded {}".format(operation,op)))
return val
[docs]
def readline(self, remove_term=True, timeout=None, skip_empty=True):
"""
Read a single line from the device.
Args:
remove_term (bool): If ``True``, remove terminal characters from the result.
timeout: Operation timeout. If ``None``, use the default device timeout.
skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
"""
result=self._get_value("read")
return self._to_datatype(result)
[docs]
def read(self, size=None):
"""
Read data from the device.
If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
"""
result=self._get_value("read")
return self._to_datatype(result)
[docs]
def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
"""
Write data to the device.
If ``flush==True``, flush the write buffer.
If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
"""
value=self._get_value("write")
if value!=data:
raise self.Error(IOError("requested write {}, recorded {}".format(data,value)))
return value
def __repr__(self):
return "RecordedDeviceBackend("+self.conn["path"]+")"
_backends["recorded"]=RecordedDeviceBackend
_serial_re=re.compile(r"^(com\d+|/?(dev/)?tty.*)",re.IGNORECASE)
def _is_serial_addr(addr):
return isinstance(addr,py3.anystring) and bool(_serial_re.match(addr))
_network_re=re.compile(r"(\d+\.){3}\d+(:\d+)?",re.IGNORECASE)
def _is_network_addr(addr):
return isinstance(addr,py3.anystring) and bool(_network_re.match(addr))
_visa_re=re.compile(r"\w+(::\w+)+",re.IGNORECASE)
def _is_visa_addr(addr):
return isinstance(addr,py3.anystring) and bool(_visa_re.match(addr))
_hid_re=re.compile(r"\\\\\?\\hid.+",re.IGNORECASE)
def _is_hid_addr(addr):
return isinstance(addr,py3.anystring) and bool(_hid_re.match(addr))
[docs]
def autodetect_backend(conn, default="visa"):
"""
Try to determine the backend by the connection.
`default` specifies the default backend which is returned if the backend is unclear.
"""
if isinstance(conn, (tuple,list)):
if len(conn)>=2 and isinstance(conn[0],int) and (0<=conn[0]<65536) and isinstance(conn[1],int) and (0<=conn[1]<65536): # PID / VID
return "pyusb"
conn=conn[0]
elif isinstance(conn, dict):
if "addr" in conn and _is_network_addr(conn["addr"]):
return "network"
if "port" in conn and _is_serial_addr(conn["port"]):
return "serial"
if "vendorID" in conn and "productID" in conn:
return "pyusb"
return default
if _is_network_addr(conn):
return "network"
if _is_serial_addr(conn):
return "serial"
if _is_visa_addr(conn):
return "visa"
if _is_hid_addr(conn):
return "hid"
return default
def _as_backend(backend, conn=None):
if backend=="auto":
backend=autodetect_backend(conn)
if isinstance(backend,tuple) and len(backend)==2 and backend[0]=="auto":
backend=autodetect_backend(conn,default=backend[1])
if isinstance(backend,type) and issubclass(backend,IDeviceCommBackend):
return backend
if backend in _backends:
return _backends[backend]
error_text=_backend_errors.get(backend,None)
raise ValueError("could not find backend {}".format(backend)+(": "+error_text if error_text else ""))
[docs]
def new_backend(conn, backend="auto", defaults=None, **kwargs):
"""
Build new backend with the supplied parameters.
Args:
conn: Connection parameters (depend on the backend). Can be simply connection parameters (tuple or dict) for the given backend
(e.g., ``"192.168.0.1"`` or ``("COM1",19200)``), a tuple ``(backend, conn)`` which specifies both backend and connection
(in which case it overrides the supplied backend), or an already opened backend (in which case it is returned as is)
backend (str): Backend type. Available backends are ``'auto'`` (try to autodetect based on the connection),
``'visa'``, ``'serial'``, ``'ft232'``, ``'network'``, and ``"pyusb"``. Can also be directly a backend class (more appropriate for custom backends),
or a tuple ``('auto', backend)``, which is analogous to ``'auto'``, but it returns the specified ``backend`` if the autodetection fails;
by default, the fallback backend is ``'visa'``, so ``'auto'`` is exactly the same as ``('auto', 'visa')``.
defaults: if not ``None``, specifies a dictionary ``{backend: params}`` with default connection parameters (depending on the backend),
which are added to the connection parameters
**kwargs: parameters sent to the backend.
"""
if isinstance(conn,IDeviceCommBackend):
return conn
if isinstance(conn,tuple) and conn and (conn[0] in _backends or (isinstance(conn[0],type) and issubclass(conn[0],IDeviceCommBackend))):
return new_backend(conn[1],backend=conn[0],defaults=defaults,**kwargs)
backend=_as_backend(backend,conn)
backend_name=getattr(backend,"_backend",None)
if defaults is not None and backend_name is not None and backend_name in defaults:
conn=backend.combine_conn(conn,defaults[backend_name])
return backend(conn,**kwargs)
[docs]
def backend_error(backend, conn=None):
"""
Return error class corresponding to the current backend.
Like :func:`new_backend`, allows setting ``backend="auto"``, in which case `conn` is used to try and autodetect the backend kind
(not completely reliable, should be avoided).
"""
return _as_backend(backend,conn).Error
[docs]
def list_backend_resources(backend=None, desc=False):
"""
List all resources for the given backend.
If `backend` is ``None``, return dictionary ``{backend: resources}`` for all available backends.
If ``desc==False``, return list of connections (usually strings or tuples), which can be used to connect to the device.
Otherwise, return a list of descriptions, which have more info, but can be backend-dependent.
"""
if backend is None:
res={n:b.list_resources(desc=desc) for (n,b) in _backends.items()}
return {n:r for n,r in res.items() if r is not None}
else:
return _as_backend(backend).list_resources(desc=desc)
### Interface for a generic device class employing a communication backend ###
[docs]
class ICommBackendWrapper(interface.IDevice):
"""
A base class for an instrument using a communication backend.
Args:
instr: Backend (assumed to be already opened).
"""
def __init__(self, instr):
super().__init__()
self.instr=instr
self._connection_parameters=self.instr.conn
[docs]
def open(self):
"""Open the backend"""
return self.instr.open()
[docs]
def close(self):
"""Close the backend"""
return self.instr.close()
[docs]
def is_opened(self):
"""Check if the device is connected"""
return bool(self.instr)
def _get_connection_parameters(self):
return self._connection_parameters
[docs]
def lock(self, timeout=None):
"""Lock the access to the device from other threads/processes (isn't necessarily implemented)"""
return self.instr.lock(timeout=timeout)
[docs]
def unlock(self):
"""Unlock the access to the device from other threads/processes (isn't necessarily implemented)"""
return self.instr.unlock()
[docs]
def locking(self, timeout=None):
"""Context manager for lock & unlock"""
return self.instr.locking(timeout=timeout)