# Source code for pylablib.devices.Arduino.base

from ...core.devio import comm_backend

import time


class ArduinoError(comm_backend.DeviceError):
    """
    Generic error raised by Arduino devices.

    Subclass of ``comm_backend.DeviceError``; adds no behavior of its own.
    """
class ArduinoBackendError(ArduinoError, comm_backend.DeviceBackendError):
    """
    Generic Arduino backend communication error.

    Inherits from both :exc:`ArduinoError` and ``comm_backend.DeviceBackendError``,
    so a handler for either of the two will catch it.
    """
class IArduinoDevice(comm_backend.ICommBackendWrapper):
    """
    Generic Arduino device.

    Args:
        port: serial port name
        rate: baud rate
        timeout: default communication timeout
        term_write: default write terminating character (automatically appended on every sent message)
        term_read: default read terminating character (used to determine when the incoming message is received completely)
        flush_before_op: if ``True`` (default), automatically flush input buffer on comm/query
        dtrrts: determines whether to use DTR/RTS signals for communication;
            generally, should be set to ``True`` on newer boards (e.g., Leonardo) and to ``False`` on older boards (e.g., Uno);
            setting ``dtrrts=True`` on older boards leads to the board reset upon connection,
            and setting ``dtrrts=False`` on newer boards leads to the communications getting frozen
    """

    Error = ArduinoError

    def __init__(self, port, rate=9600, timeout=10., term_write="\n", term_read="\n", flush_before_op=True, dtrrts=True):
        def open_backend():
            # Zero-argument factory closing over the constructor arguments;
            # it is stored on the instance so that reopen() can build a new
            # backend with exactly the same settings.
            return comm_backend.new_backend(
                (port, rate),
                "serial",
                term_write=term_write,
                term_read=term_read,
                timeout=timeout,
                no_dtrrts=not dtrrts,
                reraise_error=ArduinoBackendError,
            )

        self._get_new_instr = open_backend
        self._flush_before_op = flush_before_op
        super().__init__(self._get_new_instr())

    def reopen(self):
        """Close the device connection and open a new one with the same settings."""
        self.instr.close()
        time.sleep(0.2)  # fixed pause between closing the old backend and creating the new one
        self.instr = self._get_new_instr()

    def reset_board(self):
        """Reset the board by pulsing the DTR and RTS lines."""
        port = self.instr.instr
        # Raise both lines, then drop both (DTR before RTS each time).
        for level in (1, 0):
            port.setDTR(level)
            port.setRTS(level)

    def comm(self, comm, timeout=None, flush=False, flush_delay=0.02):
        """
        Send a device command.

        If `timeout` is not ``None``, it specifies a custom timeout for the operation.
        If ``flush==True``, then wait for `flush_delay` seconds after the write, read everything
        returned by the device, and return the result of that read; otherwise return ``None``.
        """
        backend = self.instr
        if self._flush_before_op:
            backend.flush_read()
        with backend.using_timeout(timeout):
            backend.write(comm)
        if not flush:
            return None
        time.sleep(flush_delay)
        return backend.flush_read()

    def query(self, query, timeout=None, query_delay=0, flush=False, flush_delay=0.02):
        """
        Send a device query and return the reply (with surrounding whitespace stripped).

        If `timeout` is not ``None``, it specifies a custom timeout for the reply read operation.
        If ``query_delay>0``, it specifies the delay between write and subsequent read attempt.
        If ``flush==True``, then wait for `flush_delay` seconds after the reply is read
        and discard everything else returned by the device.
        """
        backend = self.instr
        if self._flush_before_op:
            backend.flush_read()
        backend.write(query)
        time.sleep(query_delay)
        reply = backend.readline(timeout=timeout)
        if flush:
            # Drop any trailing output so that it does not leak into the next exchange.
            time.sleep(flush_delay)
            backend.flush_read()
        return reply.strip()