Source code for pylablib.devices.Conrad.base

from ...core.devio import comm_backend

import struct
import collections


class ConradError(comm_backend.DeviceError):
    """Base error raised for Conrad devices."""
class ConradBackendError(ConradError, comm_backend.DeviceBackendError):
    """Error in the backend communication with a Conrad device."""
class RelayBoard(comm_backend.ICommBackendWrapper):
    """
    Conrad relay board controller.

    Args:
        conn: serial connection parameters (usually port or a tuple containing port and baudrate)
        start_addr: address which is assigned to the first board in the chain upon initialization;
            all following boards increase the address by 1
    """
    Error=ConradError
    def __init__(self, conn, start_addr=1):
        instr=comm_backend.new_backend(conn,"serial",term_write="",timeout=3.,defaults={"serial":("COM1",19200)},reraise_error=ConradBackendError)
        super().__init__(instr)
        self.start_addr=start_addr
        # overwritten by _initialize(); defined up front so that the attribute always exists
        self.boards_number=0
        self._add_info_variable("start_addr",lambda: self.start_addr)
        self._add_info_variable("boards_number",lambda: self.boards_number)
        self._add_status_variable("relays",lambda: self.get_all_relays(0))
        self.open()

    def open(self):
        """Open the connection to the board and run the board initialization query"""
        res=super().open()
        self._initialize()
        return res

    TMessage=collections.namedtuple("TMessage",["comm","addr","data"])
    def _make_msg(self, comm, addr=1, data=0):
        """Build a 4-byte frame: command, address, data, and the XOR of these three bytes as a checksum"""
        check_sum=comm^addr^data
        return struct.pack("BBBB",comm,addr,data,check_sum)
    def _parse_msg(self, msg):
        """Parse a 4-byte frame into a ``TMessage`` tuple; the trailing checksum byte is discarded without verification"""
        return self.TMessage(*struct.unpack("BBBB",msg)[:3])
    def _initialize(self):
        """Send the initialization command (1) with the start address and store the number of boards which replied"""
        res=self.query(1,self.start_addr,multi_result=True)
        # the last frame is the one carrying the sent command code (it terminates the read loop), so it is not counted
        self.boards_number=len(res)-1
    @staticmethod
    def _update_mask(mask, values):
        """
        Return `mask` with the bits of the relays listed in `values` set or cleared.

        `values` is a dictionary ``{relay:value}`` with relays numbered from 1; bits of all other relays are kept as is.
        """
        for p,v in values.items():
            bm=1<<(p-1)
            if v:
                mask|=bm
            else:
                mask&=0xFF^bm
        return mask

    def query(self, comm, addr=1, data=0, multi_result=False):
        """
        Send a query with the given command, address and data.

        If ``multi_result==False``, read a single reply frame and return it as a ``TMessage`` tuple;
        otherwise, keep reading until reply with the same command as sent is received
        (used in initialization and broadcast queries) and return the list of all received frames,
        including this last one.
        """
        msg=self._make_msg(comm,addr=addr,data=data)
        self.instr.write(msg)
        replies=[self._parse_msg(self.instr.read(4))]
        if multi_result:
            while replies[-1].comm!=comm:
                replies.append(self._parse_msg(self.instr.read(4)))
        return replies if multi_result else replies[0]

    def get_all_relays(self, addr=1):
        """
        Get all relay states.

        If `addr` is not 0, return dictionary ``{relay:value}``, where ``relay`` is the relay index on the board (between 1 and 8 inclusive).
        If ``addr==0`` (broadcast), return dictionary ``{addr:board_state}``, where ``board_state`` is in turn a state dictionary as described above.
        """
        if addr==0:
            reply=self.query(2,0,multi_result=True)
            # the last frame only terminates the multi-frame read, so it is not a board state
            return {r.addr:{i+1:bool(r.data&(1<<i)) for i in range(8)} for r in reply[:-1]}
        reply=self.query(2,addr)
        return {i+1:bool(reply.data&(1<<i)) for i in range(8)}

    def set_all_relays(self, values, addr=1):
        """
        Set all relay states.

        `values` can be a list (listing relay states from lowest to highest), or a dictionary ``{relay:value}``, where relays are numbered from 1 to 8.
        Relays without values are kept unchanged.
        If ``addr==0``, apply to all boards: a single broadcast command is used if all boards end up in the same state,
        otherwise every board is set individually, so that the relays without values are preserved on each of them.
        Return the new state in the same format as :meth:`get_all_relays`.
        """
        if not isinstance(values,dict):
            values={i+1:v for i,v in enumerate(values)}
        if addr==0:
            # read the state of every board (not just the first reply) to build a separate mask for each of them
            states=self.query(2,0,multi_result=True)[:-1]
            masks={r.addr:self._update_mask(r.data,values) for r in states}
            unique_masks=set(masks.values())
            if len(unique_masks)==1:
                self.query(3,0,unique_masks.pop(),multi_result=True)
            else:
                for board_addr,mask in masks.items():
                    self.query(3,board_addr,mask)
        else:
            mask=self._update_mask(self.query(2,addr).data,values)
            self.query(3,addr,mask)
        return self.get_all_relays(addr=addr)

    def get_relay(self, relay, addr=1):
        """
        Get the state at a given relay (indexed from 1 to 8 inclusive).

        If ``addr==0`` (broadcast), return dictionary ``{addr:value}`` with the state of this relay on every board.
        """
        states=self.get_all_relays(addr=addr)
        if addr==0:
            return {board_addr:state[relay] for board_addr,state in states.items()}
        return states[relay]

    def set_relay(self, relay, enable=True, addr=1):
        """
        Set the state at a given relay (indexed from 1 to 8 inclusive).

        Return the new state of the relay; if ``addr==0`` (broadcast), return dictionary ``{addr:value}``
        with the state of this relay on every board.
        """
        states=self.set_all_relays({relay:enable},addr=addr)
        if addr==0:
            return {board_addr:state[relay] for board_addr,state in states.items()}
        return states[relay]