from .hid_base import HIDError, HIDLibError, HIDTimeoutError
from .hid_lib import wlib as lib
from ..utils import py3, funcargparse
import ctypes
import collections
import contextlib
import threading
import time
TDeviceDescription=collections.namedtuple("TDeviceDescription",["path","manufacturer","product","serial","vendor_id","product_id","version"])
def _get_device_description(path):
f=lib.CreateFileW(path,0xC0000000,0x03,None,3,0,None)
try:
try:
manufacturer=lib.HidD_GetManufacturerString(f)
except HIDError:
manufacturer=None
try:
product=lib.HidD_GetProductString(f)
except HIDError:
product=None
try:
serial=lib.HidD_GetSerialNumberString(f)
except HIDError:
serial=None
try:
attrs=lib.HidD_GetAttributes(f)
vid=attrs.VendorID
pid=attrs.ProductID
ver=attrs.VersionNumber
except HIDError:
vid,pid,ver=None,None,None
return TDeviceDescription(path,manufacturer,product,serial,vid,pid,ver)
finally:
lib.CloseHandle(f)
[docs]
def list_devices():
"""
List HID devices.
Return list of tuples ``(path, manufacturer, product, serial, vendor_id, product_id, version)``, where ``path`` is the string path used for connection.
"""
lib.initlib()
paths=lib.list_hid_devices()
return [_get_device_description(p) for p in paths]
[docs]
class HIDevice:
"""
Generic HID-based device interface.
Args:
path: HID path (usually obtained using :func:`.hid.list_devices`)
timeout: communication timeout
rep_fmt: HID report format; can be ``"raw"`` (read/write raw data from/to HID),
``"lenpfx"`` (assume a format where the first byte for the report indicates the payload size),
or a tuple ``(parser, builder)`` of two functions, where the ``parser`` takes a single raw report data argument and returns a parsed value,
while ``builder`` takes 2 arguments (data to be written and the output report size) and return the bytes to be sent to HID.
pause_on_write: if ``True``, pause the reading loop when writing; makes some communications more stable
"""
def __init__(self, path, timeout=3., rep_fmt="lenpfx", pause_on_write=True):
if isinstance(rep_fmt,tuple):
if len(rep_fmt)!=2:
raise ValueError("report format should be a 2-tuple (parser, builder), got {} instead".format(rep_fmt))
else:
funcargparse.check_parameter_range(rep_fmt,"rep_fmt",["raw","lenpfx"])
lib.initlib()
self.path=path
self._file=None
self._readbuffsize=2**20
self._rep_fmt=rep_fmt
self.timeout=timeout
self.pause_on_write=pause_on_write
self.open()
[docs]
def open(self):
"""Open the device connection if it is not opened yet"""
if self.is_opened():
return
access=0x80000000|0x40000000 # GENERIC_READ | GENERIC_WRITE
share_mode=0x01|0x02 # FILE_SHARE_READ | FILE_SHARE_WRITE
disposition=3 # OPEN_EXISTING
attrs=0x80|0x40000000 # FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED
self._file=lib.CreateFileW(self.path,access,share_mode,None,disposition,attrs,None)
try:
ppd=lib.HidD_GetPreparsedData(self._file)
try:
self._caps=lib.HidP_GetCaps(ppd)
finally:
lib.HidD_FreePreparsedData(ppd)
self._setup_device()
self._reader=self.Reader(self._file,self._caps,self._readbuffsize,self._make_parser())
self._reader.start_loop()
except HIDError:
self.close()
raise
def _make_parser(self):
if self._rep_fmt=="raw":
return lambda m: m
if self._rep_fmt=="lenpfx":
def parse(m):
l=min(m[0],len(m)-1)
return m[1:l+1]
return parse
return self._rep_fmt[0]
def _build_msg(self, data, msglen):
if self._rep_fmt=="raw":
return data
if self._rep_fmt=="lenpfx":
if msglen is not None and len(data)>=msglen:
raise HIDError("required data size {} is larger than the maximal packet size {}".format(len(data)+1,msglen))
return bytes([len(data)])+data+b"\x00"*(msglen-len(data)-1)
return self._rep_fmt[1](data,msglen)
def _setup_device(self):
ninp=lib.HidD_GetNumInputBuffers(self._file)
while ninp<2**28:
try:
ninp*=2
lib.HidD_SetNumInputBuffers(self._file,ninp)
except HIDLibError as err:
if err.code==0x57: # ERROR_INVALID_PARAMETER
break
raise
[docs]
def close(self):
"""Close the device connection"""
if self._file is None:
return
self._reader.stop_loop()
self._reader=None
lib.CloseHandle(self._file)
self._file=None
[docs]
def is_opened(self):
"""Check if the device connection is opened"""
return self._file is not None
def _check_open(self):
if self._file is None:
raise HIDError("can not perform operation on the closed device")
def __repr__(self):
return "{}('{}')".format(type(self).__name__,self.path)
[docs]
def get_description(self):
"""
Get device description
Return tuple ``(path, manufacturer, product, serial, vendor_id, product_id, version)``, where ``path`` is the string path used for connection.
"""
return _get_device_description(self.path)
[docs]
def get_timeout(self):
"""Get device communication timeout"""
return self.timeout
[docs]
def set_timeout(self, timeout):
"""Set device communication timeout"""
self.timeout=timeout
[docs]
class Reader:
def __init__(self, f, caps, buffsize, parser):
self.f=f
self.msglen=caps.InputReportByteLength
self.nreadbuff=lib.HidD_GetNumInputBuffers(self.f)
self.readbuffsize=self.msglen*self.nreadbuff
self.readbuff=ctypes.create_string_buffer(self.readbuffsize)
self.storebuffsize=buffsize
self.storebuff=bytearray(self.storebuffsize)
self.nread=0
self.npassed=0
self.parser=parser
self._looping=False
self._thread=None
self._read_evts=None
self._data_wait_size=None
self._data_ready=threading.Event()
self._executing=threading.Event()
self._read_lock=threading.Lock()
[docs]
def loop_read(self):
overlapped=lib.make_overlapped(self._read_evts[0])
while self._looping:
self._executing.wait()
with self._read_lock:
complete=lib.ReadFile_async(self.f,self.readbuff,self.readbuffsize,None,ctypes.byref(overlapped))
if not complete:
try:
self._read_lock.release()
res=lib.WaitForMultipleObjects(self._read_evts,False,0xFFFFFFFF)
if res!=0: # failure or stop looping event
lib.CancelIo(self.f)
return
finally:
self._read_lock.acquire()
chread=lib.GetOverlappedResult(self.f,overlapped,False)
if not chread:
time.sleep(1E-3)
continue
data=self.parser(self.readbuff.raw[:chread])
start=self.nread%self.storebuffsize
end=(self.nread+len(data))%self.storebuffsize
if end>start:
self.storebuff[start:end]=data
else:
self.storebuff[start:]=data[:self.storebuffsize-start]
self.storebuff[:end]=data[self.storebuffsize-start:]
self.nread+=len(data)
self.npassed=max(self.npassed,self.nread-self.storebuffsize)
if self._data_wait_size is not None and self.nread>=self._data_wait_size:
self._data_ready.set()
[docs]
def start_loop(self):
"""Start the read loop"""
self.stop_loop()
self._thread=threading.Thread(target=self.loop_read,daemon=True)
self._looping=True
self._read_evts=[lib.CreateEventA(None,True,False,None) for _ in range(2)]
self._executing.set()
self.nread=0
self.npassed=0
self._thread.start()
[docs]
def stop_loop(self):
"""Stop the read loop"""
if self._thread is not None:
self._looping=False
lib.SetEvent(self._read_evts[1])
self._executing.set()
self._thread.join()
self._thread=None
for evt in self._read_evts:
lib.CloseHandle(evt)
self._read_evts=None
[docs]
@contextlib.contextmanager
def pausing(self, do_pause=True, timeout=None):
if not do_pause:
yield
return
timeout=timeout if timeout is not None else -1
paused=not self._executing.is_set()
self._executing.clear()
try:
if not self._read_lock.acquire(timeout=timeout):
raise HIDTimeoutError("timeout while pausing the read loop")
try:
yield
finally:
self._read_lock.release()
finally:
if not paused:
self._executing.set()
def _wait_for_nread(self, nread, timeout=None):
if self.nread>=nread:
return
self._data_ready.clear()
self._data_wait_size=nread
try:
if not self._data_ready.wait(timeout=timeout):
raise HIDTimeoutError("timeout while reading")
finally:
self._data_wait_size=None
[docs]
def read(self, nbytes=None, timeout=None, peek=False):
"""
Read the given number of bytes from the read buffer.
If `nbytes` is ``None``, return all read bytes.
If `timeout` is not ``None``, it can define the read operation timeout; otherwise, use the default timeout specified on creation.
If ``peek==True``, return the bytes but keep them in the buffer.
"""
if nbytes is None:
nbytes=self.nread-self.npassed
else:
self._wait_for_nread(self.npassed+nbytes,timeout=timeout)
if not nbytes:
return b""
start=self.npassed%self.storebuffsize
end=(self.npassed+nbytes)%self.storebuffsize
if end>start:
result=bytes(self.storebuff[start:end])
else:
result=bytes(self.storebuff[end:]+self.storebuff[:start])
if not peek:
self.npassed+=nbytes
return result
[docs]
def get_pending(self):
"""Get the number of bytes in the read buffer"""
return self.nread-self.npassed
def _do_write(self, data, timeout=None):
evt=lib.CreateEventA(None,True,False,None)
overlapped=lib.make_overlapped(evt)
to=0xFFFFFFFF if timeout is None else max(0,int(timeout*1E3))
try:
complete=lib.WriteFile_async(self._file,data,len(data),None,ctypes.byref(overlapped))
if not complete:
res=lib.WaitForSingleObject(evt,to)
if res!=0:
lib.CancelIo(self._file)
raise HIDTimeoutError("timeout while writing")
chwrite=lib.GetOverlappedResult(self._file,overlapped,False)
return chwrite
finally:
lib.CloseHandle(evt)
[docs]
def get_pending(self):
"""Get the number of bytes in the read buffer"""
self._check_open()
return self._reader.get_pending()
[docs]
def read(self, nbytes=None, timeout=None):
"""
Read the given number of bytes from the read buffer.
If `nbytes` is ``None``, return all read bytes.
If `timeout` is not ``None``, it can define the read operation timeout; otherwise, use the default timeout specified on creation.
"""
self._check_open()
if timeout is None:
timeout=self.timeout
return self._reader.read(nbytes=nbytes,timeout=timeout)
[docs]
def write(self, data, timeout=None):
"""
Write the given data to the device.
If `timeout` is not ``None``, it can define the write operation timeout; otherwise, use the default timeout specified on creation.
"""
self._check_open()
if timeout is None:
timeout=self.timeout
if isinstance(data,py3.textstring):
data=py3.as_bytes(data)
if not isinstance(data,(bytes,bytearray)):
data=bytes(data)
data=self._build_msg(data,self._caps.OutputReportByteLength)
with self._reader.pausing(do_pause=self.pause_on_write,timeout=timeout):
nwrite=self._do_write(data,timeout=timeout)
return nwrite